# Height Comparison Agent — Hugging Face Space app.
import contextlib
import io
import os
import queue
import re
import sys
import threading
import time
import traceback
from typing import Any, Dict, List, Optional, Tuple, Union

import gradio as gr
import pytz
import requests
from smolagents import (
    CodeAgent,
    DuckDuckGoSearchTool,
    FinalAnswerTool,
    LiteLLMModel,
    OpenAIServerModel,
    VisitWebpageTool,
    tool,
)

# from google.colab import userdata  # Only needed when running in a Colab environment
def parse_height_from_text(
    text: str,
    prefer_units: str = "cm",
    max_expected: float = 1000.0
) -> Optional[float]:
    """
    Extracts and converts the FIRST valid height measurement found in a given text string into centimeters.

    Combined imperial forms ("5 ft 11 in", "6'1\"") are recognized and take
    precedence over single measurements ("180 cm", "1.7 meters", "6 ft").

    **Usage Workflow:**
    1. Use this FIRST on the initial user query to get the user's height. Store this value.
    2. LATER, after getting web search results, you might use this again on individual search result snippets
       if they contain height information (e.g., "Character X is 6'2\" tall").

    Args:
        text: Input text containing potential height measurements (can be user query or web search snippet).
        prefer_units: Preferred unit system ('cm', 'm', 'ft', 'in') if units are ambiguous in the text. Default is 'cm'.
        max_expected: Safety limit to ignore potentially nonsensical values during parsing (in cm).

    Returns:
        float | None: Height in centimeters if a valid measurement is found and parsed, otherwise None.
    """
    unit_conversion = {"cm": 1.0, "m": 100.0, "ft": 30.48, "in": 2.54}

    # First look for a combined feet+inches expression (e.g. "5 ft 11 in" or
    # 6'1"). The single-unit pattern below would only capture the feet part.
    combo = re.search(
        r"""(\d+\.?\d*)\s*(?:ft|feet|')\s*(\d+\.?\d*)\s*(?:in|inches?|")?""",
        text,
        re.IGNORECASE | re.VERBOSE,
    )
    if combo:
        try:
            converted = (float(combo.group(1)) * unit_conversion["ft"]
                         + float(combo.group(2)) * unit_conversion["in"])
            if 0.1 < converted < max_expected:
                return round(converted, 2)
        except ValueError:
            pass

    # Single value + unit. NOTE: the original pattern ended units with \b,
    # which can never match after ' or " followed by whitespace (both are
    # non-word characters) — heights like 6' or 2" were silently skipped.
    # A lookahead for a non-word character or end-of-string fixes that.
    # `centi\w*` also replaces the original greedy `centi.*`.
    height_pattern = r"""
    (?:^|\b|(?<=\s))(\d+\.?\d*)\s*(?:(cm|centi\w*)|(m|meters?|metres?)|(ft|feet|')|(in|inches?|"))(?=\W|$)
    """
    matches = re.finditer(height_pattern, text, re.IGNORECASE | re.VERBOSE | re.UNICODE)
    for match in matches:
        try:
            value = float(match.group(1))
            # Exactly one of the unit groups is non-empty; take it.
            raw_unit = next((g for g in match.groups()[1:] if g), "").lower()
            # Map the raw captured unit text onto a canonical unit key.
            if any(u in raw_unit for u in ["cm", "centi"]):
                unit = "cm"
            elif any(u in raw_unit for u in ["m", "meter", "metre"]):
                unit = "m"
            elif any(u in raw_unit for u in ["ft", "feet", "'"]):
                unit = "ft"
            elif any(u in raw_unit for u in ["in", "inch", "\""]):
                unit = "in"
            else:
                unit = prefer_units
            converted = value * unit_conversion[unit]
            # Sanity window: reject values that cannot be a plausible height in cm.
            if 0.1 < converted < max_expected:
                return round(converted, 2)
        except (ValueError, KeyError, TypeError):
            continue
    return None
