|
from smolagents import (
|
|
CodeAgent,
|
|
DuckDuckGoSearchTool,
|
|
HfApiModel,
|
|
LiteLLMModel,
|
|
OpenAIServerModel,
|
|
PythonInterpreterTool,
|
|
tool,
|
|
InferenceClientModel
|
|
)
|
|
from typing import List, Dict, Any, Optional
|
|
import os
|
|
import tempfile
|
|
import re
|
|
import json
|
|
import requests
|
|
from urllib.parse import urlparse
|
|
|
|
@tool
|
|
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
|
|
"""
|
|
Save content to a temporary file and return the path.
|
|
Useful for processing files from the GAIA API.
|
|
|
|
Args:
|
|
content: The content to save to the file
|
|
filename: Optional filename, will generate a random name if not provided
|
|
|
|
Returns:
|
|
Path to the saved file
|
|
"""
|
|
temp_dir = tempfile.gettempdir()
|
|
if filename is None:
|
|
temp_file = tempfile.NamedTemporaryFile(delete=False)
|
|
filepath = temp_file.name
|
|
else:
|
|
filepath = os.path.join(temp_dir, filename)
|
|
|
|
|
|
with open(filepath, 'w') as f:
|
|
f.write(content)
|
|
|
|
return f"File saved to {filepath}. You can read this file to process its contents."
|
|
|
|
@tool
|
|
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
|
|
"""
|
|
Download a file from a URL and save it to a temporary location.
|
|
|
|
Args:
|
|
url: The URL to download from
|
|
filename: Optional filename, will generate one based on URL if not provided
|
|
|
|
Returns:
|
|
Path to the downloaded file
|
|
"""
|
|
try:
|
|
|
|
if not filename:
|
|
path = urlparse(url).path
|
|
filename = os.path.basename(path)
|
|
if not filename:
|
|
|
|
import uuid
|
|
filename = f"downloaded_{uuid.uuid4().hex[:8]}"
|
|
|
|
|
|
temp_dir = tempfile.gettempdir()
|
|
filepath = os.path.join(temp_dir, filename)
|
|
|
|
|
|
response = requests.get(url, stream=True)
|
|
response.raise_for_status()
|
|
|
|
|
|
with open(filepath, 'wb') as f:
|
|
for chunk in response.iter_content(chunk_size=8192):
|
|
f.write(chunk)
|
|
|
|
return f"File downloaded to {filepath}. You can now process this file."
|
|
except Exception as e:
|
|
return f"Error downloading file: {str(e)}"
|
|
|
|
@tool
|
|
def extract_text_from_image(image_path: str) -> str:
|
|
"""
|
|
Extract text from an image using pytesseract (if available).
|
|
|
|
Args:
|
|
image_path: Path to the image file
|
|
|
|
Returns:
|
|
Extracted text or error message
|
|
"""
|
|
try:
|
|
|
|
import pytesseract
|
|
from PIL import Image
|
|
|
|
|
|
image = Image.open(image_path)
|
|
|
|
|
|
text = pytesseract.image_to_string(image)
|
|
|
|
return f"Extracted text from image:\n\n{text}"
|
|
except ImportError:
|
|
return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
|
|
except Exception as e:
|
|
return f"Error extracting text from image: {str(e)}"
|
|
|
|
@tool
|
|
def analyze_csv_file(file_path: str, query: str) -> str:
|
|
"""
|
|
Analyze a CSV file using pandas and answer a question about it.
|
|
|
|
Args:
|
|
file_path: Path to the CSV file
|
|
query: Question about the data
|
|
|
|
Returns:
|
|
Analysis result or error message
|
|
"""
|
|
try:
|
|
import pandas as pd
|
|
|
|
|
|
df = pd.read_csv(file_path)
|
|
|
|
|
|
result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
|
|
result += f"Columns: {', '.join(df.columns)}\n\n"
|
|
|
|
|
|
result += "Summary statistics:\n"
|
|
result += str(df.describe())
|
|
|
|
return result
|
|
except ImportError:
|
|
return "Error: pandas is not installed. Please install it with 'pip install pandas'."
|
|
except Exception as e:
|
|
return f"Error analyzing CSV file: {str(e)}"
|
|
|
|
@tool
|
|
def analyze_excel_file(file_path: str, query: str) -> str:
|
|
"""
|
|
Analyze an Excel file using pandas and answer a question about it.
|
|
|
|
Args:
|
|
file_path: Path to the Excel file
|
|
query: Question about the data
|
|
|
|
Returns:
|
|
Analysis result or error message
|
|
"""
|
|
try:
|
|
import pandas as pd
|
|
|
|
|
|
df = pd.read_excel(file_path)
|
|
|
|
|
|
result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
|
|
result += f"Columns: {', '.join(df.columns)}\n\n"
|
|
|
|
|
|
result += "Summary statistics:\n"
|
|
result += str(df.describe())
|
|
|
|
return result
|
|
except ImportError:
|
|
return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."
|
|
except Exception as e:
|
|
return f"Error analyzing Excel file: {str(e)}"
|
|
|
|
class GAIAAgent:
|
|
def __init__(
|
|
self,
|
|
model_type: str = "HfApiModel",
|
|
model_id: Optional[str] = None,
|
|
api_key: Optional[str] = None,
|
|
api_base: Optional[str] = None,
|
|
temperature: float = 0.2,
|
|
executor_type: str = "local",
|
|
additional_imports: List[str] = None,
|
|
additional_tools: List[Any] = None,
|
|
system_prompt: Optional[str] = None,
|
|
verbose: bool = False,
|
|
provider: Optional[str] = None,
|
|
timeout: Optional[int] = None
|
|
):
|
|
"""
|
|
Initialize a GAIAAgent with specified configuration
|
|
|
|
Args:
|
|
model_type: Type of model to use (HfApiModel, LiteLLMModel, OpenAIServerModel, InferenceClientModel)
|
|
model_id: ID of the model to use
|
|
api_key: API key for the model provider
|
|
api_base: Base URL for API calls
|
|
temperature: Temperature for text generation
|
|
executor_type: Type of executor for code execution ('local' or 'e2b')
|
|
additional_imports: Additional Python modules to allow importing
|
|
additional_tools: Additional tools to provide to the agent
|
|
system_prompt: Custom system prompt to use (not directly used, kept for backward compatibility)
|
|
verbose: Enable verbose logging
|
|
provider: Provider for InferenceClientModel (e.g., "hf-inference")
|
|
timeout: Timeout in seconds for API calls
|
|
"""
|
|
|
|
self.verbose = verbose
|
|
self.system_prompt = system_prompt
|
|
|
|
|
|
if model_type == "HfApiModel":
|
|
if api_key is None:
|
|
api_key = os.getenv("HUGGINGFACEHUB_API_TOKEN")
|
|
if not api_key:
|
|
raise ValueError("No Hugging Face token provided. Please set HUGGINGFACEHUB_API_TOKEN environment variable or pass api_key parameter.")
|
|
|
|
if self.verbose:
|
|
print(f"Using Hugging Face token: {api_key[:5]}...")
|
|
|
|
self.model = HfApiModel(
|
|
model_id=model_id or "meta-llama/Llama-3-70B-Instruct",
|
|
token=api_key,
|
|
temperature=temperature
|
|
)
|
|
elif model_type == "InferenceClientModel":
|
|
if api_key is None:
|
|
api_key = os.getenv("HUGGINGFACEHUB_API_TOKEN")
|
|
if not api_key:
|
|
raise ValueError("No Hugging Face token provided. Please set HUGGINGFACEHUB_API_TOKEN environment variable or pass api_key parameter.")
|
|
|
|
if self.verbose:
|
|
print(f"Using Hugging Face token: {api_key[:5]}...")
|
|
|
|
self.model = InferenceClientModel(
|
|
model_id=model_id or "meta-llama/Llama-3-70B-Instruct",
|
|
provider=provider or "hf-inference",
|
|
token=api_key,
|
|
timeout=timeout or 120,
|
|
temperature=temperature
|
|
)
|
|
elif model_type == "LiteLLMModel":
|
|
from smolagents import LiteLLMModel
|
|
self.model = LiteLLMModel(
|
|
model_id=model_id or "gpt-4o",
|
|
api_key=api_key or os.getenv("OPENAI_API_KEY"),
|
|
temperature=temperature
|
|
)
|
|
elif model_type == "OpenAIServerModel":
|
|
|
|
xai_api_key = os.getenv("XAI_API_KEY")
|
|
xai_api_base = os.getenv("XAI_API_BASE")
|
|
|
|
|
|
if xai_api_key and api_key is None:
|
|
api_key = xai_api_key
|
|
if self.verbose:
|
|
print(f"Using xAI API key: {api_key[:5]}...")
|
|
|
|
|
|
if api_key is None:
|
|
api_key = os.getenv("OPENAI_API_KEY")
|
|
if not api_key:
|
|
raise ValueError("No OpenAI API key provided. Please set OPENAI_API_KEY or XAI_API_KEY environment variable or pass api_key parameter.")
|
|
|
|
|
|
if xai_api_base and api_base is None:
|
|
api_base = xai_api_base
|
|
if self.verbose:
|
|
print(f"Using xAI API base URL: {api_base}")
|
|
|
|
|
|
if api_base is None:
|
|
api_base = os.getenv("AGENT_API_BASE")
|
|
if api_base and self.verbose:
|
|
print(f"Using API base from AGENT_API_BASE: {api_base}")
|
|
|
|
self.model = OpenAIServerModel(
|
|
model_id=model_id or "gpt-4o",
|
|
api_key=api_key,
|
|
api_base=api_base,
|
|
temperature=temperature
|
|
)
|
|
else:
|
|
raise ValueError(f"Unknown model type: {model_type}")
|
|
|
|
if self.verbose:
|
|
print(f"Initialized model: {model_type} - {model_id}")
|
|
|
|
|
|
self.tools = [
|
|
DuckDuckGoSearchTool(),
|
|
PythonInterpreterTool(),
|
|
save_and_read_file,
|
|
download_file_from_url,
|
|
analyze_csv_file,
|
|
analyze_excel_file
|
|
]
|
|
|
|
|
|
try:
|
|
import pytesseract
|
|
from PIL import Image
|
|
self.tools.append(extract_text_from_image)
|
|
if self.verbose:
|
|
print("Added image processing tool")
|
|
except ImportError:
|
|
if self.verbose:
|
|
print("Image processing libraries not available")
|
|
|
|
|
|
if additional_tools:
|
|
self.tools.extend(additional_tools)
|
|
|
|
if self.verbose:
|
|
print(f"Initialized with {len(self.tools)} tools")
|
|
|
|
|
|
self.imports = ["pandas", "numpy", "datetime", "json", "re", "math", "os", "requests", "csv", "urllib"]
|
|
if additional_imports:
|
|
self.imports.extend(additional_imports)
|
|
|
|
|
|
executor_kwargs = {}
|
|
if executor_type == "e2b":
|
|
try:
|
|
|
|
from e2b_code_interpreter import Sandbox
|
|
if self.verbose:
|
|
print("Using e2b executor")
|
|
except ImportError:
|
|
if self.verbose:
|
|
print("e2b dependencies not found, falling back to local executor")
|
|
executor_type = "local"
|
|
|
|
self.agent = CodeAgent(
|
|
tools=self.tools,
|
|
model=self.model,
|
|
additional_authorized_imports=self.imports,
|
|
executor_type=executor_type,
|
|
executor_kwargs=executor_kwargs,
|
|
verbosity_level=2 if self.verbose else 0
|
|
)
|
|
|
|
if self.verbose:
|
|
print("Agent initialized and ready")
|
|
|
|
def answer_question(self, question: str, task_file_path: Optional[str] = None) -> str:
|
|
"""
|
|
Process a GAIA benchmark question and return the answer
|
|
|
|
Args:
|
|
question: The question to answer
|
|
task_file_path: Optional path to a file associated with the question
|
|
|
|
Returns:
|
|
The answer to the question
|
|
"""
|
|
try:
|
|
if self.verbose:
|
|
print(f"Processing question: {question}")
|
|
if task_file_path:
|
|
print(f"With associated file: {task_file_path}")
|
|
|
|
|
|
context = question
|
|
file_content = None
|
|
|
|
|
|
if task_file_path:
|
|
try:
|
|
with open(task_file_path, 'r') as f:
|
|
file_content = f.read()
|
|
|
|
|
|
import os
|
|
file_ext = os.path.splitext(task_file_path)[1].lower()
|
|
|
|
context = f"""
|
|
Question: {question}
|
|
|
|
This question has an associated file. Here is the file content:
|
|
|
|
```{file_ext}
|
|
{file_content}
|
|
```
|
|
|
|
Analyze the file content above to answer the question.
|
|
"""
|
|
except Exception as file_e:
|
|
context = f"""
|
|
Question: {question}
|
|
|
|
This question has an associated file at path: {task_file_path}
|
|
However, there was an error reading the file: {file_e}
|
|
You can still try to answer the question based on the information provided.
|
|
"""
|
|
|
|
|
|
|
|
if question.startswith(".") or ".rewsna eht sa" in question:
|
|
context = f"""
|
|
This question appears to be in reversed text. Here's the reversed version:
|
|
{question[::-1]}
|
|
|
|
Now answer the question above. Remember to format your answer exactly as requested.
|
|
"""
|
|
|
|
|
|
full_prompt = f"""{context}
|
|
|
|
When answering, provide ONLY the precise answer requested.
|
|
Do not include explanations, steps, reasoning, or additional text.
|
|
Be direct and specific. GAIA benchmark requires exact matching answers.
|
|
For example, if asked "What is the capital of France?", respond simply with "Paris".
|
|
"""
|
|
|
|
|
|
answer = self.agent.run(full_prompt)
|
|
|
|
|
|
|
|
answer = self._clean_answer(answer)
|
|
|
|
if self.verbose:
|
|
print(f"Generated answer: {answer}")
|
|
|
|
return answer
|
|
except Exception as e:
|
|
error_msg = f"Error answering question: {e}"
|
|
if self.verbose:
|
|
print(error_msg)
|
|
return error_msg
|
|
|
|
def _clean_answer(self, answer: any) -> str:
|
|
"""
|
|
Clean up the answer to remove common prefixes and formatting
|
|
that models often add but that can cause exact match failures.
|
|
|
|
Args:
|
|
answer: The raw answer from the model
|
|
|
|
Returns:
|
|
The cleaned answer as a string
|
|
"""
|
|
|
|
if not isinstance(answer, str):
|
|
|
|
if isinstance(answer, float):
|
|
|
|
|
|
if answer.is_integer():
|
|
formatted_answer = str(int(answer))
|
|
else:
|
|
|
|
if abs(answer) >= 1000:
|
|
formatted_answer = f"${answer:,.2f}"
|
|
else:
|
|
formatted_answer = str(answer)
|
|
return formatted_answer
|
|
elif isinstance(answer, int):
|
|
return str(answer)
|
|
else:
|
|
|
|
return str(answer)
|
|
|
|
|
|
|
|
answer = answer.strip()
|
|
|
|
|
|
prefixes_to_remove = [
|
|
"The answer is ",
|
|
"Answer: ",
|
|
"Final answer: ",
|
|
"The result is ",
|
|
"To answer this question: ",
|
|
"Based on the information provided, ",
|
|
"According to the information: ",
|
|
]
|
|
|
|
for prefix in prefixes_to_remove:
|
|
if answer.startswith(prefix):
|
|
answer = answer[len(prefix):].strip()
|
|
|
|
|
|
if (answer.startswith('"') and answer.endswith('"')) or (answer.startswith("'") and answer.endswith("'")):
|
|
answer = answer[1:-1].strip()
|
|
|
|
return answer |