|
import os |
|
import gradio as gr |
|
import requests |
|
import inspect |
|
import asyncio |
|
from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional |
|
|
|
|
|
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage |
|
from langchain_core.tools import tool |
|
from langchain_openai import ChatOpenAI |
|
from langgraph.checkpoint.memory import MemorySaver |
|
from langgraph.prebuilt import create_react_agent |
|
|
|
|
|
DEFAULT_API_URL = "http://127.0.0.1:8000" |
|
|
|
|
|
@tool
def get_lat_lng(location_description: str) -> dict[str, float]:
    """Get the latitude and longitude of a location."""
    print(f"Tool: Getting lat/lng for {location_description}")

    # Placeholder lookup table; insertion order preserves the original
    # match precedence (tokyo, then paris, then new york).
    known_cities = {
        "tokyo": {"lat": 35.6895, "lng": 139.6917},
        "paris": {"lat": 48.8566, "lng": 2.3522},
        "new york": {"lat": 40.7128, "lng": -74.0060},
    }
    query = location_description.lower()
    for city, coords in known_cities.items():
        if city in query:
            return dict(coords)
    # Default: London coordinates for anything unrecognized.
    return {"lat": 51.5072, "lng": -0.1276}
|
|
|
@tool
def get_weather(lat: float, lng: float) -> dict[str, str]:
    """Get the weather at a location."""
    print(f"Tool: Getting weather for lat={lat}, lng={lng}")

    # Placeholder: weather is derived purely from latitude bands.
    if lat > 45:
        temperature, description = "15°C", "Cloudy"
    elif lat > 30:
        temperature, description = "25°C", "Sunny"
    else:
        temperature, description = "30°C", "Very Sunny"
    return {"temperature": temperature, "description": description}
|
|
|
|
|
class MyLangChainAgent:
    """
    A sample LangChain agent class designed for interaction and submission.

    NOTE: The current tools (weather/location) are placeholders and WILL NOT
          correctly answer GAIA benchmark questions. This class structure
          demonstrates how to integrate an agent with the submission API.
          Replace LLM, tools, and potentially the agent type for actual GAIA tasks.
    """

    def __init__(self, model_name: str = "gpt-4", temperature: float = 0):
        """Build the LLM, tools, and a ReAct agent with in-memory checkpointing.

        Args:
            model_name: OpenAI chat model identifier.
            temperature: Sampling temperature passed to the model.

        Raises:
            ValueError: If OPENAI_API_KEY is not set in the environment.
        """
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OPENAI_API_KEY environment variable not set.")

        self.llm = ChatOpenAI(temperature=temperature, model=model_name)
        self.tools = [get_lat_lng, get_weather]
        # MemorySaver keeps per-thread conversation state in process memory only.
        self.memory = MemorySaver()

        self.agent_executor = create_react_agent(self.llm, self.tools, checkpointer=self.memory)
        print("MyLangChainAgent initialized.")

    async def __call__(self, question: str, thread_id: str) -> AsyncGenerator[Union[str, Dict[str, str]], None]:
        """
        Runs the agent asynchronously, yielding intermediate steps and,
        as the last item, the final answer.

        Args:
            question: The input question string.
            thread_id: A unique identifier for the conversation thread.

        Yields:
            Intermediate steps (tool calls/results) as strings, followed by
            the final AI answer string (if non-empty) as the last item.

        Note:
            The original implementation ended with ``return final_answer``,
            which is a SyntaxError inside an async generator ('return' with a
            value is forbidden there, PEP 525) and prevented this module from
            importing at all. The final answer is now yielded instead;
            callers should treat the last yielded string as the answer.
        """
        print(f"Agent executing for thread_id: {thread_id} on question: {question[:50]}...")
        lc_messages: List[BaseMessage] = [HumanMessage(content=question)]
        full_response_content = ""

        async for chunk in self.agent_executor.astream_events(
            {"messages": lc_messages},
            config={"configurable": {"thread_id": thread_id}},
            version="v1"
        ):
            event = chunk["event"]
            data = chunk["data"]

            if event == "on_chat_model_stream":
                # Accumulate streamed LLM tokens; together they form the answer.
                content = data["chunk"].content
                if content:
                    full_response_content += content

            elif event == "on_tool_start":
                tool_input_str = str(data.get('input', ''))
                yield f"🛠️ Using tool: **{data['name']}** with input: `{tool_input_str}`"

            elif event == "on_tool_end":
                tool_output_str = str(data.get('output', ''))
                yield f"✅ Tool **{data['name']}** finished.\nResult: `{tool_output_str}`"

        final_answer = full_response_content.strip()
        print(f"Agent execution finished. Final Answer: {final_answer[:100]}...")

        if final_answer:
            # Yield (not return): async generators cannot return a value.
            yield final_answer

    def __repr__(self) -> str:
        """
        Return the source code required to reconstruct this agent, including
        the class definition, tool functions, and necessary imports.
        """
        imports = [
            "import os",
            "from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional",
            "from langchain_core.messages import HumanMessage, AIMessage, BaseMessage",
            "from langchain_core.tools import tool",
            "from langchain_openai import ChatOpenAI",
            "from langgraph.checkpoint.memory import MemorySaver",
            "from langgraph.prebuilt import create_react_agent",
            "import inspect",
            "import asyncio",
            "\n"
        ]

        tool_sources = []
        for t in self.tools:
            # @tool wraps the function in a StructuredTool: the original
            # callable lives on `.func` and its display name on `.name`.
            # The previous code called inspect.getsource(t) and t.__name__
            # directly, which fails on tool objects (uncaught AttributeError).
            func = getattr(t, "func", t)
            tool_name = getattr(t, "name", getattr(t, "__name__", repr(t)))
            try:
                tool_sources.append(inspect.getsource(func))
            except (TypeError, OSError) as e:
                print(f"Warning: Could not get source for tool {tool_name}: {e}")
                tool_sources.append(f"# Could not automatically get source for tool: {tool_name}\n")

        class_source = inspect.getsource(MyLangChainAgent)

        full_source = "\n".join(imports) + "\n\n" + \
                      "\n\n".join(tool_sources) + "\n\n" + \
                      class_source
        return full_source