def create_comparison_statement(
    target: str,
    user_height: float,
    reference_height: float,
) -> str:
    """
    Creates ONE human-readable comparison statement based on height proximity. Output format example:
    "👤 You're almost the same height as Sherlock Holmes! (185.0 cm vs 183.0 cm)"

    **Usage Workflow:**
    1. Call this tool *AFTER* finding a target name, extracting their height, and validating it (e.g., `if 50 < reference_height < 250:`).
    2. Call this for *each* validated target you want to include.
    3. Collect the string outputs and combine them for the final answer.

    Args:
        target: The name of the character/object/person being compared against (extracted from search results).
        user_height: The user's height in centimeters.
        reference_height: The specific reference target's height in centimeters (parsed and VALIDATED from search results).

    Returns:
        str: A single formatted comparison string indicating height similarity.
    """
    delta = user_height - reference_height
    gap = abs(delta)

    # Bucket thresholds (cm): <=1 counts as "exactly", <=4 as "slightly".
    if gap <= 1.0:
        phrase = f"You're exactly the same height as {target}!"
    elif gap <= 4.0:
        phrase = (f"You're slightly taller than {target}!" if delta > 0
                  else f"You're slightly shorter than {target}!")
    elif delta > 0:
        phrase = f"You're noticeably taller than {target}."
    else:
        phrase = f"You're noticeably shorter than {target}."

    return f"👤 {phrase} ({user_height:.1f} cm vs {reference_height:.1f} cm)"
# --- Instantiate Model ---
# Gemini via LiteLLM; requires the GEM_TOKEN secret in the environment
# (e.g. Space Settings -> Secrets). Other model names are listed at
# https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models —
# the name must be prefixed with "gemini/".
llm_model = None  # Stays None on failure so the downstream `is not None` guard works.
try:
    llm_model = LiteLLMModel(
        model_id="gemini/gemini-2.0-flash",
        api_key=os.environ.get('GEM_TOKEN'),
        max_tokens=8192
    )
except Exception as e:
    # Without this guard a constructor failure would crash the app at import
    # time; the agent-setup code below expects llm_model to be None instead.
    print(f"ERROR: Failed to initialize LiteLLMModel: {e}")
