# NOTE: Hugging Face Spaces page residue (status lines, commit hashes, and the
# line-number gutter) was accidentally captured with this source; it was not
# valid Python and has been removed.
import os
import sys
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List
# -------------------- Configuration --------------------
# Configure the root logger once at import time so every agent's progress
# message shares a timestamped "time - LEVEL - message" format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# -------------------- External Model Call --------------------
async def call_model(prompt: str, model: str = "gpt-4o-mini", api_key: str = None) -> str:
    """
    Send *prompt* to the OpenAI chat-completions endpoint and return the
    generated message content.

    Args:
        prompt: The user message to send as a single-turn conversation.
        model: Chat model name. Previously this argument was silently ignored
            and the payload hard-coded "gpt-4o-mini"; it is now honored
            (same default, so existing callers are unaffected).
        api_key: OpenAI API key; falls back to the OPENAI_API_KEY environment
            variable when None.

    Returns:
        The assistant's reply text (first choice).

    Raises:
        httpx.HTTPStatusError: If the API responds with a non-2xx status.
    """
    # Use the provided API key or fall back to the environment variable.
    if api_key is None:
        api_key = os.getenv("OPENAI_API_KEY")
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    # Bug fix: honor the `model` parameter instead of discarding it.
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    # Generous timeout: long prompts / large completions can take minutes.
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        response = await client.post(url, headers=headers, json=payload)
        response.raise_for_status()
        response_json = response.json()
    return response_json["choices"][0]["message"]["content"]
# -------------------- Agent Classes --------------------
class PromptOptimizerAgent:
    """Agent that rewrites the user's raw task prompt into a sharper one."""

    async def optimize_prompt(self, user_prompt: str, api_key: str) -> str:
        """
        Enhance *user_prompt* via the model while preserving its intent.

        The enhancement guidelines ask for precise up-front instructions,
        concrete details (context, outcome, length, format, style), example
        output where possible, leading phrases for code generation, no vague
        language, and positive ("do this") rather than negative guidance.
        """
        guideline_parts = (
            "Given the user's initial prompt below the ### characters please enhance it. ",
            "1. Start with clear, precise instructions placed at the beginning of the prompt. ",
            "2. Include specific details about the desired context, outcome, length, format, and style. ",
            "3. Provide examples of the desired output format, if possible. ",
            "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. ",
            "5. Avoid any vague or imprecise language. ",
            "6. Rather than only stating what not to do, provide guidance on what should be done instead. ",
            "Remember to ensure the revised prompt remains true to the user's original intent. ",
            "###User initial prompt ###",
        )
        # Same byte-for-byte prompt as before: guidelines, newline, the raw
        # prompt, newline, closing "<<<" marker.
        request = "".join(guideline_parts) + f"\n{user_prompt}\n<<<"
        return await call_model(request, api_key=api_key)
class OrchestratorAgent:
    """Agent that drafts a step-by-step plan for the other agents to follow."""

    def __init__(self, log_queue: queue.Queue) -> None:
        # Queue used elsewhere for streaming progress messages to the UI.
        self.log_queue = log_queue

    async def generate_plan(self, task: str, api_key: str) -> str:
        """Ask the model for a bullet-point plan that completes *task*."""
        request = (
            f"You are an orchestrator agent. The user has provided the task: '{task}'.\n"
            "Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, "
            "a code reviewer agent, and a documentation agent. List the steps as bullet points."
        )
        return await call_model(request, api_key=api_key)
class CoderAgent:
    """Agent that turns plan instructions into code (and only code)."""

    async def generate_code(self, instructions: str, api_key: str) -> str:
        """Return model-generated code for *instructions*, with no prose."""
        request = (
            "You are a coder agent. Based on the following instructions, generate the requested code. "
            "Only output the generated code, never any explanations or any other information besides the actual code!\n"
            f"{instructions}\n"
        )
        return await call_model(request, api_key=api_key)