|
|
|
|
|
|
|
|
|
|
|
|
|
# Instantiate the agent once at import time. Every UI handler checks for
# `agent_instance is None`, so ANY init failure (missing key, network/auth
# error, bad model name) should degrade gracefully rather than crash the
# app at import — hence the broad Exception catch at this top-level boundary
# (the original caught only ValueError).
try:
    agent_instance = MyLangChainAgent()
except Exception as e:
    print(f"ERROR initializing agent: {e}")
    agent_instance = None
|
|
|
def format_chat_history(history: List[List[Optional[str]]]) -> List[Tuple[Optional[str], Optional[str]]]:
    """Helper to format Gradio history for display.

    Converts each [user, assistant] pair into a (user, assistant) tuple.
    """
    return [tuple(pair) for pair in history]
|
|
|
|
|
async def fetch_and_display_question(api_url: str):
    """Calls the backend to get a random question.

    Args:
        api_url: Base URL of the FastAPI evaluation server.

    Returns:
        A 5-tuple matching the Gradio outputs:
        (status_message, task_id, question_text, final_answer_display, chatbot).
        On success the answer box is cleared ("") and the chat reset ([]).

    Fix: the empty-URL branch previously returned gr.update(...) objects
    while every other path returned plain values for the same outputs;
    the return shape is now consistent across all paths.
    """
    if not api_url:
        return "Please enter the API URL.", "", "", "", []

    question_url = f"{api_url.strip('/')}/random-question"
    print(f"Fetching question from: {question_url}")
    try:
        # NOTE(review): requests.get is blocking inside an async handler;
        # acceptable for this demo, but consider httpx for real async I/O.
        response = requests.get(question_url, timeout=10)
        response.raise_for_status()
        data = response.json()
        task_id = data.get("task_id")
        question_text = data.get("question")
        if task_id and question_text:
            print(f"Fetched Task ID: {task_id}")
            return "Question fetched successfully!", task_id, question_text, "", []
        else:
            return "Error: Invalid data format received from API.", "", "", "", []
    except requests.exceptions.RequestException as e:
        print(f"Error fetching question: {e}")
        return f"Error fetching question: {e}", "", "", "", []
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return f"An unexpected error occurred: {e}", "", "", "", []
|
|
|
|
|
async def run_agent_interaction(
    message: str,
    history: List[List[Optional[str]]],
    current_task_id: str,
):
    """Handles the chat interaction, runs the agent, yields steps, updates final answer state.

    Args:
        message: The user's chat message (typically the GAIA question).
        history: Gradio chat history as [user, assistant] pairs; mutated in place.
        current_task_id: Task id of the currently fetched question.

    Yields:
        (chat_history, final_answer) 2-tuples. This handler is wired to TWO
        outputs (chatbot, current_agent_answer); the original yielded a single
        value on intermediate steps, mismatching the declared output count.
        Error messages are now surfaced inside the chat for the same reason.
    """
    if agent_instance is None:
        history.append([message, "Agent not initialized. Please check API keys and restart."])
        yield format_chat_history(history), ""
        return

    if not current_task_id:
        history.append([message, "Please fetch a question first using the button above."])
        yield format_chat_history(history), ""
        return

    # Fresh thread id per run so MemorySaver state does not leak between runs.
    thread_id = f"gaia_task_{current_task_id}_{os.urandom(4).hex()}"

    print(f"Running agent for user message: {message[:50]}...")
    history.append([message, None])

    final_agent_answer = None
    full_yielded_response = ""

    # The agent yields intermediate step strings; its last yielded string is
    # the final answer (async generators cannot return a value).
    async for step in agent_instance(message, thread_id=thread_id):
        if isinstance(step, str):
            history[-1][1] = step
            full_yielded_response = step
            final_agent_answer = step
            yield format_chat_history(history), full_yielded_response

    if final_agent_answer:
        print(f"Agent final answer received: {final_agent_answer[:100]}...")
        history[-1][1] = final_agent_answer
    elif full_yielded_response:
        # Fall back to the last streamed step if no explicit answer arrived.
        history[-1][1] = full_yielded_response
        final_agent_answer = full_yielded_response
    else:
        history[-1][1] = "Agent did not produce a final answer."
        final_agent_answer = ""

    yield format_chat_history(history), final_agent_answer
