# Hugging Face Spaces page header residue: "Spaces: Running"
import logging
import os
from functools import lru_cache
from typing import Optional
from urllib.parse import quote

import gradio as gr
import requests
from gradio import ChatMessage
from smolagents import CodeAgent, Tool
from smolagents.models import HfApiModel
from smolagents.monitoring import LogLevel
# Logging: INFO-level root configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Model id used by the UI unless another one is supplied.
DEFAULT_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"

# Hugging Face access token taken from the environment (None when HF_TOKEN is unset).
HF_API_TOKEN = os.environ.get("HF_TOKEN")

# Human-readable descriptions of the supported tool sources, keyed by label.
TOOL_DESCRIPTIONS = {
    "Hub Collections": "Add tool collections from Hugging Face Hub.",
    "Spaces": "Add tools from Hugging Face Spaces.",
}
def search_spaces(query, limit=1):
    """
    Search for Hugging Face Spaces using the API.

    Args:
        query: Free-text search term.
        limit: Maximum number of results requested from the API.

    Returns:
        The dict produced by ``extract_space_info`` for the first hit, or
        None when the search returns nothing or the HTTP request fails.
    """
    # Only attach credentials when a token is configured; otherwise the header
    # would carry the literal string "Bearer None".
    headers = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}
    try:
        # NOTE(review): the token is sent to this host as written in the file;
        # confirm it is the intended API endpoint.
        response = requests.get(
            "https://huggingface.co./api/spaces",
            # Passing params lets requests URL-encode the search term, so
            # characters such as "&", "#" or spaces cannot corrupt the query.
            params={"search": query, "limit": limit},
            headers=headers,
            # Without a timeout a stalled connection would block the handler forever.
            timeout=10,
        )
        response.raise_for_status()
        spaces = response.json()
        if not spaces:
            return None
        return extract_space_info(spaces[0])
    except requests.RequestException as e:
        logger.error(f"Error searching spaces: {e}")
        return None
def extract_space_info(space):
    """
    Reduce a Space record to its id, title and description.

    Top-level ``title`` / ``description`` keys win; otherwise the same keys
    are looked up in ``cardData``; otherwise defaults derived from the id are
    used (last path segment as title, ``"Tool from <id>"`` as description).

    Args:
        space: Mapping describing a Space; must contain ``"id"``.

    Returns:
        ``{"id": ..., "title": ..., "description": ...}``.

    Raises:
        KeyError: If ``space`` has no ``"id"`` entry.
    """
    space_id = space["id"]

    # cardData may be present but null (or otherwise not a mapping); treat
    # that the same as absent instead of failing on the membership test.
    card_data = space.get("cardData")
    if not isinstance(card_data, dict):
        card_data = {}

    def _lookup(key, default):
        # Top-level value first, then cardData, then the derived default.
        if key in space:
            return space[key]
        return card_data.get(key, default)

    return {
        "id": space_id,
        "title": _lookup("title", space_id.split("/")[-1]),
        "description": _lookup("description", f"Tool from {space_id}"),
    }
def get_space_metadata(space_id):
    """
    Get metadata for a specific Hugging Face Space.

    Args:
        space_id: Space identifier in ``owner/name`` form. Callers may also
            pass arbitrary user text, so it is percent-encoded before use.

    Returns:
        The dict produced by ``extract_space_info``, or None when the HTTP
        request fails (including non-2xx responses and timeouts).
    """
    # Only attach credentials when a token is configured; otherwise the header
    # would carry the literal string "Bearer None".
    headers = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}
    # Keep "/" so owner/name stays a two-segment path, but escape everything
    # else (spaces, "?", "#", ...) that would otherwise alter the URL.
    encoded_id = quote(str(space_id), safe="/")
    try:
        # NOTE(review): the token is sent to this host as written in the file;
        # confirm it is the intended API endpoint.
        url = f"https://huggingface.co./api/spaces/{encoded_id}"
        # Without a timeout a stalled connection would block the handler forever.
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        space = response.json()
        return extract_space_info(space)
    except requests.RequestException as e:
        logger.error(f"Error getting space metadata: {e}")
        return None
def _build_code_agent(model_id, tools):
    """Construct a CodeAgent backed by ``model_id`` with the given tools."""
    model = HfApiModel(model_id=model_id, token=HF_API_TOKEN)
    return CodeAgent(
        tools=tools,
        model=model,
        additional_authorized_imports=["PIL", "requests"],
        verbosity_level=LogLevel.DEBUG,
    )


