Spaces:
Running
Running
import os | |
import sys | |
import asyncio | |
import logging | |
import threading | |
import queue | |
import gradio as gr | |
import httpx | |
from typing import Generator, Any, Dict, List | |
logger = logging.getLogger(__name__) | |
class Agent(ABC): | |
async def generate_response(self, prompt: str, api_key: str) -> str: | |
pass | |
class PromptOptimizerAgent(Agent): | |
async def generate_response(self, prompt: str, api_key: str) -> str: | |
system_prompt = ( | |
"Given the user's initial prompt below the ### characters please enhance it. " | |
"1. Start with clear, precise instructions placed at the beginning of the prompt. " | |
"2. Include specific details about the desired context, outcome, length, format, and style. " | |
"3. Provide examples of the desired output format, if possible. " | |
"4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. " | |
"5. Avoid any vague or imprecise language. " | |
"6. Rather than only stating what not to do, provide guidance on what should be done instead. " | |
"Remember to ensure the revised prompt remains true to the user's original intent. " | |
###User initial prompt### | |
) | |
return await call_openai(system_prompt, api_key) | |
class OrchestratorAgent(Agent): | |
async def generate_response(self, task_message: str, api_key: str) -> str: | |
plan = f"You are an orchestrator agent. The user has provided the task: '{task_message}'. Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, a code reviewer agent, and a documentation agent. List the steps as bullet points." | |
return await call_openai(plan, api_key) | |
class CoderAgent(Agent): | |
async def generate_response(self, instructions: str, api_key: str) -> str: | |
prompt = f"Implement the task as described in the following plan:\n{instructions}" | |
return await call_openai(prompt, api_key) | |
class CodeReviewerAgent(Agent): | |
async def generate_response(self, code: str, task: str, api_key: str) -> str: | |
feedback = await call_openai( | |
f"You are a code reviewer agent. Review the provided code: '{code}' and check if it meets the task specifications.", | |
api_key=api_key | |
) | |
return feedback | |
class DocumentationAgent(Agent): | |
async def generate_response(self, code: str, api_key: str) -> str: | |
prompt = f"You are a documentation agent. Generate a brief documentation for the code:\nCode:\n{code}" | |
return await call_openai(prompt, api_key) | |
async def process_conversation_generator(conversation: list, log_q: queue.Queue, api_key: str) -> Generator[str, None, None]: | |
try: | |
while True: | |
if not conversation or not log_q.get(timeout=0.1): | |
continue | |
msg = log_q.get(timeout=0.1) | |
if isinstance(msg, tuple) and msg[0] == "result": | |
final_result = msg[1] | |
break | |
yield msg | |
except asyncio.CancelledError: | |
pass | |
finally: | |
if log_q.empty(): | |
log_q.put("Final conversation complete.") | |
async def multi_agent_conversation( | |
task_message: str, | |
log_q: queue.Queue, | |
api_key: str, | |
additional_inputs=None | |
) -> None: | |
if additional_inputs is None: | |
additional_inputs = [gr.Textbox(label="OpenAI API Key (optional)")] | |
agents = [ | |
PromptOptimizerAgent(), | |
OrchestratorAgent(), | |
CoderAgent(), | |
CodeReviewerAgent(), | |
DocumentationAgent() | |
] | |
log_queue = queue.Queue() | |
run_conversation = None | |
async def run_conversation_thread() -> None: | |
nonlocal run_conversation | |
try: | |
if run_conversation is not None: | |
await run_conversation | |
except asyncio.CancelledError: | |
pass | |
thread = asyncio.to_thread(run_conversation_thread) | |
thread.start() | |
try: | |
conversation = [] | |
log_q.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...") | |
# Step 0: Use Prompt Optimizer | |
optimized_task = await agents[0].generate_response(task_message, api_key) | |
conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"}) | |
log_q.put(f"[Prompt Optimizer]: Optimized Task:\n{optimized_task}") | |
# Step 1: Generate Plan | |
plan = await agents[1].generate_response(optimized_task, api_key) | |
conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"}) | |
log_q.put(f"[Orchestrator]: Plan generated:\n{plan}") | |
# Step 2: Generate Code | |
code = await agents[2].generate_response(plan, api_key) | |
conversation.append({"agent": "Coder", "message": f"Code:\n{code}"}) | |
log_q.put(f"[Coder]: Code generated:\n{code}") | |
# Step 3: Code Review | |
code_review = None | |
iteration = 0 | |
while True: | |
if iteration >= 5: | |
log_q.put("[Code Reviewer]: Code not approved after 5 iterations: terminating.") | |
break | |
code_review = await agents[3].generate_response(code, plan, api_key) | |
revised_code = await agents[2].generate_response( | |
f"Please revise the code according to the following feedback: {code_review}", | |
api_key | |
) | |
code = revised_code | |
iteration += 1 | |
if code == revised_code: | |
break | |
log_q.put(f"[Code Reviewer]: Feedback received:\n{code_review}") | |
log_q.put(f"[Code Reviewer]: Revised code:\n{revised_code}") | |
# Step 4: Documentation | |
doc = await agents[4].generate_response(code, api_key) | |
conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{doc}"}) | |
log_q.put(f"[Documentation Agent]: Documentation generated:\n{doc}") | |
except Exception as e: | |
log_q.put(f"[All Agents]: An error occurred: {str(e)}") | |
logger.error(f"Error in multi_agent_conversation: {str(e)}") | |
finally: | |
thread.join() | |
async def multi_agent_conversation_wrapper(task_message: str, api_key: str) -> None: | |
await multi_agent_conversation( | |
task_message, | |
log_q=queue.Queue(), | |
api_key=api_key, | |
additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)") if api_key is None else None] | |
) | |
if __name__ == "__main__": | |
iface = gr.ChatInterface( | |
fn=multi_agent_conversation_wrapper, | |
additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)")], | |
type="messages", | |
title="Actual Multi-Agent Conversation Chatbot", | |
description=""" | |
- Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Agent agents. | |
- Enter a task description to observe the iterative workflow between the agents. | |
- NOTE: The kill-switch mechanism will terminate after five code rejection iterations to prevent endless loops. | |
- NOTE3: You can input your OPENAI_API_KEY at the bottom of the page for this to work! | |
""", | |
) | |
iface.launch() | |