# NOTE: page-header residue from the hosting UI ("Spaces: Running") was captured here; it is not part of the program.
from mcp.server.fastmcp import FastMCP | |
import time | |
from litellm import completion | |
import os | |
import glob | |
import http.client | |
import json | |
import openpyxl | |
import shutil | |
from google import genai | |
import pexpect | |
# Module-level configuration and shared state.
#
# SECURITY: API keys must never be committed to source. The Gemini client key
# is read from the environment instead; the Groq / Gemini / OpenRouter keys
# that used to be written into os.environ here must now be provided by the
# deployment environment (GROQ_API_KEY, GEMINI_API_KEY, OPENROUTER_API_KEY).
# Any key that was previously hard-coded in this file should be revoked.
# NOTE(review): if GEMINI_API_KEY is unset, api_key is None and the client
# falls back to its own default lookup (or raises) -- confirm desired behaviour.
client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))

# Directory that uploaded files land in, and the sandbox working directory
# that every tool below reads from and writes to.
source_dir = "/app/uploads/temp"
destination_dir = "/app/code_interpreter"

# Paths already seen by the run_code* helpers (used to detect new files).
files_list = []
downloaded_files = []

mcp = FastMCP("code_sandbox")
data = {}
result = ""
import requests | |
import os | |
from bs4 import BeautifulSoup # For parsing HTML | |
# Long-lived bash session; run() replaces it whenever a "forever" command
# (e.g. a server) is started.
Parent = pexpect.spawn("bash")
def transfer_files():
    """Move files from each sub-directory of ``source_dir`` into ``destination_dir``.

    Only directories directly under ``source_dir`` are scanned, and only their
    immediate children are moved. A file is skipped when something with the
    same name already exists in ``destination_dir``.

    The transfer is best-effort: filesystem errors are tolerated so that a
    missing upload directory or one unreadable file never breaks the caller,
    but a failure on one file no longer aborts the remaining files.

    Returns:
        None
    """
    try:
        entries = os.listdir(source_dir)
    except OSError:
        # Nothing uploaded yet (or directory unreadable): nothing to do.
        return

    for entry in entries:
        entry_path = os.path.join(source_dir, entry)
        if not os.path.isdir(entry_path):
            continue
        try:
            filenames = os.listdir(entry_path)
        except OSError:
            continue
        for filename in filenames:
            target_path = os.path.join(destination_dir, filename)
            if os.path.exists(target_path):
                continue
            try:
                shutil.move(os.path.join(entry_path, filename), target_path)
            except OSError:
                # shutil.Error is an OSError subclass; skip this file only.
                continue
def transfer_files2(uploads_dir="/app/uploads"):
    """Move uploaded files from ``uploads_dir`` sub-directories into ``destination_dir``.

    Every directory directly under ``uploads_dir`` whose name does not contain
    ``"temp"`` is scanned. Files named ``<prefix>__<name>`` are stored under
    ``<name>`` (the segment after the first ``"__"``); files without a ``"__"``
    separator keep their full name. Existing destination files are never
    overwritten.

    Fixes over the previous version:
      * entries listed from the uploads directory are now resolved against
        that same directory (they used to be joined onto ``source_dir``, so
        the paths never existed and nothing was moved);
      * a filename without ``"__"`` no longer raises ``IndexError`` and
        silently aborts the whole transfer;
      * only filesystem errors are tolerated, per file, instead of a bare
        ``except`` around everything.

    Args:
        uploads_dir: Root directory to scan. Defaults to ``/app/uploads``.

    Returns:
        None
    """
    try:
        entries = os.listdir(uploads_dir)
    except OSError:
        return

    for entry in entries:
        if "temp" in entry:
            continue
        entry_path = os.path.join(uploads_dir, entry)
        if not os.path.isdir(entry_path):
            continue
        try:
            filenames = os.listdir(entry_path)
        except OSError:
            continue
        for filename in filenames:
            parts = filename.split("__")
            # NOTE(review): falling back to the full name for files without a
            # usable "<prefix>__" part is an assumption -- confirm with the uploader.
            target_name = parts[1] if len(parts) > 1 and parts[1] else filename
            target_path = os.path.join(destination_dir, target_name)
            if os.path.exists(target_path):
                continue
            try:
                shutil.move(os.path.join(entry_path, filename), target_path)
            except OSError:
                continue
