File size: 13,819 Bytes
b07e47b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac407f6
 
 
 
 
 
 
b07e47b
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
import os
import sys
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List

# -------------------- Configuration --------------------
# Root-logger setup: INFO level, timestamped "time - LEVEL - message" format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# -------------------- External Model Call --------------------
async def call_model(prompt: str, model: str = "gpt-4o-mini", api_key: str = None) -> str:
    """
    Send a single-turn chat prompt to the OpenAI chat-completions endpoint.

    Args:
        prompt: The user message sent as the sole entry of the chat history.
        model: Model identifier forwarded to the API (default "gpt-4o-mini").
        api_key: OpenAI API key; falls back to the OPENAI_API_KEY env variable.

    Returns:
        The assistant's reply text (first choice's message content).

    Raises:
        httpx.HTTPStatusError: If the API responds with a non-2xx status.
    """
    # Use the provided API key or fall back to the environment variable.
    if api_key is None:
        api_key = os.getenv("OPENAI_API_KEY")
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    # Bug fix: the 'model' parameter was previously ignored (the payload
    # hard-coded "gpt-4o-mini"). It is now honored; the default keeps the
    # original behavior for existing callers.
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    # Generous 300s timeout: completions for long prompts can be slow.
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        response = await client.post(url, headers=headers, json=payload)
        response.raise_for_status()
        response_json = response.json()
        return response_json["choices"][0]["message"]["content"]

# -------------------- Agent Classes --------------------
class PromptOptimizerAgent:
    """Agent that rewrites the user's raw prompt into an enhanced version."""

    async def optimize_prompt(self, user_prompt: str, api_key: str) -> str:
        """
        Return an enhanced rewrite of *user_prompt* produced by the model.

        The system instructions ask for precise wording, concrete details,
        examples, and positive guidance while preserving the original intent.
        """
        instruction_parts = [
            "Given the user's initial prompt below the ### characters please enhance it. ",
            "1. Start with clear, precise instructions placed at the beginning of the prompt. ",
            "2. Include specific details about the desired context, outcome, length, format, and style. ",
            "3. Provide examples of the desired output format, if possible. ",
            "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. ",
            "5. Avoid any vague or imprecise language. ",
            "6. Rather than only stating what not to do, provide guidance on what should be done instead. ",
            "Remember to ensure the revised prompt remains true to the user's original intent. ",
            "###User initial prompt ###",
        ]
        system_prompt = "".join(instruction_parts)
        # The user prompt is sandwiched between the instructions and a closing marker.
        full_prompt = f"{system_prompt}\n{user_prompt}\n<<<"
        return await call_model(full_prompt, api_key=api_key)

class OrchestratorAgent:
    """Agent that turns a task description into a step-by-step execution plan."""

    def __init__(self, log_queue: queue.Queue) -> None:
        # Shared queue used elsewhere for streaming progress messages.
        self.log_queue = log_queue

    async def generate_plan(self, task: str, api_key: str) -> str:
        """Ask the model for a bullet-point plan coordinating the other agents."""
        task_intro = f"You are an orchestrator agent. The user has provided the task: '{task}'.\n"
        directions = (
            "Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, "
            "a code reviewer agent, and a documentation agent. List the steps as bullet points."
        )
        return await call_model(task_intro + directions, api_key=api_key)

class CoderAgent:
    """Agent that produces code from plain-text instructions."""

    async def generate_code(self, instructions: str, api_key: str) -> str:
        """Generate code for *instructions*; the model is told to emit code only."""
        preamble = (
            "You are a coder agent. Based on the following instructions, generate the requested code. "
            "Only output the generated code, never any explanations or any other information besides the actual code!\n"
        )
        return await call_model(f"{preamble}{instructions}\n", api_key=api_key)

class CodeReviewerAgent:
    """Agent that evaluates generated code against the task specification."""

    async def review_code(self, code: str, task: str, api_key: str) -> str:
        """
        Return reviewer feedback for *code* against *task*.

        The caller treats any response containing 'APPROVE' as acceptance,
        so the instructions forbid mentioning it prematurely.
        """
        prompt_pieces = [
            "You are a code reviewing agent highly skilled in evaluating code quality. ",
            "Review the provided code and check if it meets the task specifications and properly addresses any provided feedback. ",
            "NEVER generate any code yourself! Respond only with feedback or with 'APPROVE' if everything is correct. ",
            "Do not mention 'APPROVE' before actually approving! Do not request documentation or user guides:\n",
            f"Task: {task}\n",
            f"Code:\n{code}\n\n",
        ]
        return await call_model("".join(prompt_pieces), api_key=api_key)