|
|
|
|
|
def submit_to_leaderboard(
    api_url: str,
    username: str,
    task_id: str,
    agent_answer: str,
):
    """Submits the agent's answer and code to the FastAPI backend.

    Args:
        api_url: Base URL of the evaluation API.
        username: Hugging Face username for the leaderboard.
        task_id: Task identifier of the fetched question.
        agent_answer: The agent's final answer; may be empty or None.

    Returns:
        A human-readable status/result message string.
    """
    # Guard clauses: fail fast with an actionable message.
    if agent_instance is None:
        return "Agent not initialized. Cannot submit."
    if not api_url:
        return "Please enter the API URL."
    if not username:
        return "Please enter your Hugging Face username."
    if not task_id:
        return "No task ID available. Please fetch a question first."
    if agent_answer is None or agent_answer.strip() == "":
        print("Warning: Submitting empty answer.")
        # Coerce None to "" so the JSON payload always carries a string
        # (the original could serialize `submitted_answer` as null).
        agent_answer = agent_answer or ""

    submit_url = f"{api_url.strip('/')}/submit"
    print(f"Submitting to: {submit_url}")

    try:
        # repr() is the idiomatic spelling of __repr__(); here it returns the
        # agent's reconstruction source code for the submission payload.
        agent_code = repr(agent_instance)
    except Exception as e:
        print(f"Error getting agent representation: {e}")
        return f"Error generating agent code for submission: {e}"

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": [
            {
                "task_id": task_id,
                "submitted_answer": agent_answer
            }
        ]
    }

    try:
        response = requests.post(submit_url, json=submission_data, timeout=30)
        response.raise_for_status()
        result_data = response.json()

        result_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Score: {result_data.get('score')}\n"
            f"Correct: {result_data.get('correct_count')}/{result_data.get('total_attempted')}\n"
            f"Message: {result_data.get('message')}\n"
            f"Timestamp: {result_data.get('timestamp')}"
        )
        print("Submission successful.")
        return result_message
    except requests.exceptions.HTTPError as e:
        # Prefer the API's structured 'detail' field when the body is JSON.
        error_detail = e.response.text
        try:
            error_json = e.response.json()
            error_detail = error_json.get('detail', error_detail)
        except requests.exceptions.JSONDecodeError:
            pass
        print(f"HTTP Error during submission: {e.response.status_code} - {error_detail}")
        return f"Submission Failed (HTTP {e.response.status_code}): {error_detail}"
    except requests.exceptions.RequestException as e:
        print(f"Network error during submission: {e}")
        return f"Submission Failed: Network error - {e}"
    except Exception as e:
        print(f"An unexpected error occurred during submission: {e}")
        return f"Submission Failed: An unexpected error occurred - {e}"