def upload_file(file_path, upload_url, timeout=120):
    """Upload a file to ``upload_url`` as multipart form field ``file``.

    Args:
        file_path: Path of the file to send.
        upload_url: Endpoint that accepts the multipart POST.
        timeout: Seconds to wait for the server before giving up. The request
            previously had no timeout and could block forever.

    Returns:
        The response body (the filename reported by the server) when the
        server answers with HTTP 200, otherwise ``None``. ``None`` is also
        returned when ``file_path`` is not a regular file or the request fails.
    """
    # Directories and missing paths are rejected up front; open() on a
    # directory would otherwise raise an error no handler below expects.
    if not os.path.isfile(file_path):
        print(f"File not found: {file_path}")
        return None

    try:
        with open(file_path, "rb") as handle:
            # The explicit filename matters: the server uses it to name the upload.
            files = {"file": (os.path.basename(file_path), handle)}
            response = requests.post(upload_url, files=files, timeout=timeout)
        response.raise_for_status()  # 4xx / 5xx -> RequestException
    except requests.exceptions.RequestException as e:
        print(f"Upload failed. Network error: {e}")
        return None
    except OSError as e:
        # File vanished or became unreadable between the check and open().
        print(e)
        return None

    if response.status_code == 200:
        print(f"File uploaded successfully. Filename returned by server: {response.text}")
        return response.text
    print(f"Upload failed. Status code: {response.status_code}, Response: {response.text}")
    return None
# SECURITY: the bot token used to be hard-coded here. It is now read from the
# environment; the previously committed token should be revoked.
# NOTE(review): TOKEN / chat_id look like Telegram bot credentials and are not
# referenced elsewhere in the visible code -- confirm they are still needed.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
chat_id = "5075390513"
from requests_futures.sessions import FuturesSession
session = FuturesSession()
def run(cmd, timeout_sec, forever_cmd):
    """Run ``cmd`` in a bash shell whose working directory is /app/code_interpreter/.

    Args:
        cmd: Shell command line to execute.
        timeout_sec: Maximum number of seconds to collect output in the
            non-forever mode. (Previously ignored in favour of a hard-coded 300.)
        forever_cmd: ``'true'`` starts the command in the shared long-lived
            ``Parent`` shell and returns after the first output line; any
            other value runs the command in a throw-away shell and collects
            output until the prompt sentinel appears, EOF, or the timeout.

    Returns:
        str: The captured output. In forever mode, the first output line
        followed by an informational note about the public URL.
    """
    global Parent
    command = "cd /app/code_interpreter/ && " + cmd

    if forever_cmd == 'true':
        # Replace the shared shell so a previously started server is stopped.
        Parent.close()
        Parent = pexpect.spawn("bash")
        Parent.sendline(command)
        try:
            Parent.readline()  # first line read is discarded, as before
            first_line = Parent.readline().decode(errors="replace")
        except pexpect.TIMEOUT:
            # A server that prints nothing must not crash the tool call.
            first_line = ""
        return str(first_line) + "[INFO] The opened ports can be externally accessed at https://suitable-liked-ibex.ngrok-free.app/ "

    deadline = time.time() + timeout_sec
    child = pexpect.spawn("bash")
    output = ""
    keep_alive = False
    try:
        # bash runs PROMPT_COMMAND before each prompt, so "END" is printed
        # once the command has finished; it is used as the completion sentinel.
        child.sendline('PROMPT_COMMAND="echo END"')
        child.readline()
        child.readline()
        child.sendline(command)
        while (not child.eof()) and time.time() < deadline:
            try:
                line = child.readline().decode(errors="replace")
            except pexpect.TIMEOUT:
                # readline() gives up after the spawn's own read timeout when
                # the command is silent; keep waiting until our deadline.
                continue
            output = output + line
            print(line)
            if "END" in line:
                # NOTE(review): any output line containing "END" also ends
                # the capture and is stripped -- kept for compatibility.
                output = output.replace("END", "")
                break
            if "true" in forever_cmd:
                # Preserved behaviour: leave the shell running for callers
                # that pass a non-exact "true" value.
                keep_alive = True
                break
    finally:
        if not keep_alive:
            # Always release the pty/child, including on timeout or error.
            child.close()
    return output
def analyse_audio(audiopath,query) -> dict:
    """Ask another AI model about an audio file; the model can listen to it and answer.

    Example: query="Generate detailed minutes of meeting from the audio clip",
    audiopath='/app/code_interpreter/<audioname>'.

    Note: the audios are automatically present in the /app/code_interpreter directory.

    Returns:
        dict: ``{"Output": <model answer as text>}``.
    """
    # Make sure freshly uploaded files have been moved into the sandbox first.
    transfer_files2()
    uploaded_audio = client.files.upload(file=audiopath)
    answer = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, uploaded_audio],
    )
    return {"Output": str(answer.text)}