class DocumentationAgent:
    """Agent that writes compact documentation for approved code."""

    async def generate_documentation(self, code: str, api_key: str) -> str:
        """Return brief documentation plus a --help message for *code*."""
        header = (
            "You are a documentation agent. Generate a brief, clear and concise documentation for the following approved code. "
            "Keep it short and compact, focusing on the main elements, do not include unnecessary extras that limit readability. "
            "Additionally, generate a brief and concise --help message for the code:\n"
        )
        footer = "Briefly explain what the code does and how it works. Make sure to be clear and concise, do not include unnecessary extras that limit readability."
        return await call_model(f"{header}{code}\n{footer}", api_key=api_key)

# -------------------- Multi-Agent Conversation --------------------
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str) -> None:
    """
    Run the full multi-agent pipeline for *task_message*.

    Pipeline: prompt optimization -> planning -> coding -> review loop
    (at most 5 revision rounds) -> documentation. Progress is pushed onto
    *log_queue* as plain strings; on success the final transcript is pushed
    as a ("result", conversation) tuple.

    Args:
        task_message: The user's task description.
        log_queue: Queue consumed by the streaming UI generator.
        api_key: OpenAI API key forwarded to every model call.
    """
    conversation: List[Dict[str, str]] = []  # List to store each agent's message

    # Step 0: Use Prompt Optimizer to enhance the user's initial prompt.
    log_queue.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")
    prompt_optimizer = PromptOptimizerAgent()
    optimized_task = await prompt_optimizer.optimize_prompt(task_message, api_key=api_key)
    conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
    log_queue.put(f"[Prompt Optimizer]: Optimized task prompt:\n{optimized_task}")

    # Step 1: Orchestrator generates a plan based on the optimized task.
    log_queue.put("[Orchestrator]: Received optimized task. Generating plan...")
    orchestrator = OrchestratorAgent(log_queue)
    plan = await orchestrator.generate_plan(optimized_task, api_key=api_key)
    conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
    log_queue.put(f"[Orchestrator]: Plan generated:\n{plan}")

    # Step 2: Coder generates code based on the plan.
    coder = CoderAgent()
    coder_instructions = f"Implement the task as described in the following plan:\n{plan}"
    log_queue.put("[Coder]: Received coding task from the Orchestrator.")
    code = await coder.generate_code(coder_instructions, api_key=api_key)
    conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
    log_queue.put(f"[Coder]: Code generated:\n{code}")

    # Step 3: Review/revise loop until approval or the iteration cap.
    reviewer = CodeReviewerAgent()
    approval_keyword = "approve"
    revision_iteration = 0
    while True:
        if revision_iteration == 0:
            log_queue.put("[Code Reviewer]: Starting review of the generated code...")
        else:
            log_queue.put(f"[Code Reviewer]: Reviewing the revised code (Iteration {revision_iteration})...")

        review = await reviewer.review_code(code, optimized_task, api_key=api_key)
        conversation.append({"agent": "Code Reviewer", "message": f"Review (Iteration {revision_iteration}):\n{review}"})
        log_queue.put(f"[Code Reviewer]: Review feedback (Iteration {revision_iteration}):\n{review}")

        # The reviewer signals acceptance by including the keyword anywhere.
        if approval_keyword in review.lower():
            log_queue.put("[Code Reviewer]: Code approved.")
            break

        revision_iteration += 1

        # Kill-switch after 5 failed revision rounds.
        # Bug fix: the original called sys.exit() here, but this coroutine runs
        # on a worker thread (see process_conversation_generator), where
        # SystemExit does not terminate the process -- it only killed the thread
        # with an unhandled-exception traceback. Logging and returning achieves
        # the intended shutdown cleanly.
        if revision_iteration >= 5:
            log_queue.put("Unable to solve your task to full satisfaction :(")
            return

        # Otherwise instruct the coder to revise the code with the feedback.
        log_queue.put(f"[Orchestrator]: Code not approved. Instructing coder to revise the code (Iteration {revision_iteration}).")
        update_instructions = f"Please revise the code according to the following feedback. Feedback: {review}"
        revised_code = await coder.generate_code(update_instructions, api_key=api_key)
        conversation.append({"agent": "Coder", "message": f"Revised Code (Iteration {revision_iteration}):\n{revised_code}"})
        log_queue.put(f"[Coder]: Revised code submitted (Iteration {revision_iteration}):\n{revised_code}")
        code = revised_code  # Update the code for the next review iteration

    # Step 4: Documentation Agent generates documentation for the approved code.
    doc_agent = DocumentationAgent()
    log_queue.put("[Documentation Agent]: Generating documentation for the approved code.")
    documentation = await doc_agent.generate_documentation(code, api_key=api_key)
    conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{documentation}"})
    log_queue.put(f"[Documentation Agent]: Documentation generated:\n{documentation}")

    log_queue.put("Multi-agent conversation complete.")
    log_queue.put(("result", conversation))

