CultriX's picture
Create app.py
b07e47b verified
raw
history blame
14.2 kB
import os
import sys
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List
# -------------------- Configuration --------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# -------------------- External Model Call --------------------
async def call_model(prompt: str, model: str = "gpt-4o-mini", api_key: str = None) -> str:
"""
Sends a prompt to the OpenAI API endpoint using the specified model (overridden to gpt-4o-mini)
and returns the generated response.
"""
# Use the provided API key or fall back to the environment variable
if api_key is None:
api_key = os.getenv("OPENAI_API_KEY")
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# Override the model value to always be "gpt-4o-mini"
payload = {
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": prompt}],
}
async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
response = await client.post(url, headers=headers, json=payload)
response.raise_for_status()
response_json = response.json()
return response_json["choices"][0]["message"]["content"]
# -------------------- Agent Classes --------------------
class PromptOptimizerAgent:
async def optimize_prompt(self, user_prompt: str, api_key: str) -> str:
"""
Optimizes the user's initial prompt according to the following instructions:
>>> Given the user's initial prompt below the ### characters please enhance it.
1. Start with clear, precise instructions placed at the beginning of the prompt.
2. Include specific details about the desired context, outcome, length, format, and style.
3. Provide examples of the desired output format, if possible.
4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved.
5. Avoid any vague or imprecise language.
6. Rather than only stating what not to do, provide guidance on what should be done instead.
Remember to ensure the revised prompt remains true to the user's original intent. <<<
###User initial prompt below ###
"""
system_prompt = (
"Given the user's initial prompt below the ### characters please enhance it. "
"1. Start with clear, precise instructions placed at the beginning of the prompt. "
"2. Include specific details about the desired context, outcome, length, format, and style. "
"3. Provide examples of the desired output format, if possible. "
"4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. "
"5. Avoid any vague or imprecise language. "
"6. Rather than only stating what not to do, provide guidance on what should be done instead. "
"Remember to ensure the revised prompt remains true to the user's original intent. "
"###User initial prompt ###"
)
full_prompt = f"{system_prompt}\n{user_prompt}\n<<<"
optimized = await call_model(full_prompt, api_key=api_key)
return optimized
class OrchestratorAgent:
def __init__(self, log_queue: queue.Queue) -> None:
self.log_queue = log_queue
async def generate_plan(self, task: str, api_key: str) -> str:
"""
Generates a detailed, step-by-step plan for completing the given task.
"""
prompt = (
f"You are an orchestrator agent. The user has provided the task: '{task}'.\n"
"Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, "
"a code reviewer agent, and a documentation agent. List the steps as bullet points."
)
plan = await call_model(prompt, api_key=api_key)
return plan
class CoderAgent:
async def generate_code(self, instructions: str, api_key: str) -> str:
"""
Generates code based on the given instructions.
"""
prompt = (
"You are a coder agent. Based on the following instructions, generate the requested code. "
"Only output the generated code, never any explanations or any other information besides the actual code!\n"
f"{instructions}\n"
)
code = await call_model(prompt, api_key=api_key)
return code
class CodeReviewerAgent:
async def review_code(self, code: str, task: str, api_key: str) -> str:
"""
Reviews the provided code to check if it meets the task specifications.
NEVER generate any code yourself! Respond only with feedback or with 'APPROVE' if everything is correct.
"""
prompt = (
"You are a code reviewing agent highly skilled in evaluating code quality. "
"Review the provided code and check if it meets the task specifications and properly addresses any provided feedback. "
"NEVER generate any code yourself! Respond only with feedback or with 'APPROVE' if everything is correct. "
"Do not mention 'APPROVE' before actually approving! Do not request documentation or user guides:\n"
f"Task: {task}\n"
f"Code:\n{code}\n\n"
)
review = await call_model(prompt, api_key=api_key)
return review
class DocumentationAgent:
async def generate_documentation(self, code: str, api_key: str) -> str:
"""
Generates clear and concise documentation for the approved code,
including a brief and concise --help message.
"""
prompt = (
"You are a documentation agent. Generate a brief, clear and concise documentation for the following approved code. "
"Keep it short and compact, focusing on the main elements, do not include unnecessary extras that limit readability. "
"Additionally, generate a brief and concise --help message for the code:\n"
f"{code}\n"
"Briefly explain what the code does and how it works. Make sure to be clear and concise, do not include unnecessary extras that limit readability."
)
documentation = await call_model(prompt, api_key=api_key)
return documentation
# -------------------- Multi-Agent Conversation --------------------
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str) -> None:
"""
Conducts a multi-agent conversation where each agent's response is generated via the external model API.
The conversation is logged to the provided queue.
"""
conversation: List[Dict[str, str]] = [] # List to store each agent's message
# Step 0: Use Prompt Optimizer to enhance the user's initial prompt.
log_queue.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")
prompt_optimizer = PromptOptimizerAgent()
optimized_task = await prompt_optimizer.optimize_prompt(task_message, api_key=api_key)
conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
log_queue.put(f"[Prompt Optimizer]: Optimized task prompt:\n{optimized_task}")
# Step 1: Orchestrator generates a plan based on the optimized task.
log_queue.put("[Orchestrator]: Received optimized task. Generating plan...")
orchestrator = OrchestratorAgent(log_queue)
plan = await orchestrator.generate_plan(optimized_task, api_key=api_key)
conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
log_queue.put(f"[Orchestrator]: Plan generated:\n{plan}")
# Step 2: Coder generates code based on the plan.
coder = CoderAgent()
coder_instructions = f"Implement the task as described in the following plan:\n{plan}"
log_queue.put("[Coder]: Received coding task from the Orchestrator.")
code = await coder.generate_code(coder_instructions, api_key=api_key)
conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
log_queue.put(f"[Coder]: Code generated:\n{code}")
# Step 3: Code Reviewer reviews the generated code.
reviewer = CodeReviewerAgent()
approval_keyword = "approve"
revision_iteration = 0
while True:
if revision_iteration == 0:
log_queue.put("[Code Reviewer]: Starting review of the generated code...")
else:
log_queue.put(f"[Code Reviewer]: Reviewing the revised code (Iteration {revision_iteration})...")
review = await reviewer.review_code(code, optimized_task, api_key=api_key)
conversation.append({"agent": "Code Reviewer", "message": f"Review (Iteration {revision_iteration}):\n{review}"})
log_queue.put(f"[Code Reviewer]: Review feedback (Iteration {revision_iteration}):\n{review}")
# Check if the code has been approved
if approval_keyword in review.lower():
log_queue.put("[Code Reviewer]: Code approved.")
break # Exit the loop if approved
# If not approved, increment the revision count.
revision_iteration += 1
# Kill-switch: After 5 generations without approval, shut down.
if revision_iteration >= 5:
log_queue.put("Unable to solve your task to full satisfaction :(")
sys.exit("Unable to solve your task to full satisfaction :(")
# If under the limit, instruct the coder to revise the code.
log_queue.put(f"[Orchestrator]: Code not approved. Instructing coder to revise the code (Iteration {revision_iteration}).")
update_instructions = f"Please revise the code according to the following feedback. Feedback: {review}"
revised_code = await coder.generate_code(update_instructions, api_key=api_key)
conversation.append({"agent": "Coder", "message": f"Revised Code (Iteration {revision_iteration}):\n{revised_code}"})
log_queue.put(f"[Coder]: Revised code submitted (Iteration {revision_iteration}):\n{revised_code}")
code = revised_code # Update the code for the next review iteration
# Step 4: Documentation Agent generates documentation for the approved code.
doc_agent = DocumentationAgent()
log_queue.put("[Documentation Agent]: Generating documentation for the approved code.")
documentation = await doc_agent.generate_documentation(code, api_key=api_key)
conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{documentation}"})
log_queue.put(f"[Documentation Agent]: Documentation generated:\n{documentation}")
log_queue.put("Multi-agent conversation complete.")
log_queue.put(("result", conversation))
# -------------------- Process Generator for Streaming --------------------
def process_conversation_generator(task_message: str, api_key: str) -> Generator[str, None, None]:
"""
Wraps the asynchronous multi-agent conversation and yields log messages as they are generated.
"""
log_q: queue.Queue = queue.Queue()
def run_conversation() -> None:
asyncio.run(multi_agent_conversation(task_message, log_q, api_key))
thread = threading.Thread(target=run_conversation)
thread.start()
final_result = None
# Yield log messages as long as the thread is running or the queue is not empty.
while thread.is_alive() or not log_q.empty():
try:
msg = log_q.get(timeout=0.1)
if isinstance(msg, tuple) and msg[0] == "result":
final_result = msg[1]
yield "Final conversation complete."
else:
yield msg
except queue.Empty:
continue
thread.join()
if final_result:
# Format the final conversation log.
conv_text = "\n========== Multi-Agent Conversation ==========\n"
for entry in final_result:
conv_text += f"[{entry['agent']}]: {entry['message']}\n\n"
yield conv_text
# -------------------- Chat Function for Gradio --------------------
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
"""
Chat function for Gradio.
The user's message is interpreted as the task description.
An optional OpenAI API key can be provided via the additional input; if not provided, the environment variable is used.
This function streams the multi-agent conversation log messages.
"""
if not openai_api_key:
openai_api_key = os.getenv("OPENAI_API_KEY")
yield from process_conversation_generator(message, openai_api_key)
# -------------------- Launch the Chatbot --------------------
# Use Gradio's ChatInterface with an additional input field for the OpenAI API key.
iface = gr.ChatInterface(
fn=multi_agent_chat,
additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
type="messages",
title="Actual Multi-Agent Conversation Chatbot",
description="""Collaborative workflow between Primary, Critic, and Documentation agents:
"1. Enter a task and observe as your prompt gets magically improved by the Prompt-Enhancing agent before reaching the
2. Orchestrator, who then devises a plan and enlists the assistance of a
3. Coder agent that writes code which is iteratively improved upon thanks to the
4. Code Reviewer who finally decides if the code should be approved and get sent off to the
5. Code Documentation Generator that will write documentation for your freshly generated code!"
"NOTE: The full conversation log will be displayed at the end, showing all the steps taken!"
"NOTE2: If the Coder is unable to satisfactorily complete the task after five attempts, the script will terminate to prevent endless iterations.
"NOTE3: You will have to input your OPENAI_API_KEY at the bottom of the page for this to work!"""
)
if __name__ == "__main__":
iface.launch()