def create_agent(model_name, space_tools=None):
    """
    Create a CodeAgent with the specified model and tools.

    Args:
        model_name: Model id tried first.
        space_tools: Optional list of dicts with an ``"id"`` key and optional
            ``"name"`` / ``"description"`` keys, one per Space to load as a tool.

    Returns:
        The created agent, or None when the tools cannot be loaded or when
        both the requested model and the fallback model fail.
    """
    # Load the tools on their own: if this step fails there is nothing a
    # different model could fix, and the fallback would have no tool list.
    # NOTE(review): the tool name defaults to the Space id (e.g. "owner/name");
    # confirm the installed smolagents version accepts such names.
    try:
        tools = [
            Tool.from_space(
                tool_info["id"],
                name=tool_info.get("name", tool_info["id"]),
                description=tool_info.get("description", ""),
            )
            for tool_info in (space_tools or [])
        ]
    except Exception as e:
        logger.error(f"Error loading Space tools: {e}")
        return None

    try:
        agent = _build_code_agent(model_name, tools)
        logger.info(f"Agent created successfully with {len(tools)} tools")
        return agent
    except Exception as e:
        logger.error(f"Error creating agent: {e}")

    # Second attempt with a smaller model, reusing the already-loaded tools.
    try:
        logger.info("Trying fallback model...")
        agent = _build_code_agent("Qwen/Qwen2.5-Coder-7B-Instruct", tools)
        logger.info("Agent created successfully with fallback model")
        return agent
    except Exception as e:
        logger.error(f"Error creating agent with fallback model: {e}")
        return None
# Event handler functions | |
def on_search_spaces(query):
    """
    Handle a Space search request from the UI.

    Returns a 4-tuple ``(markdown, space_id, title, description)``. On an
    empty query, no match, or an error, the first element carries the message
    and the remaining three are empty strings.
    """
    blanks = ("", "", "")
    if not query:
        return ("Please enter a search term.", *blanks)

    try:
        found = search_spaces(query)
        if found is None:
            return ("No spaces found.", *blanks)

        space_id = found["id"]
        title = found["title"]
        description = found["description"]
        results_md = (
            "### Search Results:\n"
            f"- ID: `{space_id}`\n"
            f"- Title: {title}\n"
            f"- Description: {description}\n"
        )
        return results_md, space_id, title, description
    except Exception as e:
        logger.error(f"Error in search: {e}")
        return (f"Error: {str(e)}", *blanks)
def on_validate_space(space_id):
    """
    Resolve user input to a Space, first as an exact id and then via search.

    Returns a 3-tuple ``(markdown, title, description)``. When the input is
    empty, nothing is found, or an error occurs, the markdown carries the
    message and the other two elements are empty strings.
    """
    if not space_id:
        return "Please enter a space ID or search term.", "", ""

    try:
        # Direct lookup first; fall back to a search when it yields nothing.
        heading = "### Space Validated Successfully:"
        info = get_space_metadata(space_id)
        if info is None:
            heading = "### Found Space via Search:"
            info = search_spaces(space_id)

        if info is None:
            return f"No spaces found for '{space_id}'.", "", ""

        result_md = (
            f"{heading}\n"
            f"- ID: `{info['id']}`\n"
            f"- Title: {info['title']}\n"
            f"- Description: {info['description']}\n"
        )
        return result_md, info["title"], info["description"]
    except Exception as e:
        logger.error(f"Error validating space: {e}")
        return f"Error: {str(e)}", "", ""
def on_add_tool(space_id, space_name, space_description, current_tools):
    """
    Append a Space to the list of selected tools.

    Returns ``(tools, markdown)``. ``tools`` is the unchanged input list when
    the id is empty or already present (``markdown`` then holds the notice);
    otherwise it is a new list with the tool appended and ``markdown`` is a
    numbered listing of every tool.
    """
    if not space_id:
        return current_tools, "Please enter a space ID."

    if any(existing["id"] == space_id for existing in current_tools):
        return current_tools, f"Tool '{space_id}' is already added."

    # Blank name/description fall back to the id and a placeholder text.
    entry = {
        "id": space_id,
        "name": space_name or space_id,
        "description": space_description or "No description",
    }
    # Build a new list so the caller's list is not mutated.
    updated_tools = current_tools + [entry]

    listing = "".join(
        f"{position}. **{item['name']}** (`{item['id']}`)\n {item['description']}\n\n"
        for position, item in enumerate(updated_tools, 1)
    )
    return updated_tools, "### Added Tools:\n" + listing