# -------------------- Process Generator for Streaming --------------------
def process_conversation_generator(task_message: str, api_key: str) -> Generator[str, None, None]:
    """
    Run the multi-agent conversation on a background thread and yield its
    log messages as they arrive, finishing with the formatted transcript.

    Args:
        task_message: The user's task description.
        api_key: OpenAI API key forwarded to the conversation.

    Yields:
        Log lines (str), then a final transcript block if one was produced.
    """
    log_q: queue.Queue = queue.Queue()

    def run_conversation() -> None:
        # Robustness fix: the original let exceptions (HTTP errors, bad API
        # key, ...) die silently on this worker thread, so the UI stream just
        # stopped with no explanation. Catch at the thread boundary, log the
        # traceback, and surface a readable error line to the consumer.
        try:
            asyncio.run(multi_agent_conversation(task_message, log_q, api_key))
        except Exception as exc:
            logging.exception("Multi-agent conversation failed")
            log_q.put(f"[Error]: Conversation aborted: {exc}")

    # Daemon thread so a hung conversation cannot block interpreter shutdown.
    thread = threading.Thread(target=run_conversation, daemon=True)
    thread.start()

    final_result = None
    # Drain the queue while the worker runs (and after, until empty).
    while thread.is_alive() or not log_q.empty():
        try:
            msg = log_q.get(timeout=0.1)
        except queue.Empty:
            continue
        if isinstance(msg, tuple) and msg[0] == "result":
            final_result = msg[1]
            yield "Final conversation complete."
        else:
            yield msg

    thread.join()
    if final_result:
        # Format the final conversation transcript.
        conv_text = "\n========== Multi-Agent Conversation ==========\n"
        for entry in final_result:
            conv_text += f"[{entry['agent']}]: {entry['message']}\n\n"
        yield conv_text

# -------------------- Chat Function for Gradio --------------------
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
    """
    Gradio chat entry point: treat *message* as the task description and
    stream the multi-agent conversation log lines.

    A key supplied via the extra textbox takes precedence; otherwise the
    OPENAI_API_KEY environment variable is used. *history* is ignored.
    """
    effective_key = openai_api_key or os.getenv("OPENAI_API_KEY")
    yield from process_conversation_generator(message, effective_key)

# -------------------- Launch the Chatbot --------------------
# Use Gradio's ChatInterface with an additional input field for the OpenAI API key.
# Build the chat UI: the password-type textbox feeds multi_agent_chat's
# optional openai_api_key argument via additional_inputs.
iface = gr.ChatInterface(
    fn=multi_agent_chat,
    additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
    type="messages",
    title="Actual Multi-Agent Conversation Chatbot",
    description="""
        - Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Generator agents.
        - Enter a task and observe as your prompt gets magically solved! :)
        - NOTE: The full conversation log will be displayed at the end, showing all the steps taken!
        - NOTE2: If the Coder is unable to satisfactorily complete the task after five attempts, the script will terminate to prevent endless iterations.
        - NOTE3: You will have to input your OPENAI_API_KEY at the bottom of the page for this to work!
        """
)

if __name__ == "__main__":
    # Launch the Gradio app only when executed as a script (not on import).
    iface.launch()