|
|
|
|
|
|
|
# Gradio UI: fetch question -> chat with agent -> submit answer.
with gr.Blocks() as demo:
    gr.Markdown("# Agent Evaluation Interface")
    gr.Markdown(
        "Fetch a random question from the evaluation API, interact with the agent "
        "(Note: the default agent answers weather questions, not GAIA), "
        "and submit the agent's final answer to the leaderboard."
    )

    # Per-session state holding the fetched task, its question text, and the
    # agent's latest final answer (filled by run_agent_interaction).
    current_task_id = gr.State("")
    current_question_text = gr.State("")
    current_agent_answer = gr.State("")

    with gr.Row():
        api_url_input = gr.Textbox(label="FastAPI API URL", value=DEFAULT_API_URL)
        hf_username_input = gr.Textbox(label="Hugging Face Username")

    with gr.Row():
        fetch_button = gr.Button("Get Random Question")
        submission_status_display = gr.Textbox(label="Status", interactive=False)

    with gr.Row():
        question_display = gr.Textbox(label="Current Question", lines=3, interactive=False)

    gr.Markdown("---")
    gr.Markdown("## Agent Interaction")

    chatbot = gr.Chatbot(label="Agent Conversation", height=400)
    msg_input = gr.Textbox(label="Send a message to the Agent (or just observe)")

    final_answer_display = gr.Textbox(label="Agent's Final Answer (Extracted)", interactive=False)

    gr.Markdown("---")
    gr.Markdown("## Submission")
    with gr.Row():
        submit_button = gr.Button("Submit Current Answer to Leaderboard")

    submission_result_display = gr.Markdown(label="Submission Result", value="*Submit an answer to see the result here.*")

    # Fetching a question also clears the answer box and resets the chat
    # (fetch_and_display_question returns one value per output below).
    fetch_button.click(
        fn=fetch_and_display_question,
        inputs=[api_url_input],
        outputs=[
            submission_status_display,
            current_task_id,
            question_display,
            final_answer_display,
            chatbot
        ]
    )

    # Run the agent, streaming chat updates; once done, mirror the stored
    # final answer into the visible textbox via the .then() step.
    # NOTE(review): run_agent_interaction must yield one value per output
    # here (chatbot AND current_agent_answer) on every yield.
    msg_input.submit(
        fn=run_agent_interaction,
        inputs=[
            msg_input,
            chatbot,
            current_task_id,
        ],
        outputs=[
            chatbot,
            current_agent_answer
        ]
    ).then(
        # Copy the state value into the read-only display textbox.
        lambda answer_state: answer_state,
        inputs=[current_agent_answer],
        outputs=[final_answer_display]
    )

    # Clear the message box immediately on submit (runs outside the queue).
    msg_input.submit(lambda: "", None, msg_input, queue=False)

    submit_button.click(
        fn=submit_to_leaderboard,
        inputs=[
            api_url_input,
            hf_username_input,
            current_task_id,
            current_agent_answer,
        ],
        outputs=[submission_result_display]
    )
|
|
|
|
|
if __name__ == "__main__":
    # Launch only when the agent came up; otherwise explain how to fix it.
    if agent_instance is not None:
        print("Launching Gradio Interface...")
        demo.launch(debug=True, server_name="0.0.0.0")
    else:
        print("\nFATAL: Agent could not be initialized. Gradio app will not run correctly.")
        print("Please ensure OPENAI_API_KEY is set and valid.\n")