class CodeReviewerAgent:
    """Agent that critiques generated code and signals approval."""

    async def review_code(self, code: str, task: str, api_key: str) -> str:
        """
        Ask the model to review *code* against *task*.

        The reviewer is instructed to reply with feedback only, or with
        'APPROVE' once the code satisfies the task, and never to write code
        itself.
        """
        request = (
            "You are a code reviewing agent highly skilled in evaluating code quality. "
            "Review the provided code and check if it meets the task specifications and properly addresses any provided feedback. "
            "NEVER generate any code yourself! Respond only with feedback or with 'APPROVE' if everything is correct. "
            "Do not mention 'APPROVE' before actually approving! Do not request documentation or user guides:\n"
            f"Task: {task}\n"
            f"Code:\n{code}\n\n"
        )
        return await call_model(request, api_key=api_key)
class DocumentationAgent:
    """Agent that writes concise documentation plus a --help blurb."""

    async def generate_documentation(self, code: str, api_key: str) -> str:
        """Return short documentation (including a --help message) for *code*."""
        request = (
            "You are a documentation agent. Generate a brief, clear and concise documentation for the following approved code. "
            "Keep it short and compact, focusing on the main elements, do not include unnecessary extras that limit readability. "
            "Additionally, generate a brief and concise --help message for the code:\n"
            f"{code}\n"
            "Briefly explain what the code does and how it works. Make sure to be clear and concise, do not include unnecessary extras that limit readability."
        )
        return await call_model(request, api_key=api_key)
# -------------------- Multi-Agent Conversation --------------------
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str) -> None:
    """
    Run the full multi-agent pipeline for *task_message*, streaming progress
    messages to *log_queue*.

    Pipeline: prompt optimization -> orchestrator plan -> code generation ->
    review/revision loop (at most 5 revision rounds) -> documentation.  When
    the pipeline finishes (or hits the revision limit) the transcript is
    published on the queue as a ("result", conversation) tuple.

    Args:
        task_message: The user's task description.
        log_queue: Queue drained by the UI thread for live log streaming.
        api_key: OpenAI API key forwarded to every model call.
    """
    conversation: List[Dict[str, str]] = []  # Ordered transcript of agent messages.
    # Step 0: Use Prompt Optimizer to enhance the user's initial prompt.
    log_queue.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")
    prompt_optimizer = PromptOptimizerAgent()
    optimized_task = await prompt_optimizer.optimize_prompt(task_message, api_key=api_key)
    conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
    log_queue.put(f"[Prompt Optimizer]: Optimized task prompt:\n{optimized_task}")
    # Step 1: Orchestrator generates a plan based on the optimized task.
    log_queue.put("[Orchestrator]: Received optimized task. Generating plan...")
    orchestrator = OrchestratorAgent(log_queue)
    plan = await orchestrator.generate_plan(optimized_task, api_key=api_key)
    conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
    log_queue.put(f"[Orchestrator]: Plan generated:\n{plan}")
    # Step 2: Coder generates code based on the plan.
    coder = CoderAgent()
    coder_instructions = f"Implement the task as described in the following plan:\n{plan}"
    log_queue.put("[Coder]: Received coding task from the Orchestrator.")
    code = await coder.generate_code(coder_instructions, api_key=api_key)
    conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
    log_queue.put(f"[Coder]: Code generated:\n{code}")
    # Step 3: Review / revise loop.
    reviewer = CodeReviewerAgent()
    approval_keyword = "approve"
    revision_iteration = 0
    while True:
        if revision_iteration == 0:
            log_queue.put("[Code Reviewer]: Starting review of the generated code...")
        else:
            log_queue.put(f"[Code Reviewer]: Reviewing the revised code (Iteration {revision_iteration})...")
        review = await reviewer.review_code(code, optimized_task, api_key=api_key)
        conversation.append({"agent": "Code Reviewer", "message": f"Review (Iteration {revision_iteration}):\n{review}"})
        log_queue.put(f"[Code Reviewer]: Review feedback (Iteration {revision_iteration}):\n{review}")
        # NOTE(review): a plain substring match means any occurrence of
        # "approve" (e.g. "not approved yet") counts as approval; the
        # reviewer prompt mitigates this, but it is not airtight.
        if approval_keyword in review.lower():
            log_queue.put("[Code Reviewer]: Code approved.")
            break  # Exit the loop if approved
        revision_iteration += 1
        # Kill-switch: after 5 revision rounds without approval, give up.
        if revision_iteration >= 5:
            log_queue.put("Unable to solve your task to full satisfaction :(")
            # Bug fix: the previous sys.exit() only raised SystemExit inside
            # this worker thread (which the threading module swallows), so it
            # never shut the app down and silently dropped the transcript.
            # Publish the partial result and end the conversation instead.
            log_queue.put(("result", conversation))
            return
        # Under the limit: instruct the coder to revise the code.
        log_queue.put(f"[Orchestrator]: Code not approved. Instructing coder to revise the code (Iteration {revision_iteration}).")
        update_instructions = f"Please revise the code according to the following feedback. Feedback: {review}"
        revised_code = await coder.generate_code(update_instructions, api_key=api_key)
        conversation.append({"agent": "Coder", "message": f"Revised Code (Iteration {revision_iteration}):\n{revised_code}"})
        log_queue.put(f"[Coder]: Revised code submitted (Iteration {revision_iteration}):\n{revised_code}")
        code = revised_code  # Review this version next iteration.
    # Step 4: Documentation Agent documents the approved code.
    doc_agent = DocumentationAgent()
    log_queue.put("[Documentation Agent]: Generating documentation for the approved code.")
    documentation = await doc_agent.generate_documentation(code, api_key=api_key)
    conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{documentation}"})
    log_queue.put(f"[Documentation Agent]: Documentation generated:\n{documentation}")
    log_queue.put("Multi-agent conversation complete.")
    log_queue.put(("result", conversation))