# --- Task Generation Function ---
# Produces the *instructions* string that the agent run actually executes.
def create_height_comparison_task(user_query: str) -> str:
    """Combines user query with detailed instructions encouraging diverse searches and robust parsing."""
    # Escape single quotes so the query can be embedded in quoted prose below.
    safe_query = user_query.replace("'", "\\'")
    return f"""
TASK: Analyze the user query '{safe_query}' and perform the following steps to find height comparisons with **diverse figures (people, characters)**:
1. **Parse User Height:** Use `parse_height_from_text` on the user query ('{safe_query}') to get the user's height in cm. Print and store it. If none found, use `final_answer` to ask for clarification like "Please provide your height clearly (e.g., '180 cm', '5 ft 11 in').".
2. **Web Search (Diverse Queries):** If height found, use `web_search` to find **fictional characters, historical figures, scientists, artists, athletes, and other interesting people** of similar height. Formulate 2-3 specific queries using the user's height in cm (e.g., if user height is 180cm, search for `"historical figures 180 cm tall"`, `"celebrities around 180cm height"`, `"fictional characters exactly 180 cm"`). Print the search results clearly.
3. **Extract & Validate from Search Results:** CRITICAL STEP. Read the `web_search` Observation snippets carefully.
* Identify potential (Name, Height String) pairs. Prioritize clear mentions of height linked to a name.
* For each potential pair:
* Use `parse_height_from_text` on the relevant part of the search snippet string containing the height info. Store the result in cm (e.g., `extracted_cm`).
* **Validate using Python code:** Check if `extracted_cm` is NOT `None` AND if it's within a reasonable human range (e.g., `if extracted_cm is not None and 50 < extracted_cm < 250:`).
* Collect valid (Name, Validated Height cm) pairs into a Python list. Print this list. Aim for diverse examples.
4. **Generate Multiple Comparisons:** Check the validated matches list.
* If empty after searching, use `final_answer` stating no relevant matches were found for that height.
* If matches exist, select **up to 3-4 diverse ones**.
* Create an empty list `comparison_outputs = []`.
* **Loop** through the selected matches. For each (name, ref_height_cm), call `create_comparison_statement(target=name, user_height=USER_HEIGHT_CM, reference_height=ref_height_cm)`. Append the resulting string to `comparison_outputs`.
5. **Final Answer:** Combine the generated strings from `comparison_outputs` into a single response (e.g., separated by newlines: `"\\n".join(comparison_outputs)`). Add a brief introductory sentence like "Here are some figures with similar heights:". Return the complete message using `final_answer`.
Follow Thought-Code-Observation meticulously. Handle `None` returns from `parse_height_from_text` gracefully in your Python code logic. Use the tools as described in their docstrings.
"""
# --- Define the Subclassed Agent ---
class HeightComparisonAgent(CodeAgent):
    """
    CodeAgent subclass that intercepts the raw user query passed to run(),
    expands it into a detailed task via create_height_comparison_task, and
    delegates that detailed task to the parent CodeAgent's run method.
    This lets GradioUI monitor the execution of the *detailed* task.
    """

    def run(self, task: str, **kwargs: Any) -> str:
        """
        Override of CodeAgent.run.

        `task` is expected to be the raw user query coming from the UI; it is
        rewritten into full step-by-step instructions before execution.
        """
        user_query = task
        print(f"[HeightComparisonAgent] Intercepted run call with user query: '{user_query}'")

        # Guard: reject empty / whitespace-only input up front.
        if not user_query or not user_query.strip():
            return "Please enter a valid query."

        # Expand the raw query into the detailed agent instructions.
        detailed_task = create_height_comparison_task(user_query)
        print(f"[HeightComparisonAgent] Generated detailed task (first 200 chars): {detailed_task[:200]}...")

        # Delegate to the parent run method — this is what produces the
        # verbose output the UI captures.
        print(f"[HeightComparisonAgent] Calling super().run() with the detailed task...")
        try:
            result = super().run(task=detailed_task, **kwargs)
            print(f"[HeightComparisonAgent] super().run() finished.")
            return result
        except Exception as e:
            print(f"[HeightComparisonAgent] Error during super().run(): {e}")
            traceback.print_exc()
            # Surface a user-friendly error instead of propagating.
            return f"An error occurred while processing your request: {e}"
# --- Instantiate the Agent ---
# IMPORTANT: Use the HeightComparisonAgent subclass, not CodeAgent directly.
# verbosity_level=3 makes super().run() emit the verbose reasoning output
# that the UI later captures from stdout.
height_agent = None
initialization_error_message = None  # Must be defined before the branches below.
if llm_model is not None:
    try:
        height_agent = HeightComparisonAgent(
            # NOTE(review): parse_height_from_text / create_comparison_statement are
            # passed as plain functions; smolagents tools are usually wrapped with
            # @tool — confirm this matches the installed smolagents version.
            tools=[
                DuckDuckGoSearchTool(),
                VisitWebpageTool(),
                parse_height_from_text,
                create_comparison_statement,
                FinalAnswerTool(),
            ],
            model=llm_model,
            verbosity_level=3,  # ESSENTIAL for capturing reasoning steps
            max_steps=20,
        )
        print("--- HeightComparisonAgent initialized successfully. ---")
    except Exception as e:
        # Agent creation failed even though a model exists; keep the error for the UI.
        initialization_error_message = f"ERROR: Failed to initialize HeightComparisonAgent: {e}\n{traceback.format_exc()}"
        print(initialization_error_message)
        height_agent = None  # Ensure agent is None on error
else:
    # The LLM backend itself failed to initialize. The original message
    # promised a list of secrets but named none; name the one actually used.
    initialization_error_message = (
        "ERROR: Could not initialize any Language Model backend.\n\n"
        "Please check the Space logs (check the 'Logs' tab above the app).\n"
        "Verify that the GEM_TOKEN secret is correctly set in Space Settings -> Secrets.\n"
        "Also ensure necessary libraries are in requirements.txt."
    )
    print(initialization_error_message)
