|
import os |
|
import gradio as gr |
|
from gradio import ChatMessage |
|
import requests |
|
from typing import Dict, List, Generator, Sequence |
|
from langchain_core.messages import HumanMessage, BaseMessage |
|
from langchain_core.tools import tool |
|
from langchain_openai import ChatOpenAI |
|
from langgraph.checkpoint.memory import MemorySaver |
|
from langgraph.prebuilt import create_react_agent |
|
import logging |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
@tool |
|
def get_lat_lng(location_description: str) -> dict[str, float]: |
|
"""Get the latitude and longitude of a location description (e.g., 'Paris', 'Tokyo, Japan').""" |
|
|
|
logger.info(f"Tool 'get_lat_lng' called with location: {location_description}") |
|
|
|
if "london" in location_description.lower(): |
|
return {"lat": 51.5074, "lng": -0.1278} |
|
elif "tokyo" in location_description.lower(): |
|
return {"lat": 35.6895, "lng": 139.6917} |
|
elif "paris" in location_description.lower(): |
|
return {"lat": 48.8566, "lng": 2.3522} |
|
elif "new york" in location_description.lower(): |
|
return {"lat": 40.7128, "lng": -74.0060} |
|
else: |
|
|
|
return {"lat": 51.1, "lng": -0.1} |
|
|
|
@tool |
|
def get_weather(lat: float, lng: float) -> dict[str, str]: |
|
"""Get the current weather conditions at a specific latitude and longitude.""" |
|
|
|
logger.info(f"Tool 'get_weather' called with lat: {lat}, lng: {lng}") |
|
|
|
if 40 < lat < 50: |
|
return {"temperature": "18°C", "description": "Cloudy"} |
|
elif lat > 50: |
|
return {"temperature": "15°C", "description": "Rainy"} |
|
else: |
|
return {"temperature": "25°C", "description": "Sunny"} |
|
|
|
|
|
def initialize_agent(): |
|
"""Initializes the LangChain agent.""" |
|
api_key = os.getenv("OPENAI_API_KEY") |
|
if not api_key: |
|
logger.error("OPENAI_API_KEY environment variable not set.") |
|
|
|
|
|
|
|
return None |
|
|
|
try: |
|
llm = ChatOpenAI(temperature=0, model="gpt-4", openai_api_key=api_key) |
|
|
|
|
|
memory = MemorySaver() |
|
tools = [get_lat_lng, get_weather] |
|
agent_executor = create_react_agent(llm, tools, checkpointer=memory) |
|
logger.info("LangChain agent initialized successfully.") |
|
return agent_executor |
|
except Exception as e: |
|
logger.error(f"Failed to initialize LangChain agent: {e}", exc_info=True) |
|
return None |
|
|
|
|
|
agent_executor = initialize_agent() |
|
|
|
|
|
def stream_from_agent(message: str, history: List[List[str]]) -> Generator[Sequence[ChatMessage], None, None]: |
|
""" |
|
Processes user messages through the LangChain agent, yielding intermediate steps. |
|
|
|
Args: |
|
message: The user's input message. |
|
history: The conversation history provided by Gradio (list of [user, assistant] pairs). |
|
|
|
Yields: |
|
A sequence of Gradio ChatMessage objects representing the agent's thoughts and actions. |
|
""" |
|
global agent_executor |
|
|
|
if agent_executor is None: |
|
error_msg = "Agent initialization failed. Please check the logs and ensure the OPENAI_API_KEY secret is set correctly." |
|
yield [ChatMessage(role="assistant", content=error_msg)] |
|
return |
|
|
|
logger.info(f"Received message: {message}") |
|
logger.info(f"History: {history}") |
|
|
|
|
|
|
|
|
|
|
|
langchain_message = HumanMessage(content=message) |
|
|
|
messages_to_display: List[ChatMessage] = [] |
|
final_response_content = "" |
|
|
|
try: |
|
|
|
|
|
|
|
|
|
thread_id = "shared_weather_thread_123" |
|
config = {"configurable": {"thread_id": thread_id}} |
|
|
|
|
|
for chunk in agent_executor.stream({"messages": [langchain_message]}, config=config): |
|
logger.debug(f"Agent chunk received: {chunk}") |
|
|
|
|
|
if agent_action := chunk.get("agent"): |
|
|
|
|
|
if agent_action.get("messages"): |
|
for msg in agent_action["messages"]: |
|
if hasattr(msg, 'tool_calls') and msg.tool_calls: |
|
for tool_call in msg.tool_calls: |
|
|
|
tool_msg = ChatMessage( |
|
role="assistant", |
|
content=f"Parameters: `{tool_call['args']}`", |
|
metadata={ |
|
"title": f"🛠️ Calling Tool: `{tool_call['name']}`", |
|
"tool_call_id": tool_call["id"], |
|
} |
|
) |
|
messages_to_display.append(tool_msg) |
|
yield messages_to_display |
|
|
|
elif hasattr(msg, 'content') and isinstance(msg.content, str) and msg.content: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
if tool_chunk := chunk.get("tools"): |
|
if tool_chunk.get("messages"): |
|
for tool_response in tool_chunk["messages"]: |
|
|
|
found = False |
|
for i, msg in enumerate(messages_to_display): |
|
if msg.metadata and msg.metadata.get("tool_call_id") == tool_response.tool_call_id: |
|
|
|
updated_content = msg.content + f"\nResult: `{tool_response.content}`" |
|
messages_to_display[i] = ChatMessage( |
|
role=msg.role, |
|
content=updated_content, |
|
metadata=msg.metadata |
|
) |
|
found = True |
|
break |
|
if found: |
|
yield messages_to_display |
|
else: |
|
|
|
tool_result_msg = ChatMessage( |
|
role="tool", |
|
content=f"Tool Result (`{tool_response.tool_call_id}`): `{tool_response.content}`" |
|
) |
|
messages_to_display.append(tool_result_msg) |
|
yield messages_to_display |
|
|
|
|
|
|
|
|
|
if agent_final := chunk.get("agent"): |
|
if agent_final.get("messages"): |
|
last_message = agent_final["messages"][-1] |
|
|
|
if hasattr(last_message, 'content') and not (hasattr(last_message, 'tool_calls') and last_message.tool_calls): |
|
final_response_content = last_message.content |
|
|
|
|
|
|
|
if final_response_content: |
|
|
|
is_already_displayed = False |
|
if messages_to_display: |
|
last_displayed = messages_to_display[-1] |
|
|
|
if not (last_displayed.metadata and "tool_call_id" in last_displayed.metadata) and last_displayed.content == final_response_content: |
|
is_already_displayed = True |
|
|
|
if not is_already_displayed: |
|
final_msg = ChatMessage(role="assistant", content=final_response_content) |
|
messages_to_display.append(final_msg) |
|
yield messages_to_display |
|
elif not messages_to_display: |
|
|
|
yield [ChatMessage(role="assistant", content="Sorry, I couldn't process that request.")] |
|
|
|
|
|
except Exception as e: |
|
logger.error(f"Error during agent stream: {e}", exc_info=True) |
|
error_message = f"An error occurred: {e}" |
|
yield [ChatMessage(role="assistant", content=error_message)] |
|
|
|
|
|
|
|
|
|
demo = gr.ChatInterface( |
|
fn=stream_from_agent, |
|
chatbot=gr.Chatbot( |
|
bubble_full_width=False, |
|
show_copy_button=True, |
|
render=False |
|
), |
|
input_components=[gr.Textbox(label="Ask the weather assistant")], |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
title="🌤️ Weather Assistant with LangGraph ReAct Agent", |
|
description="Ask about the weather anywhere! Watch the agent think step-by-step as it uses tools.", |
|
examples=[ |
|
["What's the weather like in Tokyo?"], |
|
["Is it sunny in Paris right now?"], |
|
["Should I bring an umbrella in New York today?"] |
|
], |
|
cache_examples=False, |
|
theme="soft", |
|
retry_btn=None, |
|
undo_btn="Delete Previous", |
|
clear_btn="Clear Conversation", |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
|
|
|
|
demo.launch(server_name="0.0.0.0", server_port=7860) |