def analyse_video(videopath,query) -> dict:
    """Ask another AI model about a video; the model can watch it and answer.

    Example: query="Create a very detailed transcript and summary of the video",
    videopath='/app/code_interpreter/<videoname>'.

    Note: the videos are automatically present in the /app/code_interpreter directory.

    Returns:
        dict: ``{"Output": <model answer as text>}``.

    Raises:
        ValueError: if the uploaded file ends up in the ``FAILED`` state.
    """
    transfer_files2()
    uploaded_video = client.files.upload(file=videopath)

    # Poll once per second until the service has finished processing the upload.
    while True:
        if uploaded_video.state.name != "PROCESSING":
            break
        print('.', end='')
        time.sleep(1)
        uploaded_video = client.files.get(name=uploaded_video.name)

    if uploaded_video.state.name == "FAILED":
        raise ValueError(uploaded_video.state.name)

    answer = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, uploaded_video],
    )
    return {"Output": str(answer.text)}
def analyse_images(imagepath,query) -> dict:
    """Ask another AI model about an image; the model can see it and answer.

    Example: query="Who is the person in this image?",
    imagepath='/app/code_interpreter/<imagename>'.

    Note: the images are automatically present in the /app/code_interpreter directory.

    Returns:
        dict: ``{"Output": <model answer as text>}``.
    """
    transfer_files2()
    uploaded_image = client.files.upload(file=imagepath)
    answer = client.models.generate_content(
        model='gemini-2.0-flash',
        contents=[query, uploaded_image],
    )
    return {"Output": str(answer.text)}
# @mcp.tool() | |
# def generate_images(imagepath,query) -> dict: | |
# """Ask another AI model to generate images based on the query and the image path.Set image path as an empty string , if you dont want to edit images , but rather generate images.Eg-query:Generate a cartoon version of this image,imagepath='/app/code_interpreter/<imagename>'.Note:The images are automatically present in the /app/code_interpreter directory.""" | |
# transfer_files2() | |
# video_file = client.files.upload(file=imagepath) | |
# response = client.models.generate_content( | |
# model='gemini-2.0-flash', | |
# contents=[query, video_file] | |
# ) | |
# return {"Output":str(response.text)} | |
def create_code_files(filename: str, code: str) -> dict:
    """Create a code file from a filename and the entire code to write.

    The file is created in the /app/code_interpreter directory by default.
    Note: all user uploaded files that you might need to work upon are stored
    in the /app/code_interpreter directory.

    Args:
        filename: Name of the file, relative to /app/code_interpreter.
        code: Full contents to write; an existing file is overwritten.

    Returns:
        dict: ``{"info": ...}`` confirmation message.
    """
    # Pull in any pending uploads so the new file sits next to them.
    transfer_files()
    transfer_files2()

    target_path = os.path.join(destination_dir, filename)
    # Allow names such as "src/app.py" whose sub-directory does not exist yet.
    os.makedirs(os.path.dirname(target_path), exist_ok=True)
    # "with" guarantees the handle is closed even if the write fails.
    with open(target_path, "w") as f:
        f.write(code)
    return {"info":"task completed. The referenced code files were created successfully. "}
def run_code(python_packages:str,filename: str, code: str,start_cmd:str,forever_cmd:str) -> dict:
    """
    Execute code in a controlled environment with package installation and file handling.

    Args:
        python_packages[Output an empty string ,if using any other language.]: Space-separated list of packages to install (e.g., "numpy matplotlib").
            Preinstalled packages: gradio, XlsxWriter, openpyxl.
        filename: Name of the file to create (stored in /app/code_interpreter/).
        code: Full code to write to the file.
        start_cmd: Command to execute the file (e.g., "python /app/code_interpreter/app.py"
            or "bash /app/code_interpreter/app.py").
        forever_cmd: If 'true', the command will run indefinitely. Set to 'true' when running a website/server. Run all servers/websites on port 1337. If 'false', the command will time out after 300 seconds and the result will be returned.

    Returns:
        dict: ``'output'`` (captured command output) and ``'Files_download_link'``
        (download URLs for files that appeared in /app/code_interpreter/).

    Notes:
        - All user-uploaded files are in /app/code_interpreter/.
        - After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response.
    """
    global files_list

    package_names = python_packages.strip()
    # The old test (`!= "" or != " "`) was always true, so pip was invoked
    # even with nothing to install.
    if package_names:
        run(
            f"pip install --break-system-packages {package_names}", timeout_sec=300, forever_cmd='false'
        )

    transfer_files2()
    transfer_files()
    with open(os.path.join(destination_dir, filename), "w") as f:
        f.write(code)

    stdot = run(start_cmd, 300, forever_cmd)

    # Upload only paths that were not present after the previous run.
    current_files = set(glob.glob("/app/code_interpreter/*"))
    new_files = sorted(current_files - set(files_list))
    uploaded_filenames = []
    for path in new_files:
        if not os.path.isfile(path):
            continue  # directories cannot be uploaded
        try:
            uploaded_filename = upload_file(path, "https://opengpt-4ik5.onrender.com/upload")
        except Exception:
            # Best-effort: one failed upload must not hide the command output.
            continue
        if uploaded_filename:
            # A failed upload returns None; do not advertise ".../static/None".
            uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}")

    # Remember every path seen so far. Storing only the new ones (as before)
    # made older files look "new" again on a later call.
    files_list = list(current_files)
    return {"output":stdot,"Files_download_link":uploaded_filenames}