# --- Helper: file-like shim that forwards writes to a queue ---
class WritableQueue:
    """File-like writer that pushes every non-blank message onto a queue."""

    def __init__(self, q):
        self.queue = q

    def write(self, message):
        # Skip pure-whitespace writes (e.g. the bare newline print() emits).
        if message.strip():
            self.queue.put(message)

    def flush(self):
        """No-op; present only to satisfy the file-object protocol."""
# --- Threaded agent runner ---
def agent_thread_func(agent, query, log_queue, result_queue):
    """Run `agent` in a worker thread, streaming its stdout into log_queue
    and delivering the final result (or the raised exception) to result_queue."""
    try:
        writer = WritableQueue(log_queue)
        # While redirected, every print from the agent lands in log_queue.
        with contextlib.redirect_stdout(writer):
            outcome = agent.run(query)
        result_queue.put(outcome)
    except Exception as e:
        # Log the failure and hand the exception object itself to the consumer.
        tb_str = traceback.format_exc()
        print(f"\n--- ERROR IN AGENT THREAD ---\n{e}\n{tb_str}")
        result_queue.put(e)
    finally:
        # None is the sentinel telling the log consumer that logging is done.
        log_queue.put(None)
# Synchronous Gradio handler — returns both outputs at once, no streaming.
def run_agent_wrapper(query: str) -> Tuple[str, str]:
    """
    Runs the height_agent synchronously and captures its stdout (reasoning steps).
    Returns (reasoning_log, final_answer). NO STREAMING.
    """
    if height_agent is None:
        # Agent never came up; surface the stored initialization error.
        message = initialization_error_message or "Agent not initialized."
        return (message, "Agent failed to initialize. See reasoning log.")

    print(f"\n--- Running agent for query: '{query}' ---")
    captured = io.StringIO()
    answer = "Error during execution."  # Default if something goes wrong below.
    try:
        # While redirected, all of the agent's prints land in `captured`.
        with contextlib.redirect_stdout(captured):
            answer = height_agent.run(query)
            print("\n--- Agent execution finished. ---")  # Marker inside the log
    except Exception as e:
        # stdout is restored here, so this goes to the console.
        print(f"\n--- Error during agent execution wrapper: {e} ---")
        traceback.print_exc(file=captured)  # Keep the traceback in the log
        answer = f"An error occurred in the wrapper. See reasoning log. Error: {e}"
    finally:
        log_text = captured.getvalue()
        captured.close()
        print("--- Finished capturing stdout. ---")  # Console only, not captured
    return log_text, answer
# --- Build Gradio Interface Manually with gr.Blocks ---
print("--- Building Gradio Interface with gr.Blocks ---")

# NOTE: the top-level name `demo` is kept — HF Spaces conventionally looks for it.
with gr.Blocks(css="footer {visibility: hidden}") as demo:
    gr.Markdown("# Height Comparison Agent")
    gr.Markdown("Enter your height (e.g., '180 cm', '5ft 11in') to find characters/figures of similar height.")

    with gr.Row():
        # Left column: query input, examples, submit button.
        with gr.Column(scale=1):
            query_input = gr.Textbox(
                label="Your Query (including height)",
                placeholder="e.g., I am 175cm tall",
            )
            gr.Examples(
                examples=[
                    "I am 182 cm tall",
                    "How tall am I compared to characters around 5ft 9in?",
                    "who is the same height as 1.7 meters",
                    "my height is 190cm",
                    "compare 6'1\" height",
                ],
                inputs=query_input,  # Clicking an example fills the textbox
            )
            submit_button = gr.Button("Compare Heights", variant="primary")

        # Right column: final answer on top, full reasoning log below.
        with gr.Column(scale=2):
            final_answer_output = gr.Textbox(label="Final Answer", interactive=False, lines=5)
            gr.Markdown("## Agent Reasoning Steps")
            reasoning_output = gr.Code(
                label="Reasoning Log",
                language="markdown",  # markdown renders mixed text/code well
                interactive=False,
                lines=20,
            )

    # Wire the button: handler returns (reasoning_log, final_answer).
    submit_button.click(
        fn=run_agent_wrapper,
        inputs=query_input,
        outputs=[reasoning_output, final_answer_output],
    )

# --- Launch Gradio ---
print("--- Starting Gradio Interface ---")
demo.launch()