# -------------------- Process Generator for Streaming --------------------
def process_conversation_generator(task_message: str, api_key: str) -> Generator[str, None, None]:
    """
    Drive the async multi-agent conversation on a background thread and yield
    its log messages as they arrive, finishing with the formatted transcript.
    """
    log_q: queue.Queue = queue.Queue()

    def _worker() -> None:
        # asyncio.run gives the conversation its own event loop on this thread.
        asyncio.run(multi_agent_conversation(task_message, log_q, api_key))

    worker = threading.Thread(target=_worker)
    worker.start()

    transcript = None
    # Drain the queue while the worker lives (and flush anything left after).
    while worker.is_alive() or not log_q.empty():
        try:
            item = log_q.get(timeout=0.1)
        except queue.Empty:
            continue
        if isinstance(item, tuple) and item[0] == "result":
            transcript = item[1]
            yield "Final conversation complete."
        else:
            yield item
    worker.join()
    if transcript:
        # Emit the whole conversation as one formatted string.
        pieces = ["\n========== Multi-Agent Conversation ==========\n"]
        pieces.extend(f"[{entry['agent']}]: {entry['message']}\n\n" for entry in transcript)
        yield "".join(pieces)
# -------------------- Chat Function for Gradio --------------------
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
    """
    Gradio chat entry point.

    Treats *message* as the task description and streams the multi-agent
    conversation log. *history* is required by the ChatInterface contract but
    unused here. When no API key is supplied, the OPENAI_API_KEY environment
    variable is used instead.
    """
    key = openai_api_key or os.getenv("OPENAI_API_KEY")
    yield from process_conversation_generator(message, key)
# -------------------- Launch the Chatbot --------------------
# Gradio ChatInterface with an extra password field for the OpenAI API key.
iface = gr.ChatInterface(
    fn=multi_agent_chat,
    additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
    type="messages",
    title="Actual Multi-Agent Conversation Chatbot",
    description="""
- Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Generator agents.
- Enter a task and observe as your prompt gets magically solved! :)
- NOTE: The full conversation log will be displayed at the end, showing all the steps taken!
- NOTE2: If the Coder is unable to satisfactorily complete the task after five attempts, the script will terminate to prevent endless iterations.
- NOTE3: You will have to input your OPENAI_API_KEY at the bottom of the page for this to work!
"""
)
if __name__ == "__main__":
    # Bug fix: a stray " |" scrape artifact after launch() was a SyntaxError.
    iface.launch()