import os
import sys
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from abc import ABC, abstractmethod
from typing import AsyncGenerator
logger = logging.getLogger(__name__)
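# NOTE: call_openai is referenced throughout this file but was not defined in it.
# The helper below is an assumed minimal sketch: it sends a single-turn
# chat-completion request to the OpenAI REST API via httpx and returns the reply
# text. The default model name is a placeholder and should be adjusted as needed.
async def call_openai(prompt: str, api_key: str, model: str = "gpt-4o-mini") -> str:
    headers = {"Authorization": f"Bearer {api_key or os.getenv('OPENAI_API_KEY', '')}"}
    payload = {"model": model, "messages": [{"role": "user", "content": prompt}]}
    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]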
class Agent(ABC):
@abstractmethod
async def generate_response(self, prompt: str, api_key: str) -> str:
pass
class PromptOptimizerAgent(Agent):
    async def generate_response(self, prompt: str, api_key: str) -> str:
        system_prompt = (
            "Given the user's initial prompt below the ### characters please enhance it. "
            "1. Start with clear, precise instructions placed at the beginning of the prompt. "
            "2. Include specific details about the desired context, outcome, length, format, and style. "
            "3. Provide examples of the desired output format, if possible. "
            "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. "
            "5. Avoid any vague or imprecise language. "
            "6. Rather than only stating what not to do, provide guidance on what should be done instead. "
            "Remember to ensure the revised prompt remains true to the user's original intent."
        )
        # Append the user's prompt below the ### marker referenced in the instructions.
        full_prompt = f"{system_prompt}\n###User initial prompt###\n{prompt}"
        return await call_openai(full_prompt, api_key)
class OrchestratorAgent(Agent):
async def generate_response(self, task_message: str, api_key: str) -> str:
plan = f"You are an orchestrator agent. The user has provided the task: '{task_message}'. Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, a code reviewer agent, and a documentation agent. List the steps as bullet points."
return await call_openai(plan, api_key)
class CoderAgent(Agent):
async def generate_response(self, instructions: str, api_key: str) -> str:
prompt = f"Implement the task as described in the following plan:\n{instructions}"
return await call_openai(prompt, api_key)
class CodeReviewerAgent(Agent):
    async def generate_response(self, code: str, task: str, api_key: str) -> str:
        prompt = (
            f"You are a code reviewer agent. Review the provided code:\n{code}\n"
            f"Check whether it meets the task specifications:\n{task}\n"
            "If the code is acceptable, reply with the single word APPROVE; "
            "otherwise, list the changes that are required."
        )
        return await call_openai(prompt, api_key)
class DocumentationAgent(Agent):
async def generate_response(self, code: str, api_key: str) -> str:
prompt = f"You are a documentation agent. Generate a brief documentation for the code:\nCode:\n{code}"
return await call_openai(prompt, api_key)
async def process_conversation_generator(conversation: list, log_q: queue.Queue, api_key: str) -> AsyncGenerator[str, None]:
    # Stream log messages from log_q to the caller until the final
    # ("result", ...) sentinel is received.
    try:
        while True:
            try:
                msg = log_q.get_nowait()
            except queue.Empty:
                await asyncio.sleep(0.1)
                continue
            if isinstance(msg, tuple) and msg[0] == "result":
                break
            yield str(msg)
    except asyncio.CancelledError:
        pass
    finally:
        if log_q.empty():
            log_q.put("Final conversation complete.")
async def multi_agent_conversation(
task_message: str,
log_q: queue.Queue,
api_key: str,
additional_inputs=None
) -> None:
    # "additional_inputs" is accepted for signature compatibility with the Gradio
    # interface but is not used inside the worker itself.
    agents = [
        PromptOptimizerAgent(),
        OrchestratorAgent(),
        CoderAgent(),
        CodeReviewerAgent(),
        DocumentationAgent()
    ]
    conversation = []
    try:
log_q.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")
# Step 0: Use Prompt Optimizer
optimized_task = await agents[0].generate_response(task_message, api_key)
conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
log_q.put(f"[Prompt Optimizer]: Optimized Task:\n{optimized_task}")
# Step 1: Generate Plan
plan = await agents[1].generate_response(optimized_task, api_key)
conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
log_q.put(f"[Orchestrator]: Plan generated:\n{plan}")
# Step 2: Generate Code
code = await agents[2].generate_response(plan, api_key)
conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
log_q.put(f"[Coder]: Code generated:\n{code}")
        # Step 3: Code review / revision loop (kill-switch after 5 rejected iterations)
        approved = False
        for iteration in range(5):
            code_review = await agents[3].generate_response(code, plan, api_key)
            log_q.put(f"[Code Reviewer]: Feedback received:\n{code_review}")
            # Stop iterating as soon as the reviewer signals approval.
            if "APPROVE" in code_review.upper():
                approved = True
                break
            code = await agents[2].generate_response(
                f"Please revise the following code according to the feedback.\nCode:\n{code}\nFeedback:\n{code_review}",
                api_key
            )
            log_q.put(f"[Coder]: Revised code:\n{code}")
        if not approved:
            log_q.put("[Code Reviewer]: Code not approved after 5 iterations: terminating.")
# Step 4: Documentation
doc = await agents[4].generate_response(code, api_key)
conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{doc}"})
log_q.put(f"[Documentation Agent]: Documentation generated:\n{doc}")
    except Exception as e:
        log_q.put(f"[All Agents]: An error occurred: {e}")
        logger.error(f"Error in multi_agent_conversation: {e}")
    finally:
        # Signal completion to any consumer draining the log queue.
        log_q.put(("result", conversation))
async def multi_agent_conversation_wrapper(task_message: str, history: list, api_key: str) -> AsyncGenerator[str, None]:
    # gr.ChatInterface calls fn(message, history, *additional_inputs);
    # run the multi-agent pipeline and stream its log messages as the reply.
    log_q: queue.Queue = queue.Queue()
    worker = asyncio.create_task(multi_agent_conversation(task_message, log_q, api_key))
    transcript = ""
    async for msg in process_conversation_generator([], log_q, api_key):
        transcript += f"{msg}\n\n"
        yield transcript
    await worker
if __name__ == "__main__":
iface = gr.ChatInterface(
fn=multi_agent_conversation_wrapper,
additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)")],
type="messages",
title="Actual Multi-Agent Conversation Chatbot",
description="""
- Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Agent agents.
- Enter a task description to observe the iterative workflow between the agents.
- NOTE: The kill-switch mechanism will terminate after five code rejection iterations to prevent endless loops.
- NOTE3: You can input your OPENAI_API_KEY at the bottom of the page for this to work!
""",
)
iface.launch()
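# Assumed local usage (file and model names are placeholders, not from the original):
#   pip install gradio httpx
#   export OPENAI_API_KEY=sk-...
#   python app.py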