def run_code_files(start_cmd:str,forever_cmd:str) -> dict:
    """Executes a shell command to run code files from /app/code_interpreter.

    Runs the given `start_cmd`. The execution behavior depends on `forever_cmd`.
    Any server/website started should use port 1337.

    Args:
        start_cmd (str): The shell command to execute the code.
            (e.g., ``python /app/code_interpreter/app.py``).
            Files must be in ``/app/code_interpreter``.
        forever_cmd (str): Execution mode.
            - ``'true'``: Runs indefinitely (for servers/websites).
            - ``'false'``: Runs up to 300s, captures output.

    Returns:
        dict: A dictionary containing:
            - ``'output'`` (str): Captured stdout (mainly when forever_cmd='false').
            - ``'Files_download_link'`` (Any): Links/identifiers for downloadable files.

    Notes:
        - After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response.
    """
    global files_list

    stdot = run(start_cmd, 300, forever_cmd)

    # Upload only paths that were not present after the previous run.
    current_files = set(glob.glob("/app/code_interpreter/*"))
    new_files = sorted(current_files - set(files_list))
    uploaded_filenames = []
    for path in new_files:
        if not os.path.isfile(path):
            continue  # directories cannot be uploaded
        try:
            uploaded_filename = upload_file(path, "https://opengpt-4ik5.onrender.com/upload")
        except Exception:
            # Best-effort: one failed upload must not hide the command output.
            continue
        if uploaded_filename:
            # A failed upload returns None; do not advertise ".../static/None".
            uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}")

    # Remember every path seen so far. Storing only the new ones (as before)
    # made older files look "new" again on a later call.
    files_list = list(current_files)
    return {"output":stdot,"Files_download_link":uploaded_filenames}
def run_shell_command(cmd:str,forever_cmd:str) -> dict:
    """Run a shell command in the sandboxed Alpine Linux environment.

    The `cmd` string is executed inside a bash shell, relative to the
    `/app/code_interpreter/` working directory. Whether it runs indefinitely
    or with a timeout is decided by `forever_cmd`.

    Important Environment Notes:
        - The execution environment is **Alpine Linux**; use compatible
          commands (e.g., `apk add` instead of `apt-get install`).
        - `sudo` is not available and not required.
        - Standard bash features like `&&`, `||`, pipes (`|`), etc., are supported.
        - When installing python packages, add the argument
          --break-system-packages to the pip install command.

    Args:
        cmd (str): The shell command to execute.
            Example: ``mkdir test_dir && ls -l``
        forever_cmd (str): Execution mode.
            - ``'true'``: Runs the command indefinitely. Suitable for starting
              servers or long-running processes. Output capture might be limited.
            - ``'false'``: Runs the command until completion or a 300-second
              timeout, whichever comes first. Captures standard output.

    Returns:
        dict: ``'output'`` (str) holds the captured standard output (stdout)
        and potentially standard error (stderr) from the command.
    """
    # Bring any pending uploads into the working directory before running.
    transfer_files()
    transfer_files2()
    captured = run(cmd, 300, forever_cmd)
    return {"output": captured}
def install_python_packages(python_packages:str) -> dict:
    """Install python packages given as a space-separated string.

    Example: python_packages="numpy matplotlib".
    The following python packages are preinstalled: gradio XlsxWriter openpyxl

    Args:
        python_packages: Space-separated package names.

    Returns:
        dict: ``'stdout'`` with the pip output and ``'info'`` with a short
        status message. When no package names are given nothing is run and
        ``'stdout'`` is empty (previously ``None`` was returned, contradicting
        the declared return type).
    """
    package_names = python_packages.strip()
    if not package_names:
        return {"stdout": "", "info": "No packages specified; nothing was installed"}

    stdot = run(
        f"pip install --break-system-packages {package_names}", timeout_sec=300, forever_cmd='false'
    )
    return {"stdout":stdot,"info":"Ran package installation command"}
