Spaces:
Running
Running
File size: 3,202 Bytes
372531f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
from gpt_researcher import GPTResearcher
from colorama import Fore, Style
from .utils.views import print_agent_output
class ResearchAgent:
def __init__(self, websocket=None, stream_output=None, tone=None, headers=None):
self.websocket = websocket
self.stream_output = stream_output
self.headers = headers or {}
self.tone = tone
async def research(self, query: str, research_report: str = "research_report",
parent_query: str = "", verbose=True, source="web", tone=None, headers=None):
# Initialize the researcher
researcher = GPTResearcher(query=query, report_type=research_report, parent_query=parent_query,
verbose=verbose, report_source=source, tone=tone, websocket=self.websocket, headers=self.headers)
# Conduct research on the given query
await researcher.conduct_research()
# Write the report
report = await researcher.write_report()
return report
async def run_subtopic_research(self, parent_query: str, subtopic: str, verbose: bool = True, source="web", headers=None):
try:
report = await self.research(parent_query=parent_query, query=subtopic,
research_report="subtopic_report", verbose=verbose, source=source, tone=self.tone, headers=None)
except Exception as e:
print(f"{Fore.RED}Error in researching topic {subtopic}: {e}{Style.RESET_ALL}")
report = None
return {subtopic: report}
async def run_initial_research(self, research_state: dict):
task = research_state.get("task")
query = task.get("query")
source = task.get("source", "web")
if self.websocket and self.stream_output:
await self.stream_output("logs", "initial_research", f"Running initial research on the following query: {query}", self.websocket)
else:
print_agent_output(f"Running initial research on the following query: {query}", agent="RESEARCHER")
return {"task": task, "initial_research": await self.research(query=query, verbose=task.get("verbose"),
source=source, tone=self.tone, headers=self.headers)}
async def run_depth_research(self, draft_state: dict):
task = draft_state.get("task")
topic = draft_state.get("topic")
parent_query = task.get("query")
source = task.get("source", "web")
verbose = task.get("verbose")
if self.websocket and self.stream_output:
await self.stream_output("logs", "depth_research", f"Running in depth research on the following report topic: {topic}", self.websocket)
else:
print_agent_output(f"Running in depth research on the following report topic: {topic}", agent="RESEARCHER")
research_draft = await self.run_subtopic_research(parent_query=parent_query, subtopic=topic,
verbose=verbose, source=source, headers=self.headers)
return {"draft": research_draft} |