CultriX's picture
Update app.py
02d640a verified
raw
history blame
7.23 kB
import os
import sys
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List
logger = logging.getLogger(__name__)
class Agent(ABC):
@abstractmethod
async def generate_response(self, prompt: str, api_key: str) -> str:
pass
class PromptOptimizerAgent(Agent):
async def generate_response(self, prompt: str, api_key: str) -> str:
system_prompt = (
"Given the user's initial prompt below the ### characters please enhance it. "
"1. Start with clear, precise instructions placed at the beginning of the prompt. "
"2. Include specific details about the desired context, outcome, length, format, and style. "
"3. Provide examples of the desired output format, if possible. "
"4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. "
"5. Avoid any vague or imprecise language. "
"6. Rather than only stating what not to do, provide guidance on what should be done instead. "
"Remember to ensure the revised prompt remains true to the user's original intent. "
###User initial prompt###
)
return await call_openai(system_prompt, api_key)
class OrchestratorAgent(Agent):
async def generate_response(self, task_message: str, api_key: str) -> str:
plan = f"You are an orchestrator agent. The user has provided the task: '{task_message}'. Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, a code reviewer agent, and a documentation agent. List the steps as bullet points."
return await call_openai(plan, api_key)
class CoderAgent(Agent):
async def generate_response(self, instructions: str, api_key: str) -> str:
prompt = f"Implement the task as described in the following plan:\n{instructions}"
return await call_openai(prompt, api_key)
class CodeReviewerAgent(Agent):
async def generate_response(self, code: str, task: str, api_key: str) -> str:
feedback = await call_openai(
f"You are a code reviewer agent. Review the provided code: '{code}' and check if it meets the task specifications.",
api_key=api_key
)
return feedback
class DocumentationAgent(Agent):
async def generate_response(self, code: str, api_key: str) -> str:
prompt = f"You are a documentation agent. Generate a brief documentation for the code:\nCode:\n{code}"
return await call_openai(prompt, api_key)
async def process_conversation_generator(conversation: list, log_q: queue.Queue, api_key: str) -> Generator[str, None, None]:
try:
while True:
if not conversation or not log_q.get(timeout=0.1):
continue
msg = log_q.get(timeout=0.1)
if isinstance(msg, tuple) and msg[0] == "result":
final_result = msg[1]
break
yield msg
except asyncio.CancelledError:
pass
finally:
if log_q.empty():
log_q.put("Final conversation complete.")
async def multi_agent_conversation(
task_message: str,
log_q: queue.Queue,
api_key: str,
additional_inputs=None
) -> None:
if additional_inputs is None:
additional_inputs = [gr.Textbox(label="OpenAI API Key (optional)")]
agents = [
PromptOptimizerAgent(),
OrchestratorAgent(),
CoderAgent(),
CodeReviewerAgent(),
DocumentationAgent()
]
log_queue = queue.Queue()
run_conversation = None
async def run_conversation_thread() -> None:
nonlocal run_conversation
try:
if run_conversation is not None:
await run_conversation
except asyncio.CancelledError:
pass
thread = asyncio.to_thread(run_conversation_thread)
thread.start()
try:
conversation = []
log_q.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")
# Step 0: Use Prompt Optimizer
optimized_task = await agents[0].generate_response(task_message, api_key)
conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
log_q.put(f"[Prompt Optimizer]: Optimized Task:\n{optimized_task}")
# Step 1: Generate Plan
plan = await agents[1].generate_response(optimized_task, api_key)
conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
log_q.put(f"[Orchestrator]: Plan generated:\n{plan}")
# Step 2: Generate Code
code = await agents[2].generate_response(plan, api_key)
conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
log_q.put(f"[Coder]: Code generated:\n{code}")
# Step 3: Code Review
code_review = None
iteration = 0
while True:
if iteration >= 5:
log_q.put("[Code Reviewer]: Code not approved after 5 iterations: terminating.")
break
code_review = await agents[3].generate_response(code, plan, api_key)
revised_code = await agents[2].generate_response(
f"Please revise the code according to the following feedback: {code_review}",
api_key
)
code = revised_code
iteration += 1
if code == revised_code:
break
log_q.put(f"[Code Reviewer]: Feedback received:\n{code_review}")
log_q.put(f"[Code Reviewer]: Revised code:\n{revised_code}")
# Step 4: Documentation
doc = await agents[4].generate_response(code, api_key)
conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{doc}"})
log_q.put(f"[Documentation Agent]: Documentation generated:\n{doc}")
except Exception as e:
log_q.put(f"[All Agents]: An error occurred: {str(e)}")
logger.error(f"Error in multi_agent_conversation: {str(e)}")
finally:
thread.join()
async def multi_agent_conversation_wrapper(task_message: str, api_key: str) -> None:
await multi_agent_conversation(
task_message,
log_q=queue.Queue(),
api_key=api_key,
additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)") if api_key is None else None]
)
if __name__ == "__main__":
iface = gr.ChatInterface(
fn=multi_agent_conversation_wrapper,
additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)")],
type="messages",
title="Actual Multi-Agent Conversation Chatbot",
description="""
- Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Agent agents.
- Enter a task description to observe the iterative workflow between the agents.
- NOTE: The kill-switch mechanism will terminate after five code rejection iterations to prevent endless loops.
- NOTE3: You can input your OPENAI_API_KEY at the bottom of the page for this to work!
""",
)
iface.launch()