def get_youtube_transcript(videoid:str) -> dict:
    """Get the transcript of a youtube video by passing the video id.

    Example: videoid=ZacjOVVgoLY

    The RapidAPI key is read from the ``RAPIDAPI_KEY`` environment variable
    (it used to be hard-coded in this file; that key should be revoked).

    Args:
        videoid: The YouTube video id.

    Returns:
        dict: The JSON document returned by the transcript service.

    Raises:
        json.JSONDecodeError: if the service does not answer with JSON.
        OSError: on connection problems or timeout.
    """
    from urllib.parse import quote

    host = "youtube-transcript3.p.rapidapi.com"
    headers = {
        'x-rapidapi-key': os.environ.get("RAPIDAPI_KEY", ""),
        'x-rapidapi-host': host,
    }
    conn = http.client.HTTPSConnection(host, timeout=60)
    try:
        # Quote the id so unexpected characters cannot alter the query string.
        conn.request("GET", f"/api/transcript?videoId={quote(str(videoid), safe='')}", headers=headers)
        res = conn.getresponse()
        data = res.read()
    finally:
        conn.close()
    return json.loads(data)
def read_excel_file(filename) -> dict:
    """Read the contents of an excel file.

    Returns a dict with key:value pair = cell location:cell content. Always
    run this command first when working with excels. The excel file is
    automatically present in the /app/code_interpreter directory.

    For a workbook with a single sheet the keys are plain coordinates
    (``'A1'``). For a workbook with several sheets the keys are
    ``'<sheet name>!<coordinate>'``; previously cells from later sheets
    silently overwrote those of earlier sheets with the same coordinate.

    Args:
        filename: Workbook file name, relative to /app/code_interpreter.

    Returns:
        dict: Non-empty cells only, values converted to ``str``.
    """
    transfer_files2()
    workbook = openpyxl.load_workbook(os.path.join(destination_dir, filename))
    try:
        multiple_sheets = len(workbook.sheetnames) > 1
        excel_data_dict = {}
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            for row in sheet.iter_rows():
                for cell in row:
                    if cell.value is None:
                        continue
                    key = f"{sheet_name}!{cell.coordinate}" if multiple_sheets else cell.coordinate
                    excel_data_dict[key] = str(cell.value)
    finally:
        workbook.close()
    return excel_data_dict
def scrape_websites(url_list:list,query:str) -> dict:
    """Get the entire content of websites by passing in the url list.

    `query` is the question you want to ask about the content of the website,
    e.g. query: "Give .pptx links in the website".
    Note: Max urls in url_list is 3.

    Each URL is fetched through the scraping service and the raw result is
    passed to a language model, which rewrites it in readable form and answers
    `query`. The RapidAPI key is read from the ``RAPIDAPI_KEY`` environment
    variable (it used to be hard-coded in this file; that key should be revoked).

    Args:
        url_list: URLs to scrape.
        query: Question to answer about each page.

    Returns:
        dict: ``{"website_content": [<model output per url>, ...]}``. (The
        return annotation used to say ``list`` although a dict was returned.)
    """
    host = "scrapeninja.p.rapidapi.com"
    headers = {
        'x-rapidapi-key': os.environ.get("RAPIDAPI_KEY", ""),
        'x-rapidapi-host': host,
        'Content-Type': "application/json"
    }
    Output = []
    conn = http.client.HTTPSConnection(host, timeout=120)
    try:
        for url in url_list:
            payload = json.dumps({"url": url})
            conn.request("POST", "/scrape", payload, headers)
            res = conn.getresponse()
            # The body must be read completely before the connection is reused.
            content = str(res.read().decode("utf-8"))
            response = completion(
                model="gemini/gemini-1.5-pro",
                messages=[
                    {"role": "user", "content": f"Output the following content in the human readable format.Try to conserve all the links and the text.Try to ouput the entire content.Remove the html codes so its human readable.Also answer this question about the content in a seperate paragraph:{query}.Here is the content:{content}"}
                ],
            )
            Output.append(response.choices[0].message.content)
    finally:
        conn.close()
    return {"website_content":Output}
if __name__ == "__main__":
    # Open the ngrok tunnel for port 1337 in its own bash session, then start
    # the MCP server on stdio.
    ngrok_shell = pexpect.spawn('bash')
    ngrok_shell.sendline("ngrok http --url=suitable-liked-ibex.ngrok-free.app 1337")
    # First line coming back from the shell is read and discarded.
    ngrok_shell.readline().decode()
    mcp.run(transport='stdio')