def on_create_agent(model, space_tools):
    """
    Build an agent from the selected tools and report the outcome.

    Returns a 5-tuple ``(agent, chat_history, message_box, status_markdown,
    agent_status)``. On any failure the agent is None, the status markdown
    carries the reason, and the agent status is "No agent created yet.".
    """

    def _failed(reason):
        # Shared shape of every failure result; a fresh list each time.
        return None, [], "", reason, "No agent created yet."

    if not space_tools:
        return _failed("Please add at least one tool before creating an agent.")

    try:
        agent = create_agent(model, space_tools)
        if agent is None:
            return _failed("Failed to create agent. Please try again with different tools or model.")

        tools_str = ", ".join(f"{tool['name']} ({tool['id']})" for tool in space_tools)
        agent_status = update_agent_status(agent)
        success_md = f"β Agent created successfully with {model}!\nTools: {tools_str}"
        return agent, [], "", success_md, agent_status
    except Exception as e:
        logger.error(f"Error creating agent: {e}")
        return _failed(f"Error creating agent: {str(e)}")
def add_user_message(message, chat_history):
    """
    Append the user's message to the chat history.

    Returns ``(message, new_history)``; for an empty message the history is
    returned untouched together with an empty string.
    """
    if message:
        user_entry = ChatMessage(role="user", content=message)
        # Concatenate rather than append so the incoming list is not mutated.
        return message, chat_history + [user_entry]
    return "", chat_history
def stream_to_gradio(agent, task: str, reset_agent_memory: bool = False, additional_args: Optional[dict] = None):
    """
    Run ``agent`` on ``task`` and yield chat messages as steps complete.

    Every step produced by ``agent.run(..., stream=True)`` is expanded into
    messages via ``pull_messages_from_step``. The last item produced by the
    run is then treated as the final answer and yielded as one closing
    assistant message whose form depends on its type (image, text naming an
    existing file path, audio, or plain text).

    NOTE(review): if the run produces no items at all, ``step_log`` is never
    bound and the code after the loop raises; confirm whether that can happen.
    """
    from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types
    from smolagents.agent_types import AgentAudio, AgentImage, AgentText

    run_steps = agent.run(
        task,
        stream=True,
        reset=reset_agent_memory,
        additional_args=additional_args,
    )
    for step_log in run_steps:
        yield from pull_messages_from_step(step_log)

    # The last streamed item is used as the final answer.
    answer = handle_agent_output_types(step_log)

    if isinstance(answer, AgentImage):
        content = {"path": answer.to_string(), "mime_type": "image/png"}
    elif isinstance(answer, AgentText) and os.path.exists(answer.to_string()):
        # Text that names an existing file is shown as an image component.
        content = gr.Image(answer.to_string())
    elif isinstance(answer, AgentAudio):
        content = {"path": answer.to_string(), "mime_type": "audio/wav"}
    else:
        content = f"**Final answer:** {str(answer)}"

    yield gr.ChatMessage(role="assistant", content=content)
def stream_agent_response(agent, message, chat_history):
    """
    Stream the agent's reply to ``message`` as successive chat histories.

    This is a generator: each yielded value is the full history to display.

    - Empty message: nothing is yielded.
    - No agent: a single history is yielded with an assistant notice
      appended, so the user is told why no answer arrives.
    - Otherwise: the current history is yielded first, then one grown history
      per message produced by ``stream_to_gradio``. An exception during
      streaming is reported as a final assistant message.
    """
    if not message:
        # A bare return ends the generator; a returned value would be discarded.
        return

    if agent is None:
        notice = ChatMessage(
            role="assistant",
            content="No agent available. Please create an agent first.",
        )
        yield chat_history + [notice]
        return

    yield chat_history
    try:
        for msg in stream_to_gradio(agent, message):
            # Build a new list each time rather than mutating the previous one.
            chat_history = chat_history + [msg]
            yield chat_history
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        chat_history = chat_history + [ChatMessage(role="assistant", content=error_msg)]
        yield chat_history
def on_clear(agent=None):
    """
    Produce the values used to reset the chat area.

    Returns a 6-tuple: the agent exactly as passed in, an empty chat history,
    an empty string, the "cleared" status text, another empty string, and a
    Gradio update that makes a component non-interactive.

    NOTE(review): the status text says the agent was cleared, yet the agent is
    returned unchanged; confirm which is intended before wiring this handler.
    """
    status_text = "Agent cleared. Create a new one to continue."
    disable_update = gr.update(interactive=False)
    return agent, [], "", status_text, "", disable_update
