import os import gradio as gr import requests import inspect # To get source code for __repr__ import asyncio from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional # --- LangChain Specific Imports --- from langchain_core.messages import HumanMessage, AIMessage, BaseMessage from langchain_core.tools import tool from langchain_openai import ChatOpenAI from langgraph.checkpoint.memory import MemorySaver from langgraph.prebuilt import create_react_agent # --- Constants --- DEFAULT_API_URL = "http://127.0.0.1:8000" # Default URL for your FastAPI app # --- Tools (Keep these defined globally or ensure they are included in __repr__) --- @tool def get_lat_lng(location_description: str) -> dict[str, float]: """Get the latitude and longitude of a location.""" print(f"Tool: Getting lat/lng for {location_description}") # Replace with actual API call in a real app if "tokyo" in location_description.lower(): return {"lat": 35.6895, "lng": 139.6917} elif "paris" in location_description.lower(): return {"lat": 48.8566, "lng": 2.3522} elif "new york" in location_description.lower(): return {"lat": 40.7128, "lng": -74.0060} else: return {"lat": 51.5072, "lng": -0.1276} # Default London @tool def get_weather(lat: float, lng: float) -> dict[str, str]: """Get the weather at a location.""" print(f"Tool: Getting weather for lat={lat}, lng={lng}") # Replace with actual API call in a real app if lat > 45: # Northern locations return {"temperature": "15°C", "description": "Cloudy"} elif lat > 30: # Mid locations return {"temperature": "25°C", "description": "Sunny"} else: # Southern locations return {"temperature": "30°C", "description": "Very Sunny"} # --- Agent Class Definition --- class MyLangChainAgent: """ A sample LangChain agent class designed for interaction and submission. NOTE: The current tools (weather/location) are placeholders and WILL NOT correctly answer GAIA benchmark questions. This class structure demonstrates how to integrate an agent with the submission API. Replace LLM, tools, and potentially the agent type for actual GAIA tasks. """ def __init__(self, model_name="gpt-4", temperature=0): # Ensure API key is available if not os.getenv("OPENAI_API_KEY"): raise ValueError("OPENAI_API_KEY environment variable not set.") self.llm = ChatOpenAI(temperature=temperature, model=model_name) self.tools = [get_lat_lng, get_weather] # Use the globally defined tools self.memory = MemorySaver() # Create the agent executor self.agent_executor = create_react_agent(self.llm, self.tools, checkpointer=self.memory) print("MyLangChainAgent initialized.") async def __call__(self, question: str, thread_id: str) -> AsyncGenerator[Union[str, Dict[str, str]], str]: """ Runs the agent asynchronously, yielding intermediate steps and returning the final answer. Args: question: The input question string. thread_id: A unique identifier for the conversation thread. Yields: Intermediate steps (tool calls/results) as strings or dicts. Returns: The final AI answer as a string. """ print(f"Agent executing for thread_id: {thread_id} on question: {question[:50]}...") lc_messages: List[BaseMessage] = [HumanMessage(content=question)] final_answer = "" full_response_content = "" # Store the complete AI response chunks async for chunk in self.agent_executor.astream_events( {"messages": lc_messages}, config={"configurable": {"thread_id": thread_id}}, version="v1" ): event = chunk["event"] data = chunk["data"] # print(f"DEBUG: Event: {event}, Data Keys: {data.keys()}") # Debugging line if event == "on_chat_model_stream": content = data["chunk"].content if content: # print(f"DEBUG: AI Chunk: {content}") # Debugging line full_response_content += content # Yield potentially incomplete response for live typing effect if needed # yield {"type": "stream", "content": content } elif event == "on_tool_start": tool_input_str = str(data.get('input', '')) yield f"🛠️ Using tool: **{data['name']}** with input: `{tool_input_str}`" elif event == "on_tool_end": tool_output_str = str(data.get('output', '')) yield f"✅ Tool **{data['name']}** finished.\nResult: `{tool_output_str}`" # Detect the end of the conversation turn (heuristic) # The 'on_chain_end' event for the top-level graph might signal the end. # Or check the 'messages' list in the final state if available. # For create_react_agent, the final AIMessage is often the last main event. # We will capture the last full AI message content after the loop. # After iterating through all chunks, the final answer should be in full_response_content final_answer = full_response_content.strip() print(f"Agent execution finished. Final Answer: {final_answer[:100]}...") # Yield the complete final answer distinctly if needed # yield {"type": "final_answer_marker", "content": final_answer} # Example marker return final_answer # Return the final answer def __repr__(self) -> str: """ Return the source code required to reconstruct this agent, including the class definition, tool functions, and necessary imports. """ imports = [ "import os", "from typing import Dict, List, AsyncGenerator, Union, Tuple, Optional", "from langchain_core.messages import HumanMessage, AIMessage, BaseMessage", "from langchain_core.tools import tool", "from langchain_openai import ChatOpenAI", "from langgraph.checkpoint.memory import MemorySaver", "from langgraph.prebuilt import create_react_agent", "import inspect", # Needed if repr itself uses inspect dynamically "import asyncio", # Needed for async call "\n" ] # Get source code of tool functions tool_sources = [] for t in self.tools: try: tool_sources.append(inspect.getsource(t)) except (TypeError, OSError) as e: print(f"Warning: Could not get source for tool {t.__name__}: {e}") tool_sources.append(f"# Could not automatically get source for tool: {t.__name__}\n") # Get source code of the class itself class_source = inspect.getsource(MyLangChainAgent) # Combine imports, tools, and class definition full_source = "\n".join(imports) + "\n\n" + \ "\n\n".join(tool_sources) + "\n\n" + \ class_source return full_source # --- Gradio UI and Logic --- # Initialize the agent (do this once outside the request functions) # Handle potential API key error during initialization try: agent_instance = MyLangChainAgent() except ValueError as e: print(f"ERROR initializing agent: {e}") # Provide a dummy agent or exit if critical agent_instance = None # Or raise SystemExit("Agent initialization failed") def format_chat_history(history: List[List[Optional[str]]]) -> List[Tuple[Optional[str], Optional[str]]]: """Helper to format Gradio history for display.""" # Gradio's history format is List[List[user_msg | None, ai_msg | None]] # We want List[Tuple[user_msg | None, ai_msg | None]] for Chatbot formatted = [] for turn in history: formatted.append(tuple(turn)) return formatted async def fetch_and_display_question(api_url: str): """Calls the backend to get a random question.""" if not api_url: return "Please enter the API URL.", "", "", gr.update(value=""), gr.update(value="") # Clear chat too question_url = f"{api_url.strip('/')}/random-question" print(f"Fetching question from: {question_url}") try: response = requests.get(question_url, timeout=10) response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) data = response.json() task_id = data.get("task_id") question_text = data.get("question") if task_id and question_text: print(f"Fetched Task ID: {task_id}") # Return updates for Gradio components: Status, Task ID, Question Text, Clear Agent Answer, Clear Chat return "Question fetched successfully!", task_id, question_text, "", [] # Clears answer and chat history else: return "Error: Invalid data format received from API.", "", "", "", [] except requests.exceptions.RequestException as e: print(f"Error fetching question: {e}") return f"Error fetching question: {e}", "", "", "", [] except Exception as e: print(f"An unexpected error occurred: {e}") return f"An unexpected error occurred: {e}", "", "", "", [] async def run_agent_interaction( message: str, history: List[List[Optional[str]]], current_task_id: str, # agent_instance: MyLangChainAgent # Agent passed via state potentially ): """Handles the chat interaction, runs the agent, yields steps, updates final answer state.""" if agent_instance is None: yield "Agent not initialized. Please check API keys and restart." return if not current_task_id: yield "Please fetch a question first using the button above." return # The 'message' here is the user's latest input in the chat. # For this workflow, we assume the main input is the fetched question. # We'll use the fetched question (implicitly stored) to run the agent. # If you want interactive chat *about* the question, the logic needs adjustment. # For simplicity, let's assume the user's message *is* the question or a prompt related to it. # In the GAIA context, usually, the agent just runs on the provided question directly. # We'll use the `current_task_id` to generate a unique thread_id for LangGraph memory. thread_id = f"gaia_task_{current_task_id}_{os.urandom(4).hex()}" print(f"Running agent for user message: {message[:50]}...") history.append([message, None]) # Add user message to history final_agent_answer = None full_yielded_response = "" # Use the agent's __call__ method async for step in agent_instance(message, thread_id=thread_id): if isinstance(step, str): # Intermediate step (tool call, result, maybe stream chunk) history[-1][1] = step # Update the AI's response in the last turn yield format_chat_history(history) # Update chatbot UI full_yielded_response = step # Track last yielded message # If __call__ yielded dicts for streaming, handle here: # elif isinstance(step, dict) and step.get("type") == "stream": # history[-1][1] = (history[-1][1] or "") + step["content"] # yield format_chat_history(history) # After the loop, the `step` variable holds the return value (final answer) final_agent_answer = step print(f"Agent final answer received: {final_agent_answer[:100]}...") # Update the history with the definitive final answer if final_agent_answer: history[-1][1] = final_agent_answer # Replace intermediate steps with final one elif full_yielded_response: # Fallback if final answer wasn't returned correctly but we yielded something history[-1][1] = full_yielded_response final_agent_answer = full_yielded_response # Use the last yielded message as answer else: history[-1][1] = "Agent did not produce a final answer." final_agent_answer = "" # Ensure it's a string # Yield the final state of the history and update the hidden state for the final answer yield format_chat_history(history), final_agent_answer def submit_to_leaderboard( api_url: str, username: str, task_id: str, agent_answer: str, # agent_instance: MyLangChainAgent # Pass agent via state if needed ): """Submits the agent's answer and code to the FastAPI backend.""" if agent_instance is None: return "Agent not initialized. Cannot submit." if not api_url: return "Please enter the API URL." if not username: return "Please enter your Hugging Face username." if not task_id: return "No task ID available. Please fetch a question first." if agent_answer is None or agent_answer.strip() == "": # Check if None or empty # Maybe allow submission of empty answer? Depends on requirements. print("Warning: Submitting empty answer.") # return "Agent has not provided an answer yet." submit_url = f"{api_url.strip('/')}/submit" print(f"Submitting to: {submit_url}") # Get agent code try: agent_code = agent_instance.__repr__() # print(f"Agent Code (first 200 chars):\n{agent_code[:200]}...") # Debug except Exception as e: print(f"Error getting agent representation: {e}") return f"Error generating agent code for submission: {e}" # Prepare submission data according to Pydantic model in FastAPI submission_data = { "username": username.strip(), "agent_code": agent_code, "answers": [ { "task_id": task_id, "submitted_answer": agent_answer # Use the stored final answer } # Add more answers here if submitting a batch ] } try: response = requests.post(submit_url, json=submission_data, timeout=30) response.raise_for_status() result_data = response.json() # Format the result nicely for display result_message = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Score: {result_data.get('score')}\n" f"Correct: {result_data.get('correct_count')}/{result_data.get('total_attempted')}\n" f"Message: {result_data.get('message')}\n" f"Timestamp: {result_data.get('timestamp')}" ) print("Submission successful.") return result_message except requests.exceptions.HTTPError as e: # Try to get detail from response body if available error_detail = e.response.text try: error_json = e.response.json() error_detail = error_json.get('detail', error_detail) except requests.exceptions.JSONDecodeError: pass # Keep the raw text if not JSON print(f"HTTP Error during submission: {e.response.status_code} - {error_detail}") return f"Submission Failed (HTTP {e.response.status_code}): {error_detail}" except requests.exceptions.RequestException as e: print(f"Network error during submission: {e}") return f"Submission Failed: Network error - {e}" except Exception as e: print(f"An unexpected error occurred during submission: {e}") return f"Submission Failed: An unexpected error occurred - {e}" # --- Build Gradio Interface using Blocks --- with gr.Blocks() as demo: gr.Markdown("# Agent Evaluation Interface") gr.Markdown( "Fetch a random question from the evaluation API, interact with the agent " "(Note: the default agent answers weather questions, not GAIA), " "and submit the agent's final answer to the leaderboard." ) # --- State Variables --- # Store current task info, agent's final answer, and the agent instance current_task_id = gr.State("") current_question_text = gr.State("") current_agent_answer = gr.State("") # Stores the final answer string from the agent # agent_state = gr.State(agent_instance) # Pass agent instance via state with gr.Row(): api_url_input = gr.Textbox(label="FastAPI API URL", value=DEFAULT_API_URL) hf_username_input = gr.Textbox(label="Hugging Face Username") with gr.Row(): fetch_button = gr.Button("Get Random Question") submission_status_display = gr.Textbox(label="Status", interactive=False) # For fetch status with gr.Row(): question_display = gr.Textbox(label="Current Question", lines=3, interactive=False) gr.Markdown("---") gr.Markdown("## Agent Interaction") chatbot = gr.Chatbot(label="Agent Conversation", height=400) msg_input = gr.Textbox(label="Send a message to the Agent (or just observe)") # Input for chat # Hidden Textbox to display the final extracted answer (optional, for clarity) final_answer_display = gr.Textbox(label="Agent's Final Answer (Extracted)", interactive=False) gr.Markdown("---") gr.Markdown("## Submission") with gr.Row(): submit_button = gr.Button("Submit Current Answer to Leaderboard") submission_result_display = gr.Markdown(label="Submission Result", value="*Submit an answer to see the result here.*") # Use Markdown for better formatting # --- Component Interactions --- # Fetch Button Action fetch_button.click( fn=fetch_and_display_question, inputs=[api_url_input], outputs=[ submission_status_display, # Shows fetch status current_task_id, # Updates hidden state question_display, # Updates question text box final_answer_display, # Clears old final answer chatbot # Clears chat history ] ) # Chat Submission Action (when user sends message in chat) msg_input.submit( fn=run_agent_interaction, inputs=[ msg_input, # User message from chat input chatbot, # Current chat history current_task_id, # Current task ID from state # agent_state # Pass agent instance state ], outputs=[ chatbot, # Updated chat history current_agent_answer # Update the hidden state holding the final answer ] ).then( # After agent runs, update the visible "Final Answer" box from the state lambda answer_state: answer_state, inputs=[current_agent_answer], outputs=[final_answer_display] ) # Clear message input after submission msg_input.submit(lambda: "", None, msg_input, queue=False) # Submit Button Action submit_button.click( fn=submit_to_leaderboard, inputs=[ api_url_input, hf_username_input, current_task_id, current_agent_answer, # Use the stored final answer state # agent_state # Pass agent instance state ], outputs=[submission_result_display] # Display result message ) if __name__ == "__main__": if agent_instance is None: print("\nFATAL: Agent could not be initialized. Gradio app will not run correctly.") print("Please ensure OPENAI_API_KEY is set and valid.\n") # Optionally exit here if agent is critical # exit(1) else: print("Launching Gradio Interface...") demo.launch(debug=True, server_name="0.0.0.0") # Share=False by default for security