CultriX committed on
Commit
763898d
·
verified ·
1 Parent(s): 9d9cc65

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +206 -186
app.py CHANGED
@@ -1,21 +1,23 @@
1
  import os
2
- import sys
3
  import asyncio
4
  import logging
5
  import threading
6
  import queue
7
  import gradio as gr
8
  import httpx
9
- from typing import Generator, Any, Dict, List, Optional, Callable
10
  from functools import lru_cache
11
 
12
  # -------------------- Configuration --------------------
13
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 
 
14
 
15
  # -------------------- External Model Call (with Caching and Retry) --------------------
16
- # Removed @lru_cache here, as it caused issues with async and Gradio
17
  async def call_model(prompt: str, model: str = "gpt-4o", api_key: str = None, max_retries: int = 3) -> str:
18
- """Sends a prompt to the OpenAI API endpoint, with caching and retries."""
 
 
19
  if api_key is None:
20
  api_key = os.getenv("OPENAI_API_KEY")
21
  if api_key is None:
@@ -23,37 +25,33 @@ async def call_model(prompt: str, model: str = "gpt-4o", api_key: str = None, ma
23
  url = "https://api.openai.com/v1/chat/completions"
24
  headers = {
25
  "Authorization": f"Bearer {api_key}",
26
- "Content-Type": "application/json"
27
- }
28
- payload = {
29
- "model": model,
30
- "messages": [{"role": "user", "content": prompt}],
31
  }
 
32
 
33
  for attempt in range(max_retries):
34
  try:
35
  async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
36
  response = await client.post(url, headers=headers, json=payload)
37
  response.raise_for_status()
38
- response_json = response.json()
39
  return response_json["choices"][0]["message"]["content"]
40
  except httpx.HTTPStatusError as e:
41
  logging.error(f"HTTP error (attempt {attempt + 1}/{max_retries}): {e}")
42
- if e.response.status_code in (502, 503, 504): # Retry on 502, 503, 504
43
- await asyncio.sleep(2 ** attempt) # Exponential backoff
44
  continue
45
  else:
46
- raise # Re-raise for other HTTP errors
47
  except httpx.RequestError as e:
48
  logging.error(f"Request error (attempt {attempt + 1}/{max_retries}): {e}")
49
  await asyncio.sleep(2 ** attempt)
50
  continue
51
  except Exception as e:
52
- logging.error(f"An unexpected error occurred (attempt {attempt+1}/{max_retries}): {e}")
53
  raise
54
  raise Exception(f"Failed to get response from OpenAI API after {max_retries} attempts.")
55
 
56
-
57
  # -------------------- Shared Context --------------------
58
  class Context:
59
  def __init__(self, original_task: str, optimized_task: Optional[str] = None,
@@ -78,8 +76,13 @@ class Context:
78
 
79
  class PromptOptimizerAgent:
80
  async def optimize_prompt(self, context: Context, api_key: str) -> Context:
81
- """Optimizes the user's initial prompt."""
82
- system_prompt = "Improve the prompt. Be clear, specific, and complete. Keep original intent. Return ONLY the revised prompt."
 
 
 
 
 
83
  full_prompt = f"{system_prompt}\n\nUser's prompt:\n{context.original_task}"
84
  optimized = await call_model(full_prompt, model="gpt-4o", api_key=api_key)
85
  context.optimized_task = optimized
@@ -87,65 +90,69 @@ class PromptOptimizerAgent:
87
  return context
88
 
89
  class OrchestratorAgent:
90
- def __init__(self, log_queue: queue.Queue, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> None:
91
  self.log_queue = log_queue
92
- self.human_in_the_loop_event = human_in_the_loop_event
93
  self.human_input_queue = human_input_queue
94
 
95
- async def generate_plan(self, context: Context, api_key: str, human_feedback: Optional[str] = None) -> Context:
96
- """Generates a plan, potentially requesting human feedback."""
97
-
98
- if human_feedback:
99
- prompt = (
100
- f"You are a planner. Revise/complete the plan for '{context.original_task}' using feedback:\n"
101
- f"{human_feedback}\n\nCurrent Plan:\n{context.plan if context.plan else 'No plan yet.'}\n\n"
102
- "Output the plan as a numbered list. If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'"
103
- )
104
- plan = await call_model(prompt, model="gpt-4o", api_key=api_key)
105
-
106
- else:
107
- prompt = (
108
- f"You are a planner. Create a plan for: '{context.optimized_task}'. "
109
- "Break down the task. Assign sub-tasks to: Coder, Code Reviewer, Quality Assurance Tester, and Documentation Agent. "
110
- "Include review/revision steps. Consider error handling. Include documentation instructions.\n\n"
111
- "If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'\n\nOutput the plan as a numbered list."
112
- )
113
- plan = await call_model(prompt, model="gpt-4o", api_key=api_key)
114
-
115
-
116
- if "REQUEST_HUMAN_FEEDBACK" in plan:
117
- self.log_queue.put("[Orchestrator]: Requesting human feedback...")
118
- question = plan.split("REQUEST_HUMAN_FEEDBACK\n", 1)[1].strip()
119
- self.log_queue.put(f"[Orchestrator]: Question for human: {question}")
120
-
121
- #Prepare detailed context for human
122
- feedback_request_context = (f"The orchestrator agent is requesting feedback on the following task:\n **{context.optimized_task}**\n\n"
123
- f"The current plan (if any):\n**{context.plan}**\n\n" if context.plan else "") + f"The specific question is:\n**{question}**"
124
-
125
- self.human_in_the_loop_event.set() # Signal the human input thread
126
-
127
- human_response = self.get_human_response(feedback_request_context) # Pass context to input function
128
- self.human_in_the_loop_event.clear() # Reset the event
129
- self.log_queue.put(f"[Orchestrator]: Received human feedback: {human_response}")
130
- context.add_conversation_entry("Orchestrator", f"Plan:\n{plan}\n\nHuman Feedback Requested. Question: {question}")
131
- return await self.generate_plan(context, api_key, human_response) # Recursive call
132
-
133
- context.plan = plan
134
- context.add_conversation_entry("Orchestrator", f"Plan:\n{plan}")
135
- return context
136
-
137
- def get_human_response(self, feedback_request_context):
138
- """Gets human input, using the Gradio queue and event."""
139
- self.human_input_queue.put(feedback_request_context) # Put the question into Gradio
140
- human_response = self.human_input_queue.get() # Get the response
141
- return human_response
 
 
142
 
143
  class CoderAgent:
144
  async def generate_code(self, context: Context, api_key: str, model: str = "gpt-4o") -> Context:
145
- """Generates code based on instructions."""
 
 
146
  prompt = (
147
  "You are a coding agent. Output ONLY the code. "
148
- "Adhere to best practices. Include error handling.\n\n"
149
  f"Instructions:\n{context.plan}"
150
  )
151
  code = await call_model(prompt, model=model, api_key=api_key)
@@ -155,34 +162,38 @@ class CoderAgent:
155
 
156
  class CodeReviewerAgent:
157
  async def review_code(self, context: Context, api_key: str) -> Context:
158
- """Reviews code. Provides concise, actionable feedback or 'APPROVE'."""
 
 
159
  prompt = (
160
- "You are a code reviewer. Provide CONCISE feedback. "
161
- "Focus on correctness, efficiency, readability, error handling, security, and adherence to the task. "
162
- "Suggest improvements. If acceptable, respond with ONLY 'APPROVE'. "
163
- "Do NOT generate code.\n\n"
164
  f"Task: {context.optimized_task}\n\nCode:\n{context.code}"
165
  )
166
  review = await call_model(prompt, model="gpt-4o", api_key=api_key)
167
  context.add_conversation_entry("Code Reviewer", f"Review:\n{review}")
168
 
169
- # Structured Feedback (Example)
170
  if "APPROVE" not in review.upper():
171
- structured_review = {"comments": []}
172
- #In a real implementation you might use a more advanced parsing technique here
173
- for line in review.splitlines():
174
- if line.strip(): #Simple example
175
- structured_review["comments"].append({"issue": line.strip(), "line_number": "N/A", "severity": "Medium"}) #Dummy data
176
- context.review_comments.append(structured_review)
177
-
 
 
178
  return context
179
 
180
  class QualityAssuranceTesterAgent:
181
  async def generate_test_cases(self, context: Context, api_key: str) -> Context:
182
- """Generates test cases."""
 
 
183
  prompt = (
184
- "You are a testing agent. Generate test cases. "
185
- "Consider edge cases and error scenarios. Output in a clear format.\n\n"
186
  f"Task: {context.optimized_task}\n\nCode:\n{context.code}"
187
  )
188
  test_cases = await call_model(prompt, model="gpt-4o", api_key=api_key)
@@ -191,10 +202,12 @@ class QualityAssuranceTesterAgent:
191
  return context
192
 
193
  async def run_tests(self, context: Context, api_key: str) -> Context:
194
- """Runs tests and reports results."""
 
 
195
  prompt = (
196
- "Run the test cases. Compare actual vs expected output. "
197
- "State discrepancies. If all pass, output 'TESTS PASSED'.\n\n"
198
  f"Code:\n{context.code}\n\nTest Cases:\n{context.test_cases}"
199
  )
200
  test_results = await call_model(prompt, model="gpt-4o", api_key=api_key)
@@ -204,10 +217,11 @@ class QualityAssuranceTesterAgent:
204
 
205
  class DocumentationAgent:
206
  async def generate_documentation(self, context: Context, api_key: str) -> Context:
207
- """Generates documentation, including a --help message."""
 
 
208
  prompt = (
209
- "Generate clear and concise documentation. "
210
- "Include a brief description, explanation, and a --help message.\n\n"
211
  f"Code:\n{context.code}"
212
  )
213
  documentation = await call_model(prompt, model="gpt-4o", api_key=api_key)
@@ -215,16 +229,16 @@ class DocumentationAgent:
215
  context.add_conversation_entry("Documentation Agent", f"Documentation:\n{documentation}")
216
  return context
217
 
218
- # -------------------- Agent Dispatcher (New) --------------------
219
 
220
  class AgentDispatcher:
221
- def __init__(self, log_queue: queue.Queue, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue):
222
  self.log_queue = log_queue
223
- self.human_in_the_loop_event = human_in_the_loop_event
224
  self.human_input_queue = human_input_queue
225
  self.agents = {
226
  "prompt_optimizer": PromptOptimizerAgent(),
227
- "orchestrator": OrchestratorAgent(log_queue, human_in_the_loop_event, human_input_queue),
228
  "coder": CoderAgent(),
229
  "code_reviewer": CodeReviewerAgent(),
230
  "qa_tester": QualityAssuranceTesterAgent(),
@@ -232,16 +246,18 @@ class AgentDispatcher:
232
  }
233
 
234
  async def dispatch(self, agent_name: str, context: Context, api_key: str, **kwargs) -> Context:
235
- """Dispatches the task to the specified agent."""
 
 
236
  agent = self.agents.get(agent_name)
237
  if not agent:
238
  raise ValueError(f"Unknown agent: {agent_name}")
239
 
240
- self.log_queue.put(f"[{agent_name.replace('_', ' ').title()}]: Starting task...") # Log here
241
  if agent_name == "prompt_optimizer":
242
  context = await agent.optimize_prompt(context, api_key)
243
  elif agent_name == "orchestrator":
244
- context = await agent.generate_plan(context, api_key) #Removed human_feedback
245
  elif agent_name == "coder":
246
  context = await agent.generate_code(context, api_key, **kwargs)
247
  elif agent_name == "code_reviewer":
@@ -254,167 +270,171 @@ class AgentDispatcher:
254
  elif agent_name == "documentation_agent":
255
  context = await agent.generate_documentation(context, api_key)
256
  else:
257
- raise ValueError(f"Unknown Agent Name: {agent_name}")
258
-
259
  return context
260
- async def determine_next_agent(self, context:Context, api_key:str) -> str:
261
- """Determines the next agent to run based on the current context."""
 
 
 
262
  if not context.optimized_task:
263
  return "prompt_optimizer"
264
  if not context.plan:
265
  return "orchestrator"
266
  if not context.code:
267
  return "coder"
268
- if not context.review_comments or "APPROVE" not in [comment.get('issue',"").upper() for comment_list in context.review_comments for comment in comment_list.get("comments",[]) ]:
 
 
 
 
 
269
  return "code_reviewer"
270
  if not context.test_cases:
271
  return "qa_tester"
272
- if not context.test_results or "TESTS PASSED" not in context.test_results.upper() :
273
  return "qa_tester"
274
  if not context.documentation:
275
  return "documentation_agent"
276
 
277
  return "done" # All tasks are complete
278
 
279
- # -------------------- Multi-Agent Conversation (Refactored) --------------------
280
- async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> None:
 
 
281
  """
282
- Conducts the multi-agent conversation using the AgentDispatcher.
283
  """
284
  context = Context(original_task=task_message)
285
- dispatcher = AgentDispatcher(log_queue, human_in_the_loop_event, human_input_queue)
286
 
287
  next_agent = await dispatcher.determine_next_agent(context, api_key)
 
 
 
288
  while next_agent != "done":
289
  if next_agent == "qa_tester":
290
  if not context.test_cases:
291
- context = await dispatcher.dispatch(next_agent, context, api_key, generate_tests=True)
292
  else:
293
- context = await dispatcher.dispatch(next_agent, context, api_key, run_tests=True)
294
  elif next_agent == "coder" and (context.review_comments or context.test_results):
295
- #Coder needs a different model after the first coding
296
- context = await dispatcher.dispatch(next_agent,context, api_key, model="gpt-3.5-turbo-16k")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
  else:
298
- context = await dispatcher.dispatch(next_agent, context, api_key) # Call the agent
299
 
300
- next_agent = await dispatcher.determine_next_agent(context, api_key)
301
- if next_agent == "code_reviewer" and context.review_comments and "APPROVE" in [comment.get('issue',"").upper() for comment_list in context.review_comments for comment in comment_list.get("comments",[]) ]:
302
- next_agent = await dispatcher.determine_next_agent(context, api_key)
303
- # Check for maximum revisions
304
- if next_agent == "coder" and len([entry for entry in context.conversation_history if entry["agent"] == "Coder"]) > 5:
305
- log_queue.put("Maximum revision iterations reached. Exiting.")
306
- break;
307
 
308
  log_queue.put("Conversation complete.")
309
  log_queue.put(("result", context.conversation_history))
310
 
311
  # -------------------- Process Generator and Human Input --------------------
312
- def process_conversation_generator(task_message: str, api_key: str, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue, log_queue: queue.Queue) -> Generator[str, None, None]:
 
 
 
313
  """
314
- Wraps the conversation, yields log messages, and handles human input within a single thread.
315
- Crucially, takes the log_queue as an argument. Yields Gradio updates.
316
  """
 
 
317
 
318
- # Run the multi-agent conversation *synchronously* within this function.
319
- asyncio.run(multi_agent_conversation(task_message, log_queue, api_key, human_in_the_loop_event, human_input_queue))
320
-
321
- # Process the log queue and handle human-in-the-loop
322
  final_result = None
323
- while True: # Loop indefinitely to handle multiple potential human feedback requests.
324
  try:
325
- msg = log_queue.get_nowait() # Non-blocking get from the log queue.
326
  if isinstance(msg, tuple) and msg[0] == "result":
327
  final_result = msg[1]
328
- yield gr.Chatbot.update(final_result) # Update the chatbot with the final result
329
- yield "Conversation complete." # Indicate completion.
330
- break # Exit the loop after processing the final result.
331
  else:
332
- yield msg # Yield the log message.
333
  except queue.Empty:
334
- pass # No log message available, continue checking for human input.
335
-
336
 
337
- if human_in_the_loop_event.is_set():
338
- yield "Waiting for human feedback..." # Indicate waiting state.
339
- try:
340
- feedback_request = human_input_queue.get(
341
- timeout=0.1) # Get the context/question for feedback.
342
- human_interface = get_human_feedback(feedback_request)
343
- yield gr.Textbox.update(visible=False), gr.update(visible=True) # Show feedback UI
344
- human_feedback = human_input_queue.get(
345
- timeout=300) # Wait (block) for human feedback, with a timeout.
346
- human_in_the_loop_event.clear() # Reset the event after getting feedback.
347
- yield gr.Textbox.update(visible=True), human_interface.close() # Hide feedback UI.
348
 
349
-
350
- except queue.Empty:
351
- pass
352
- # Add a small sleep to avoid busy-waiting and reduce CPU usage.
353
- time.sleep(0.1)
354
-
355
-
356
-
357
-
358
- def get_human_feedback(placeholder_text):
359
- """Gets human input using a Gradio Textbox."""
360
  with gr.Blocks() as human_feedback_interface:
361
  with gr.Row():
362
  human_input = gr.Textbox(lines=4, label="Human Feedback", placeholder=placeholder_text)
363
  with gr.Row():
364
  submit_button = gr.Button("Submit Feedback")
365
 
366
- def submit_feedback(input_text):
367
- # Put the feedback into the shared queue
368
  human_input_queue.put(input_text)
369
- return "" # Clear the input box after submission
370
-
371
 
372
  submit_button.click(fn=submit_feedback, inputs=human_input, outputs=human_input)
373
- human_feedback_interface.load(None, [], []) # Keep interface alive
374
-
375
  return human_feedback_interface
376
 
377
  # -------------------- Chat Function for Gradio --------------------
378
 
379
- def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
380
- """Chat function for Gradio."""
 
 
381
  if not openai_api_key:
382
  openai_api_key = os.getenv("OPENAI_API_KEY")
383
  if not openai_api_key:
384
  yield "Error: API key not provided."
385
  return
386
 
387
- human_in_the_loop_event = threading.Event()
388
- human_input_queue = queue.Queue() # Use a single queue for both requests and responses
389
- log_queue = queue.Queue() #Create log queue here
390
-
391
- yield from process_conversation_generator(message, openai_api_key, human_in_the_loop_event, human_input_queue, log_queue)
392
-
393
-
394
 
 
395
 
396
  # -------------------- Launch the Chatbot --------------------
397
 
398
- # Create the main chat interface
399
  iface = gr.ChatInterface(
400
  fn=multi_agent_chat,
401
- chatbot=gr.Chatbot(type="feed"), # Use the 'feed' type for a better display of messages
402
- additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
 
 
403
  title="Multi-Agent Task Solver with Human-in-the-Loop",
404
- description="""
405
- - Collaborative workflow with Human-in-the-Loop.
406
- - Orchestrator can ask for human feedback.
407
- - Enter a task; agents will work on it. You may be prompted for input.
408
- - Max 5 revisions.
409
- - Provide API Key.
410
- """
411
  )
412
 
413
- #Need a dummy interface to prevent Gradio errors
414
- dummy_iface = gr.Interface(lambda x:x, "textbox", "textbox")
415
 
416
  if __name__ == "__main__":
417
  demo = gr.TabbedInterface([iface, dummy_iface], ["Chatbot", "Dummy"])
418
  demo.launch(share=True)
419
-
420
- import time #Import the time module
 
import asyncio
import logging
import os
import queue
import threading
import time
from functools import lru_cache
from typing import Any, Dict, Generator, List, Optional

import gradio as gr
import httpx
10
 
11
  # -------------------- Configuration --------------------
12
+ logging.basicConfig(
13
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
14
+ )
15
 
16
  # -------------------- External Model Call (with Caching and Retry) --------------------
 
17
  async def call_model(prompt: str, model: str = "gpt-4o", api_key: str = None, max_retries: int = 3) -> str:
18
+ """
19
+ Sends a prompt to the OpenAI API endpoint with retries and exponential backoff.
20
+ """
21
  if api_key is None:
22
  api_key = os.getenv("OPENAI_API_KEY")
23
  if api_key is None:
 
25
  url = "https://api.openai.com/v1/chat/completions"
26
  headers = {
27
  "Authorization": f"Bearer {api_key}",
28
+ "Content-Type": "application/json",
 
 
 
 
29
  }
30
+ payload = {"model": model, "messages": [{"role": "user", "content": prompt}]}
31
 
32
  for attempt in range(max_retries):
33
  try:
34
  async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
35
  response = await client.post(url, headers=headers, json=payload)
36
  response.raise_for_status()
37
+ response_json = response.json() # Synchronous parsing is acceptable here
38
  return response_json["choices"][0]["message"]["content"]
39
  except httpx.HTTPStatusError as e:
40
  logging.error(f"HTTP error (attempt {attempt + 1}/{max_retries}): {e}")
41
+ if e.response.status_code in (502, 503, 504):
42
+ await asyncio.sleep(2 ** attempt)
43
  continue
44
  else:
45
+ raise
46
  except httpx.RequestError as e:
47
  logging.error(f"Request error (attempt {attempt + 1}/{max_retries}): {e}")
48
  await asyncio.sleep(2 ** attempt)
49
  continue
50
  except Exception as e:
51
+ logging.error(f"Unexpected error (attempt {attempt+1}/{max_retries}): {e}")
52
  raise
53
  raise Exception(f"Failed to get response from OpenAI API after {max_retries} attempts.")
54
 
 
55
  # -------------------- Shared Context --------------------
56
  class Context:
57
  def __init__(self, original_task: str, optimized_task: Optional[str] = None,
 
76
 
77
  class PromptOptimizerAgent:
78
  async def optimize_prompt(self, context: Context, api_key: str) -> Context:
79
+ """
80
+ Optimizes the user’s original prompt.
81
+ """
82
+ system_prompt = (
83
+ "Improve the prompt. Be clear, specific, and complete. "
84
+ "Keep original intent. Return ONLY the revised prompt."
85
+ )
86
  full_prompt = f"{system_prompt}\n\nUser's prompt:\n{context.original_task}"
87
  optimized = await call_model(full_prompt, model="gpt-4o", api_key=api_key)
88
  context.optimized_task = optimized
 
90
  return context
91
 
92
class OrchestratorAgent:
    """Plans the task and, when the model asks for it, pauses for human feedback."""

    def __init__(self, log_queue: queue.Queue, human_event: threading.Event, human_input_queue: queue.Queue) -> None:
        self.log_queue = log_queue                    # progress messages streamed to the UI
        self.human_event = human_event                # set while a human answer is awaited
        self.human_input_queue = human_input_queue    # shared request/response channel
 
    async def generate_plan(self, context: "Context", api_key: str) -> "Context":
        """
        Generate (or revise) a plan for the task, iterating until the model no
        longer requests human feedback.

        Each round asks the model for a plan; if the reply contains the
        REQUEST_HUMAN_FEEDBACK marker, the question is forwarded to the human
        and the answer is folded into ``context.plan`` before retrying.
        """
        while True:
            if context.plan:
                prompt = (
                    f"You are a planner. Revise/complete the plan for '{context.original_task}' using feedback:\n"
                    f"{context.plan}\n\n"
                    "If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'"
                )
            else:
                prompt = (
                    f"You are a planner. Create a plan for: '{context.optimized_task}'. "
                    "Break down the task and assign sub-tasks to: Coder, Code Reviewer, Quality Assurance Tester, and Documentation Agent. "
                    "Include review/revision steps, error handling, and documentation instructions.\n\n"
                    "If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'"
                )

            plan = await call_model(prompt, model="gpt-4o", api_key=api_key)
            context.add_conversation_entry("Orchestrator", f"Plan:\n{plan}")

            # Check if human feedback is requested.
            if "REQUEST_HUMAN_FEEDBACK" in plan:
                # BUG FIX: the model may emit the marker without a trailing
                # newline (e.g. at end of output); splitting on the marker
                # plus "\n" raised IndexError in that case.  Split on the
                # bare marker instead and strip whatever follows.
                question = plan.split("REQUEST_HUMAN_FEEDBACK", 1)[1].strip()
                self.log_queue.put("[Orchestrator]: Requesting human feedback...")
                self.log_queue.put(f"[Orchestrator]: Question for human: {question}")

                # Prepare feedback context and trigger the human feedback event.
                feedback_request_context = (
                    f"The orchestrator agent is requesting feedback on the following task:\n"
                    f"**{context.optimized_task}**\n\n"
                    f"Current plan:\n**{context.plan or 'None'}**\n\n"
                    f"Question:\n**{question}**"
                )
                self.human_event.set()
                self.human_input_queue.put(feedback_request_context)
                # NOTE(review): request and response share one queue, so this
                # blocking get() can dequeue the request we just put before
                # the UI thread ever sees it.  This needs separate request and
                # response queues — confirm against the Gradio-side consumer.
                human_response = self.human_input_queue.get()
                self.human_event.clear()

                self.log_queue.put(f"[Orchestrator]: Received human feedback: {human_response}")
                # Incorporate human feedback into the plan and loop again.
                context.plan = context.plan + "\n" + human_response if context.plan else human_response
            else:
                context.plan = plan
                break  # Exit loop when no feedback is requested.
        return context
147
 
148
  class CoderAgent:
149
  async def generate_code(self, context: Context, api_key: str, model: str = "gpt-4o") -> Context:
150
+ """
151
+ Generates code based on the provided plan.
152
+ """
153
  prompt = (
154
  "You are a coding agent. Output ONLY the code. "
155
+ "Adhere to best practices and include error handling.\n\n"
156
  f"Instructions:\n{context.plan}"
157
  )
158
  code = await call_model(prompt, model=model, api_key=api_key)
 
162
 
163
  class CodeReviewerAgent:
164
  async def review_code(self, context: Context, api_key: str) -> Context:
165
+ """
166
+ Reviews the generated code and returns either actionable feedback or 'APPROVE'.
167
+ """
168
  prompt = (
169
+ "You are a code reviewer. Provide CONCISE feedback focusing on correctness, efficiency, readability, error handling, and security. "
170
+ "If the code is acceptable, respond with ONLY 'APPROVE'. Do NOT generate code.\n\n"
 
 
171
  f"Task: {context.optimized_task}\n\nCode:\n{context.code}"
172
  )
173
  review = await call_model(prompt, model="gpt-4o", api_key=api_key)
174
  context.add_conversation_entry("Code Reviewer", f"Review:\n{review}")
175
 
176
+ # Check for approval; if not approved, parse feedback.
177
  if "APPROVE" not in review.upper():
178
+ structured_review = {"comments": []}
179
+ for line in review.splitlines():
180
+ if line.strip():
181
+ structured_review["comments"].append({
182
+ "issue": line.strip(),
183
+ "line_number": "N/A",
184
+ "severity": "Medium"
185
+ })
186
+ context.review_comments.append(structured_review)
187
  return context
188
 
189
  class QualityAssuranceTesterAgent:
190
  async def generate_test_cases(self, context: Context, api_key: str) -> Context:
191
+ """
192
+ Generates test cases considering edge and error cases.
193
+ """
194
  prompt = (
195
+ "You are a testing agent. Generate comprehensive test cases considering edge cases and error scenarios. "
196
+ "Output in a clear format.\n\n"
197
  f"Task: {context.optimized_task}\n\nCode:\n{context.code}"
198
  )
199
  test_cases = await call_model(prompt, model="gpt-4o", api_key=api_key)
 
202
  return context
203
 
204
  async def run_tests(self, context: Context, api_key: str) -> Context:
205
+ """
206
+ Runs the generated test cases and compares expected vs. actual outcomes.
207
+ """
208
  prompt = (
209
+ "Run the test cases. Compare actual vs expected outputs and state any discrepancies. "
210
+ "If all tests pass, output 'TESTS PASSED'.\n\n"
211
  f"Code:\n{context.code}\n\nTest Cases:\n{context.test_cases}"
212
  )
213
  test_results = await call_model(prompt, model="gpt-4o", api_key=api_key)
 
217
 
218
  class DocumentationAgent:
219
  async def generate_documentation(self, context: Context, api_key: str) -> Context:
220
+ """
221
+ Generates concise documentation including a --help message.
222
+ """
223
  prompt = (
224
+ "Generate clear documentation including a brief description, explanation, and a --help message.\n\n"
 
225
  f"Code:\n{context.code}"
226
  )
227
  documentation = await call_model(prompt, model="gpt-4o", api_key=api_key)
 
229
  context.add_conversation_entry("Documentation Agent", f"Documentation:\n{documentation}")
230
  return context
231
 
232
+ # -------------------- Agent Dispatcher --------------------
233
 
234
  class AgentDispatcher:
235
+ def __init__(self, log_queue: queue.Queue, human_event: threading.Event, human_input_queue: queue.Queue):
236
  self.log_queue = log_queue
237
+ self.human_event = human_event
238
  self.human_input_queue = human_input_queue
239
  self.agents = {
240
  "prompt_optimizer": PromptOptimizerAgent(),
241
+ "orchestrator": OrchestratorAgent(log_queue, human_event, human_input_queue),
242
  "coder": CoderAgent(),
243
  "code_reviewer": CodeReviewerAgent(),
244
  "qa_tester": QualityAssuranceTesterAgent(),
 
246
  }
247
 
248
  async def dispatch(self, agent_name: str, context: Context, api_key: str, **kwargs) -> Context:
249
+ """
250
+ Dispatches the task to the specified agent.
251
+ """
252
  agent = self.agents.get(agent_name)
253
  if not agent:
254
  raise ValueError(f"Unknown agent: {agent_name}")
255
 
256
+ self.log_queue.put(f"[{agent_name.replace('_', ' ').title()}]: Starting task...")
257
  if agent_name == "prompt_optimizer":
258
  context = await agent.optimize_prompt(context, api_key)
259
  elif agent_name == "orchestrator":
260
+ context = await agent.generate_plan(context, api_key)
261
  elif agent_name == "coder":
262
  context = await agent.generate_code(context, api_key, **kwargs)
263
  elif agent_name == "code_reviewer":
 
270
  elif agent_name == "documentation_agent":
271
  context = await agent.generate_documentation(context, api_key)
272
  else:
273
+ raise ValueError(f"Unknown Agent Name: {agent_name}")
 
274
  return context
275
+
276
async def determine_next_agent(self, context: "Context", api_key: str) -> str:
    """
    Decide which agent should act next from what the context is still missing.

    Returns one of the dispatcher's agent keys, or "done" when every stage
    (optimize, plan, code, review, test, document) has produced output.
    """
    if not context.optimized_task:
        return "prompt_optimizer"
    if not context.plan:
        return "orchestrator"
    if not context.code:
        return "coder"
    # BUG FIX: the previous check scanned review_comments for "APPROVE", but
    # CodeReviewerAgent records comments only when it does NOT approve, so an
    # approval could never be detected and the reviewer stage looped until the
    # max-revision cap fired.  Require at least one completed review pass
    # instead (reviewer runs are logged in conversation_history).
    # NOTE(review): this does not force a re-review after each coder revision;
    # the revision loop in multi_agent_conversation drives that — confirm.
    has_review = any(
        entry.get("agent") == "Code Reviewer"
        for entry in context.conversation_history
    )
    if not has_review:
        return "code_reviewer"
    if not context.test_cases:
        return "qa_tester"
    if not context.test_results or "TESTS PASSED" not in context.test_results.upper():
        return "qa_tester"
    if not context.documentation:
        return "documentation_agent"

    return "done"  # All tasks are complete
301
 
302
+ # -------------------- Multi-Agent Conversation --------------------
303
+
304
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str,
                                   human_event: threading.Event, human_input_queue: queue.Queue) -> None:
    """
    Drive the agent pipeline over a shared Context until every stage is done.

    Repeatedly asks the dispatcher which agent is next, runs it, and pushes
    progress onto log_queue; the final conversation history is posted as a
    ("result", history) tuple.
    """
    context = Context(original_task=task_message)
    dispatcher = AgentDispatcher(log_queue, human_event, human_input_queue)

    next_agent = await dispatcher.determine_next_agent(context, api_key)
    coder_iterations = 0  # caps how many revision passes the coder may make

    while next_agent != "done":
        if next_agent == "qa_tester":
            # First QA visit generates test cases; subsequent visits run them.
            qa_kwargs = {"generate_tests": True} if not context.test_cases else {"run_tests": True}
            context = await dispatcher.dispatch(next_agent, context, api_key, **qa_kwargs)
        elif next_agent == "coder" and (context.review_comments or context.test_results):
            coder_iterations += 1
            # Revisions use a cheaper model than the initial implementation.
            context = await dispatcher.dispatch(next_agent, context, api_key, model="gpt-3.5-turbo-16k")
        else:
            context = await dispatcher.dispatch(next_agent, context, api_key)

        if next_agent == "code_reviewer":
            all_comments = (
                comment
                for review in context.review_comments
                for comment in review.get("comments", [])
            )
            approved = any("APPROVE" in comment.get("issue", "").upper() for comment in all_comments)
            if approved:
                next_agent = await dispatcher.determine_next_agent(context, api_key)
            else:
                # Unapproved code goes straight back to the coder.
                next_agent = "coder"
        else:
            next_agent = await dispatcher.determine_next_agent(context, api_key)

        if next_agent == "coder" and coder_iterations > 5:
            log_queue.put("Maximum revision iterations reached. Exiting.")
            break

    log_queue.put("Conversation complete.")
    log_queue.put(("result", context.conversation_history))
350
 
351
  # -------------------- Process Generator and Human Input --------------------
352
+
353
def process_conversation_generator(task_message: str, api_key: str,
                                   human_event: threading.Event, human_input_queue: queue.Queue,
                                   log_queue: queue.Queue) -> Generator[str, None, None]:
    """
    Run the multi-agent conversation and stream its log messages to Gradio.

    Yields each log string as it arrives, a Gradio component update carrying
    the final conversation history, then "Conversation complete.".

    BUG FIX (deadlock): the conversation used to be executed to completion
    with asyncio.run() *before* this generator started draining the queues,
    so when the orchestrator blocked waiting for human feedback nothing could
    ever supply it and the app froze.  The conversation now runs in a daemon
    thread while this generator services the queues concurrently.
    """
    worker = threading.Thread(
        target=lambda: asyncio.run(
            multi_agent_conversation(task_message, log_queue, api_key, human_event, human_input_queue)
        ),
        daemon=True,
    )
    worker.start()

    final_result = None
    while True:
        try:
            msg = log_queue.get(timeout=0.1)
            if isinstance(msg, tuple) and msg[0] == "result":
                final_result = msg[1]
                # BUG FIX: gr.Chatbot.update() was removed in Gradio 4.x;
                # gr.update() is the supported way to emit a component update.
                yield gr.update(value=final_result, visible=True)
                yield "Conversation complete."
                break
            else:
                yield msg
        except queue.Empty:
            # If the worker died without posting a result, stop instead of
            # spinning forever on an empty queue.
            if not worker.is_alive():
                yield "Conversation ended unexpectedly."
                break

        if human_event.is_set():
            yield "Waiting for human feedback..."
            time.sleep(0.1)  # avoid busy-waiting while the human types
 
 
 
 
 
 
381
 
382
def get_human_feedback(placeholder_text: str, human_input_queue: queue.Queue) -> gr.Blocks:
    """
    Build a small Gradio form that pushes the operator's answer onto the queue.

    The placeholder text shows the orchestrator's question; submitting the
    textbox enqueues the reply and clears the box.
    """
    with gr.Blocks() as human_feedback_interface:
        with gr.Row():
            feedback_box = gr.Textbox(lines=4, label="Human Feedback", placeholder=placeholder_text)
        with gr.Row():
            send_button = gr.Button("Submit Feedback")

        def _on_submit(answer: str):
            # Hand the answer to the thread blocked on this queue.
            human_input_queue.put(answer)
            return ""  # clear the textbox after submission

        send_button.click(fn=_on_submit, inputs=feedback_box, outputs=feedback_box)

    return human_feedback_interface
398
 
399
  # -------------------- Chat Function for Gradio --------------------
400
 
401
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[Any, None, None]:
    """
    Gradio chat entry point: validate the API key, then stream the
    multi-agent conversation's log messages to the chat window.
    """
    key = openai_api_key or os.getenv("OPENAI_API_KEY")
    if not key:
        yield "Error: API key not provided."
        return

    yield from process_conversation_generator(
        message,
        key,
        threading.Event(),  # signals a pending human-feedback request
        queue.Queue(),      # carries feedback requests/responses
        queue.Queue(),      # carries log messages
    )
416
 
417
  # -------------------- Launch the Chatbot --------------------
418
 
 
419
# Main chat interface: wires the multi-agent chat function into Gradio.
_api_key_box = gr.Textbox(
    label="OpenAI API Key (optional)",
    type="password",
    placeholder="Leave blank to use env variable",
)
_description = (
    "- Collaborative workflow with Human-in-the-Loop.\n"
    "- Orchestrator can ask for human feedback.\n"
    "- Enter a task; agents will work on it. You may be prompted for input.\n"
    "- Max 5 revisions.\n"
    "- Provide API Key."
)

iface = gr.ChatInterface(
    fn=multi_agent_chat,
    chatbot=gr.Chatbot(type="messages"),
    additional_inputs=[_api_key_box],
    title="Multi-Agent Task Solver with Human-in-the-Loop",
    description=_description,
)

# Dummy interface to prevent Gradio errors.
dummy_iface = gr.Interface(lambda x: x, "textbox", "textbox")

if __name__ == "__main__":
    demo = gr.TabbedInterface([iface, dummy_iface], ["Chatbot", "Dummy"])
    demo.launch(share=True)