File size: 7,228 Bytes
02d640a
 
 
b07e47b
02d640a
b07e47b
02d640a
b07e47b
02d640a
45727ee
b07e47b
45727ee
 
 
 
b07e47b
45727ee
 
b07e47b
 
 
 
 
 
 
 
 
45727ee
b07e47b
45727ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b07e47b
45727ee
b07e47b
45727ee
 
 
 
b07e47b
45727ee
 
 
 
 
b07e47b
 
 
 
45727ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b07e47b
45727ee
 
b07e47b
45727ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b07e47b
 
45727ee
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import os
import sys
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List
logger = logging.getLogger(__name__)

class Agent(ABC):
    @abstractmethod
    async def generate_response(self, prompt: str, api_key: str) -> str:
        pass

class PromptOptimizerAgent(Agent):
    async def generate_response(self, prompt: str, api_key: str) -> str:
        system_prompt = (
            "Given the user's initial prompt below the ### characters please enhance it. "
            "1. Start with clear, precise instructions placed at the beginning of the prompt. "
            "2. Include specific details about the desired context, outcome, length, format, and style. "
            "3. Provide examples of the desired output format, if possible. "
            "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. "
            "5. Avoid any vague or imprecise language. "
            "6. Rather than only stating what not to do, provide guidance on what should be done instead. "
            "Remember to ensure the revised prompt remains true to the user's original intent. "
            ###User initial prompt###
        )
        return await call_openai(system_prompt, api_key)

class OrchestratorAgent(Agent):
    async def generate_response(self, task_message: str, api_key: str) -> str:
        plan = f"You are an orchestrator agent. The user has provided the task: '{task_message}'. Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, a code reviewer agent, and a documentation agent. List the steps as bullet points."
        return await call_openai(plan, api_key)

class CoderAgent(Agent):
    async def generate_response(self, instructions: str, api_key: str) -> str:
        prompt = f"Implement the task as described in the following plan:\n{instructions}"
        return await call_openai(prompt, api_key)

class CodeReviewerAgent(Agent):
    async def generate_response(self, code: str, task: str, api_key: str) -> str:
        feedback = await call_openai(
            f"You are a code reviewer agent. Review the provided code: '{code}' and check if it meets the task specifications.",
            api_key=api_key
        )
        return feedback

class DocumentationAgent(Agent):
    async def generate_response(self, code: str, api_key: str) -> str:
        prompt = f"You are a documentation agent. Generate a brief documentation for the code:\nCode:\n{code}"
        return await call_openai(prompt, api_key)

async def process_conversation_generator(conversation: list, log_q: queue.Queue, api_key: str) -> Generator[str, None, None]:
    try:
        while True:
            if not conversation or not log_q.get(timeout=0.1):
                continue

            msg = log_q.get(timeout=0.1)
            if isinstance(msg, tuple) and msg[0] == "result":
                final_result = msg[1]
                break

            yield msg
    except asyncio.CancelledError:
        pass
    finally:
        if log_q.empty():
            log_q.put("Final conversation complete.")

async def multi_agent_conversation(
    task_message: str, 
    log_q: queue.Queue, 
    api_key: str,
    additional_inputs=None
) -> None:
    if additional_inputs is None:
        additional_inputs = [gr.Textbox(label="OpenAI API Key (optional)")]

    agents = [
        PromptOptimizerAgent(),
        OrchestratorAgent(),
        CoderAgent(),
        CodeReviewerAgent(),
        DocumentationAgent()
    ]

    log_queue = queue.Queue()
    run_conversation = None

    async def run_conversation_thread() -> None:
        nonlocal run_conversation
        try:
            if run_conversation is not None:
                await run_conversation
        except asyncio.CancelledError:
            pass

    thread = asyncio.to_thread(run_conversation_thread)
    thread.start()

    try:
        conversation = []
        log_q.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")

        # Step 0: Use Prompt Optimizer
        optimized_task = await agents[0].generate_response(task_message, api_key)
        conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
        log_q.put(f"[Prompt Optimizer]: Optimized Task:\n{optimized_task}")

        # Step 1: Generate Plan
        plan = await agents[1].generate_response(optimized_task, api_key)
        conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
        log_q.put(f"[Orchestrator]: Plan generated:\n{plan}")

        # Step 2: Generate Code
        code = await agents[2].generate_response(plan, api_key)
        conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
        log_q.put(f"[Coder]: Code generated:\n{code}")

        # Step 3: Code Review
        code_review = None
        iteration = 0
        while True:
            if iteration >= 5:
                log_q.put("[Code Reviewer]: Code not approved after 5 iterations: terminating.")
                break

            code_review = await agents[3].generate_response(code, plan, api_key)
            revised_code = await agents[2].generate_response(
                f"Please revise the code according to the following feedback: {code_review}",
                api_key
            )
            code = revised_code
            iteration += 1

            if code == revised_code:
                break

            log_q.put(f"[Code Reviewer]: Feedback received:\n{code_review}")
            log_q.put(f"[Code Reviewer]: Revised code:\n{revised_code}")

        # Step 4: Documentation
        doc = await agents[4].generate_response(code, api_key)
        conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{doc}"})
        log_q.put(f"[Documentation Agent]: Documentation generated:\n{doc}")

    except Exception as e:
        log_q.put(f"[All Agents]: An error occurred: {str(e)}")
        logger.error(f"Error in multi_agent_conversation: {str(e)}")

    finally:
        thread.join()

async def multi_agent_conversation_wrapper(task_message: str, api_key: str) -> None:
    await multi_agent_conversation(
        task_message, 
        log_q=queue.Queue(),
        api_key=api_key,
        additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)") if api_key is None else None]
    )

if __name__ == "__main__":
    iface = gr.ChatInterface(
        fn=multi_agent_conversation_wrapper,
        additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)")],
        type="messages",
        title="Actual Multi-Agent Conversation Chatbot",
        description="""
            - Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Agent agents.
            - Enter a task description to observe the iterative workflow between the agents.
            - NOTE: The kill-switch mechanism will terminate after five code rejection iterations to prevent endless loops.
            - NOTE3: You can input your OPENAI_API_KEY at the bottom of the page for this to work!
        """,
    )

    iface.launch()