|
from __future__ import annotations |
|
|
|
import json |
|
|
|
from pydantic import BaseModel, Field |
|
|
|
from metagpt.actions.di.ask_review import AskReview, ReviewConst |
|
from metagpt.actions.di.write_plan import ( |
|
WritePlan, |
|
precheck_update_plan_from_rsp, |
|
update_plan_from_rsp, |
|
) |
|
from metagpt.logs import logger |
|
from metagpt.memory import Memory |
|
from metagpt.schema import Message, Plan, Task, TaskResult |
|
from metagpt.strategy.task_type import TaskType |
|
from metagpt.utils.common import remove_comments |
|
|
|
# Prompt template describing the overall planning state. Rendered with
# str.format using the placeholders: user_requirement, context, tasks and
# current_task. Each placeholder sits on its own line under a markdown heading.
STRUCTURAL_CONTEXT = """
## User Requirement
{user_requirement}
## Context
{context}
## Current Plan
{tasks}
## Current Task
{current_task}
"""
|
|
|
# Prompt template summarising progress for code generation. Rendered with
# str.format using the placeholders: code_written, task_results, current_task
# and guidance. The blank lines separate the markdown sections.
PLAN_STATUS = """
## Finished Tasks
### code
```python
{code_written}
```

### execution result
{task_results}

## Current Task
{current_task}

## Task Guidance
Write complete code for 'Current Task'. And avoid duplicating code from 'Finished Tasks', such as repeated import of packages, reading data, etc.
Specifically, {guidance}
"""
|
|
|
|
|
class Planner(BaseModel):
    """Maintain a ``Plan``: draft or revise it with ``WritePlan``, gate changes
    behind ``AskReview``, and render plan state as prompt context.
    """

    # The plan this planner creates, revises and walks through.
    plan: Plan
    # Scratch messages (plan drafts, validation errors, review feedback) that
    # are appended to the prompt context; cleared after a plan update or a
    # confirmed task.
    working_memory: Memory = Field(default_factory=Memory)
    # When True, reviews are resolved automatically instead of via AskReview.
    auto_run: bool = False

    def __init__(self, goal: str = "", plan: Plan = None, **kwargs):
        """Wrap ``plan``, or a new ``Plan(goal=goal)`` when ``plan`` is falsy."""
        plan = plan or Plan(goal=goal)
        super().__init__(plan=plan, **kwargs)

    @property
    def current_task(self):
        """The plan's ``current_task``."""
        return self.plan.current_task

    @property
    def current_task_id(self):
        """The plan's ``current_task_id``."""
        return self.plan.current_task_id

    async def update_plan(self, goal: str = "", max_tasks: int = 3, max_retries: int = 3):
        """Draft plan changes with ``WritePlan`` until a review confirms them.

        Args:
            goal: When non-empty, the current plan is first replaced by a new
                ``Plan(goal=goal)``.
            max_tasks: Forwarded to ``WritePlan().run``.
            max_retries: Number of invalid drafts that are sent back for
                regeneration without review. Once exhausted, the next draft
                goes to review even if the precheck reports it invalid.
        """
        if goal:
            self.plan = Plan(goal=goal)

        plan_confirmed = False
        while not plan_confirmed:
            context = self.get_useful_memories()
            rsp = await WritePlan().run(context, max_tasks=max_tasks)
            self.working_memory.add(Message(content=rsp, role="assistant", cause_by=WritePlan))

            # Validate the draft before asking for a review, so obviously
            # broken plans are regenerated without involving the reviewer.
            is_plan_valid, error = precheck_update_plan_from_rsp(rsp, self.plan)
            if not is_plan_valid and max_retries > 0:
                error_msg = f"The generated plan is not valid with error: {error}, try regenerating, remember to generate either the whole plan or the single changed task only"
                logger.warning(error_msg)
                # Feed the error back so the next draft can take it into account.
                self.working_memory.add(Message(content=error_msg, role="assistant", cause_by=WritePlan))
                max_retries -= 1
                continue

            _, plan_confirmed = await self.ask_review(trigger=ReviewConst.TASK_REVIEW_TRIGGER)

        # Only the confirmed draft is applied to the plan.
        update_plan_from_rsp(rsp=rsp, current_plan=self.plan)

        self.working_memory.clear()

    async def process_task_result(self, task_result: TaskResult):
        """Review ``task_result`` and confirm the task, leave it, or replan.

        * confirmed: the current task is confirmed via ``confirm_task``;
        * review contains ``"redo"``: nothing is changed here (presumably the
          caller re-runs the task using the recorded feedback -- confirm);
        * otherwise: the plan is updated via ``update_plan``.
        """
        review, task_result_confirmed = await self.ask_review(task_result)

        if task_result_confirmed:
            await self.confirm_task(self.current_task, task_result, review)
        elif "redo" in review:
            # Deliberately neither confirm nor replan; the task stays current.
            pass
        else:
            await self.update_plan()

    async def ask_review(
        self,
        task_result: TaskResult = None,
        auto_run: bool = None,
        trigger: str = ReviewConst.TASK_REVIEW_TRIGGER,
        review_context_len: int = 5,
    ):
        """
        Ask to review the task result, reviewer needs to provide confirmation or request change.
        If human confirms the task result, then we deem the task completed, regardless of whether the code run succeeds;
        if auto mode, then the code run has to succeed for the task to be considered completed.

        Args:
            task_result: Result under review; only consulted in auto mode.
            auto_run: Per-call override of ``self.auto_run``. ``None`` (the
                default) means "use ``self.auto_run``".
            trigger: Forwarded to ``AskReview().run``.
            review_context_len: Number of trailing context messages passed to
                the reviewer.

        Returns:
            A ``(review, confirmed)`` tuple; ``review`` is ``""`` in auto mode.
        """
        # Only fall back to the instance setting when no override was given,
        # so an explicit ``auto_run=False`` is honoured.
        auto_run = auto_run if auto_run is not None else self.auto_run
        if not auto_run:
            context = self.get_useful_memories()
            review, confirmed = await AskReview().run(
                context=context[-review_context_len:], plan=self.plan, trigger=trigger
            )
            if not confirmed:
                # Keep the rejection feedback available for the next draft.
                self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
            return review, confirmed
        confirmed = task_result.is_success if task_result else True
        return "", confirmed

    async def confirm_task(self, task: Task, task_result: TaskResult, review: str):
        """Record ``task_result`` on ``task`` and finish the plan's current task.

        If ``review`` contains the first continue word plus additional text,
        the review is treated as extra instructions: it is stored in working
        memory and the plan is updated.
        """
        task.update_task_result(task_result=task_result)
        self.plan.finish_current_task()
        self.working_memory.clear()

        confirm_word = ReviewConst.CONTINUE_WORDS[0]
        normalized_review = review.lower()
        # "confirmed and more": the review contains the confirm word but is
        # not just that word on its own.
        confirmed_and_more = confirm_word in normalized_review and normalized_review != confirm_word
        if confirmed_and_more:
            self.working_memory.add(Message(content=review, role="user", cause_by=AskReview))
            await self.update_plan()

    def get_useful_memories(self, task_exclude_field=None) -> list[Message]:
        """find useful memories only to reduce context length and improve performance

        Builds one user message from ``STRUCTURAL_CONTEXT`` (goal, plan
        context, all tasks as JSON, current task) and returns it followed by
        the working-memory messages.

        Args:
            task_exclude_field: Passed as ``exclude`` to each task's ``dict()``.
        """
        task_dicts = [task.dict(exclude=task_exclude_field) for task in self.plan.tasks]
        current = self.plan.current_task
        structural_context = STRUCTURAL_CONTEXT.format(
            user_requirement=self.plan.goal,
            context=self.plan.context,
            tasks=json.dumps(task_dicts, indent=4, ensure_ascii=False),
            # An empty dict renders as "{}" when there is no current task.
            current_task=current.json() if current else {},
        )
        context_msg = [Message(content=structural_context, role="user")]

        return context_msg + self.working_memory.get()

    def get_plan_status(self) -> str:
        """Render ``PLAN_STATUS`` from finished tasks and the current task.

        Includes the comment-stripped code and the results of finished tasks,
        the current task's instruction, and the guidance of its task type
        (empty when ``TaskType.get_type`` returns a falsy value).
        """
        finished_tasks = self.plan.get_finished_tasks()
        code_written = "\n\n".join(remove_comments(task.code) for task in finished_tasks)
        task_results = "\n\n".join(task.result for task in finished_tasks)

        task_type = TaskType.get_type(self.current_task.task_type)
        guidance = task_type.guidance if task_type else ""

        return PLAN_STATUS.format(
            code_written=code_written,
            task_results=task_results,
            current_task=self.current_task.instruction,
            guidance=guidance,
        )
|
|