def update_agent_status(agent):
    """
    Describe the agent's readiness for the status panel.

    Returns a hint when ``agent`` is None; otherwise a line stating how many
    tools the agent exposes (0 when it has no ``tools`` attribute).
    """
    if agent is None:
        return "No agent created yet. Add a Space tool to get started."
    return f"Agent ready with {len(getattr(agent, 'tools', []))} tools"
# Create the Gradio app
with gr.Blocks(title="AI Agent Builder") as app:
    gr.Markdown("# AI Agent Builder with smolagents")
    gr.Markdown("Build your own AI agent by selecting tools from Hugging Face Spaces.")

    # State holders shared between the event handlers below.
    agent_state = gr.State(None)  # the created agent, or None
    last_message = gr.State("")  # NOTE(review): not used by any handler in this block
    space_tools_state = gr.State([])  # list of tool dicts built by on_add_tool
    msg_store = gr.State("")  # holds the submitted text while the textbox is cleared

    with gr.Row():
        # Left column: tool selection and agent creation.
        with gr.Column(scale=1):
            gr.Markdown("## Tool Configuration")
            gr.Markdown("Add multiple Hugging Face Spaces as tools for your agent:")
            # Hidden field that carries the model id into on_create_agent.
            model_input = gr.Textbox(value=DEFAULT_MODEL, label="Model", visible=False)
            with gr.Group():
                gr.Markdown("### Add Space as Tool")
                space_tool_input = gr.Textbox(
                    label="Space ID or Search Term",
                    placeholder="Enter a Space ID or search term",
                    info="Enter a Space ID (username/space-name) or search term",
                )
                space_name_input = gr.Textbox(
                    label="Tool Name (optional)",
                    placeholder="Enter a name for this tool",
                )
                space_description_input = gr.Textbox(
                    label="Tool Description (optional)",
                    placeholder="Enter a description for this tool",
                    lines=2,
                )
                add_tool_button = gr.Button("Add Tool", variant="primary")
            gr.Markdown("### Added Tools")
            tools_display = gr.Markdown(
                "No tools added yet. Add at least one tool before creating an agent."
            )
            create_button = gr.Button(
                "Create Agent with Selected Tools", variant="secondary", size="lg"
            )
            status_msg = gr.Markdown("")
            agent_status = gr.Markdown("No agent created yet.")

        # Right column: chat with the agent.
        with gr.Column(scale=2):
            # NOTE(review): the avatar and button label strings below look
            # mis-encoded (probably emoji); left unchanged pending confirmation.
            chatbot = gr.Chatbot(
                label="Agent Chat",
                height=600,
                show_copy_button=True,
                avatar_images=("π€", "π€"),
                type="messages",
            )
            msg = gr.Textbox(
                label="Your message",
                placeholder="Type a message to your agent...",
                interactive=True,
            )
            with gr.Row():
                with gr.Column(scale=1, min_width=60):
                    # NOTE(review): no click handler is attached to this button
                    # in this block, although on_clear exists.
                    clear = gr.Button("ποΈ", scale=1)
                with gr.Column(scale=8):
                    pass

    # Pressing Enter in the Space field validates it and pre-fills name/description.
    space_tool_input.submit(
        on_validate_space,
        inputs=[space_tool_input],
        outputs=[status_msg, space_name_input, space_description_input],
    )
    add_tool_button.click(
        on_add_tool,
        inputs=[space_tool_input, space_name_input, space_description_input, space_tools_state],
        outputs=[space_tools_state, tools_display],
    )
    create_button.click(
        on_create_agent,
        inputs=[model_input, space_tools_state],
        outputs=[agent_state, chatbot, msg, status_msg, agent_status],
    )

    # Chat flow: stash the text and clear the textbox, add the user message to
    # the history, then stream the agent's reply. The textbox is written once
    # (to "") instead of being listed twice as an output.
    msg.submit(
        lambda message: (message, ""),
        inputs=[msg],
        outputs=[msg_store, msg],
        queue=False,
    ).then(
        add_user_message,
        inputs=[msg_store, chatbot],
        outputs=[msg_store, chatbot],
        queue=False,
    ).then(
        stream_agent_response,
        inputs=[agent_state, msg_store, chatbot],
        outputs=chatbot,
        queue=True,
    )

if __name__ == "__main__":
    app.queue().launch()