import os
import sys
import asyncio
import logging
import threading
import queue
from typing import Any, Dict, Generator, List, Optional

import gradio as gr
import httpx

# -------------------- Configuration --------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# -------------------- External Model Call --------------------
async def call_model(prompt: str, model: str = "gpt-4o-mini", api_key: Optional[str] = None) -> str:
    """
    Sends a prompt to the OpenAI chat completions endpoint and returns the generated response.
    The model is currently pinned to "gpt-4o-mini" regardless of the `model` argument.
    """
    # Use the provided API key or fall back to the environment variable.
    if api_key is None:
        api_key = os.getenv("OPENAI_API_KEY")
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    # Override the model value to always be "gpt-4o-mini".
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": prompt}],
    }
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        response = await client.post(url, headers=headers, json=payload)
        response.raise_for_status()
        response_json = response.json()
        return response_json["choices"][0]["message"]["content"]
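
# Example (hypothetical standalone use outside the Gradio app; assumes a valid API key):
#   reply = asyncio.run(call_model("Say hello", api_key="sk-..."))
#   print(reply)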

# -------------------- Agent Classes --------------------
class PromptOptimizerAgent:
    async def optimize_prompt(self, user_prompt: str, api_key: str) -> str:
        """
        Optimizes the user's initial prompt: clear instructions up front, specific details on
        context, outcome, length, format and style, example output formats where possible, and
        precise language, while keeping the revised prompt true to the user's original intent.
        """
        system_prompt = (
            "Given the user's initial prompt below the ### characters please enhance it. "
            "1. Start with clear, precise instructions placed at the beginning of the prompt. "
            "2. Include specific details about the desired context, outcome, length, format, and style. "
            "3. Provide examples of the desired output format, if possible. "
            "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. "
            "5. Avoid any vague or imprecise language. "
            "6. Rather than only stating what not to do, provide guidance on what should be done instead. "
            "Remember to ensure the revised prompt remains true to the user's original intent. "
            "###User initial prompt ###"
        )
        full_prompt = f"{system_prompt}\n{user_prompt}\n<<<"
        optimized = await call_model(full_prompt, api_key=api_key)
        return optimized

class OrchestratorAgent:
    def __init__(self, log_queue: queue.Queue) -> None:
        self.log_queue = log_queue

    async def generate_plan(self, task: str, api_key: str) -> str:
        """
        Generates a detailed, step-by-step plan for completing the given task.
        """
        prompt = (
            f"You are an orchestrator agent. The user has provided the task: '{task}'.\n"
            "Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, "
            "a code reviewer agent, and a documentation agent. List the steps as bullet points."
        )
        plan = await call_model(prompt, api_key=api_key)
        return plan

class CoderAgent:
    async def generate_code(self, instructions: str, api_key: str) -> str:
        """
        Generates code based on the given instructions.
        """
        prompt = (
            "You are a coder agent. Based on the following instructions, generate the requested code. "
            "Only output the generated code, never any explanations or any other information besides the actual code!\n"
            f"{instructions}\n"
        )
        code = await call_model(prompt, api_key=api_key)
        return code

class CodeReviewerAgent:
    async def review_code(self, code: str, task: str, api_key: str) -> str:
        """
        Reviews the provided code against the task specification and returns either
        feedback or 'APPROVE' if the code is satisfactory.
        """
        prompt = (
            "You are a code reviewing agent highly skilled in evaluating code quality. "
            "Review the provided code and check if it meets the task specifications and properly addresses any provided feedback. "
            "NEVER generate any code yourself! Respond only with feedback or with 'APPROVE' if everything is correct. "
            "Do not mention 'APPROVE' before actually approving! Do not request documentation or user guides:\n"
            f"Task: {task}\n"
            f"Code:\n{code}\n\n"
        )
        review = await call_model(prompt, api_key=api_key)
        return review

class DocumentationAgent:
    async def generate_documentation(self, code: str, api_key: str) -> str:
        """
        Generates clear and concise documentation for the approved code,
        including a brief --help message.
        """
        prompt = (
            "You are a documentation agent. Generate a brief, clear and concise documentation for the following approved code. "
            "Keep it short and compact, focusing on the main elements, do not include unnecessary extras that limit readability. "
            "Additionally, generate a brief and concise --help message for the code:\n"
            f"{code}\n"
            "Briefly explain what the code does and how it works. Make sure to be clear and concise, do not include unnecessary extras that limit readability."
        )
        documentation = await call_model(prompt, api_key=api_key)
        return documentation
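
# Example (hypothetical standalone use of a single agent, outside the full workflow):
#   coder = CoderAgent()
#   snippet = asyncio.run(coder.generate_code("Write a function that reverses a string.", api_key="sk-..."))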

# -------------------- Multi-Agent Conversation --------------------
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str) -> None:
    """
    Conducts a multi-agent conversation where each agent's response is generated via the external model API.
    The conversation is logged to the provided queue.
    """
    conversation: List[Dict[str, str]] = []  # List to store each agent's message

    # Step 0: Use the Prompt Optimizer to enhance the user's initial prompt.
    log_queue.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")
    prompt_optimizer = PromptOptimizerAgent()
    optimized_task = await prompt_optimizer.optimize_prompt(task_message, api_key=api_key)
    conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
    log_queue.put(f"[Prompt Optimizer]: Optimized task prompt:\n{optimized_task}")

    # Step 1: The Orchestrator generates a plan based on the optimized task.
    log_queue.put("[Orchestrator]: Received optimized task. Generating plan...")
    orchestrator = OrchestratorAgent(log_queue)
    plan = await orchestrator.generate_plan(optimized_task, api_key=api_key)
    conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
    log_queue.put(f"[Orchestrator]: Plan generated:\n{plan}")

    # Step 2: The Coder generates code based on the plan.
    coder = CoderAgent()
    coder_instructions = f"Implement the task as described in the following plan:\n{plan}"
    log_queue.put("[Coder]: Received coding task from the Orchestrator.")
    code = await coder.generate_code(coder_instructions, api_key=api_key)
    conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
    log_queue.put(f"[Coder]: Code generated:\n{code}")

    # Step 3: The Code Reviewer reviews the generated code.
    reviewer = CodeReviewerAgent()
    approval_keyword = "approve"
    revision_iteration = 0
    while True:
        if revision_iteration == 0:
            log_queue.put("[Code Reviewer]: Starting review of the generated code...")
        else:
            log_queue.put(f"[Code Reviewer]: Reviewing the revised code (Iteration {revision_iteration})...")
        review = await reviewer.review_code(code, optimized_task, api_key=api_key)
        conversation.append({"agent": "Code Reviewer", "message": f"Review (Iteration {revision_iteration}):\n{review}"})
        log_queue.put(f"[Code Reviewer]: Review feedback (Iteration {revision_iteration}):\n{review}")
        # Check whether the code has been approved.
        if approval_keyword in review.lower():
            log_queue.put("[Code Reviewer]: Code approved.")
            break  # Exit the loop once approved.
        # If not approved, increment the revision count.
        revision_iteration += 1
        # Kill-switch: after five revisions without approval, shut down.
        if revision_iteration >= 5:
            log_queue.put("Unable to solve your task to full satisfaction :(")
            sys.exit("Unable to solve your task to full satisfaction :(")
        # If under the limit, instruct the coder to revise the code.
        log_queue.put(f"[Orchestrator]: Code not approved. Instructing coder to revise the code (Iteration {revision_iteration}).")
        update_instructions = f"Please revise the code according to the following feedback. Feedback: {review}"
        revised_code = await coder.generate_code(update_instructions, api_key=api_key)
        conversation.append({"agent": "Coder", "message": f"Revised Code (Iteration {revision_iteration}):\n{revised_code}"})
        log_queue.put(f"[Coder]: Revised code submitted (Iteration {revision_iteration}):\n{revised_code}")
        code = revised_code  # Update the code for the next review iteration.

    # Step 4: The Documentation Agent generates documentation for the approved code.
    doc_agent = DocumentationAgent()
    log_queue.put("[Documentation Agent]: Generating documentation for the approved code.")
    documentation = await doc_agent.generate_documentation(code, api_key=api_key)
    conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{documentation}"})
    log_queue.put(f"[Documentation Agent]: Documentation generated:\n{documentation}")

    log_queue.put("Multi-agent conversation complete.")
    log_queue.put(("result", conversation))
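    # Note: the ("result", conversation) tuple acts as a sentinel; the streaming
    # generator below watches for it and emits the full transcript at the end.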

# -------------------- Process Generator for Streaming --------------------
def process_conversation_generator(task_message: str, api_key: str) -> Generator[str, None, None]:
    """
    Wraps the asynchronous multi-agent conversation and yields log messages as they are generated.
    """
    log_q: queue.Queue = queue.Queue()
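
    # Run the async conversation in a background thread and relay its log messages
    # through a thread-safe queue, so this synchronous generator can stream them to Gradio.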
    def run_conversation() -> None:
        asyncio.run(multi_agent_conversation(task_message, log_q, api_key))

    thread = threading.Thread(target=run_conversation)
    thread.start()

    final_result = None
    # Yield log messages as long as the thread is running or the queue is not empty.
    while thread.is_alive() or not log_q.empty():
        try:
            msg = log_q.get(timeout=0.1)
            if isinstance(msg, tuple) and msg[0] == "result":
                final_result = msg[1]
                yield "Final conversation complete."
            else:
                yield msg
        except queue.Empty:
            continue
    thread.join()

    if final_result:
        # Format the final conversation log.
        conv_text = "\n========== Multi-Agent Conversation ==========\n"
        for entry in final_result:
            conv_text += f"[{entry['agent']}]: {entry['message']}\n\n"
        yield conv_text

# -------------------- Chat Function for Gradio --------------------
def multi_agent_chat(message: str, history: List[Any], openai_api_key: Optional[str] = None) -> Generator[str, None, None]:
    """
    Chat function for Gradio.
    The user's message is interpreted as the task description.
    An optional OpenAI API key can be provided via the additional input; if not provided,
    the OPENAI_API_KEY environment variable is used.
    This function streams the multi-agent conversation log messages.
    """
    if not openai_api_key:
        openai_api_key = os.getenv("OPENAI_API_KEY")
    yield from process_conversation_generator(message, openai_api_key)

# -------------------- Launch the Chatbot --------------------
# Use Gradio's ChatInterface with an additional input field for the OpenAI API key.
iface = gr.ChatInterface(
    fn=multi_agent_chat,
    additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
    type="messages",
    title="Actual Multi-Agent Conversation Chatbot",
    description=(
        "Collaborative workflow between the Prompt Optimizer, Orchestrator, Coder, Code Reviewer, and Documentation agents:\n"
        "1. Enter a task and watch your prompt get improved by the Prompt Optimizer before it reaches the\n"
        "2. Orchestrator, who devises a plan and enlists the assistance of a\n"
        "3. Coder agent that writes code, which is iteratively improved with help from the\n"
        "4. Code Reviewer, who decides whether the code should be approved and sent on to the\n"
        "5. Documentation Agent, which writes documentation for your freshly generated code!\n"
        "NOTE: The full conversation log is displayed at the end, showing all the steps taken.\n"
        "NOTE 2: If the Coder cannot complete the task satisfactorily after five attempts, the script terminates to prevent endless iterations.\n"
        "NOTE 3: You will need to enter your OpenAI API key at the bottom of the page unless OPENAI_API_KEY is set in the environment."
    )
)
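
# Local run (sketch; assumes this file is saved as app.py and that an OpenAI API key is
# available, either via the OPENAI_API_KEY environment variable or entered in the UI):
#   python app.py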

if __name__ == "__main__":
    iface.launch()