from typing import Annotated, Any, Literal
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import interrupt, Command
from typing_extensions import TypedDict
"""
from langchain_anthropic import ChatAnthropic
from langchain_ollama.llms import OllamaLLM
from langchain_experimental.llms.ollama_functions import OllamaFunctions
llm = OllamaFunctions(model="qwen2.5", format="json")
llm_with_tools = llm #.bind_tools(tools)
"""
from langchain_groq import ChatGroq
llm = ChatGroq(
    model="gemma2-9b-it",  # "llama-3.1-8b-instant"
    temperature=0.4,
    max_tokens=None,
    timeout=None,
    max_retries=2,
    # other params...
)
template = """Question: {question}
Answer: Let's think step by step."""
prompt = ChatPromptTemplate.from_template(template)
# model = OllamaLLM(model="deepseek-r1")
chain = prompt | llm
# print(chain.invoke({"question": "Explain capacity planning like I'm 5."}))
@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt({"query": query})
    return human_response["data"]
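# interrupt() pauses the graph run here and surfaces the payload to the caller; the run
# resumes when the graph is invoked again with Command(resume=...), and that resume value
# becomes the return value of interrupt(). A minimal resume sketch (the thread id is
# illustrative, not part of this script):
#
#   config = {"configurable": {"thread_id": "example-thread"}}
#   graph.stream(Command(resume={"data": "a human-typed answer"}), config=config)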
search_tool = TavilySearchResults(max_results=2)  # renamed so it no longer shadows the @tool decorator
tools = [search_tool, human_assistance]
llm_with_tools = llm.bind_tools(tools)
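# The "disable parallel tool calling" note inside write_email below refers to this binding.
# Some chat models accept a flag here (e.g. llm.bind_tools(tools, parallel_tool_calls=False));
# whether ChatGroq honours that keyword is an assumption to check against the provider docs.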
# llm = OllamaLLM(model="deepseek-r1") #ChatAnthropic(model="claude-3-5-sonnet-20240620")
class State(TypedDict):
    messages: Annotated[list, add_messages]
    persona: str
    email: str
    release: Literal["approved", "rejected"]
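# `add_messages` is a reducer: updates to "messages" are appended to the existing list rather
# than overwriting it, while plain keys such as `email` and `release` are simply replaced.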
graph_builder = StateGraph(State)
def write_email(state: State):
    prompt = f"""Write a personalized promotional email for the persona below.
Offer financial education and propose a meeting with a financial advisor.
Return only the email, nothing else:
{state['persona']}
"""
    email = llm_with_tools.invoke(prompt)
    # Because we will be interrupting during tool execution,
    # we disable parallel tool calling to avoid repeating any
    # tool invocations when we resume.
    # assert len(email.tool_calls) <= 1
    return Command(update={"email": email.content})
graph_builder.add_node("write_email", write_email)
def delivery(state: State):
    print(f"""Delivering: {state['email']}""")
    return Command(update={"messages": ["Email delivered to customer"]})
graph_builder.add_node("delivery", delivery)
def human_approval(state: State) -> Command[Literal["delivery", END]]:
    is_approved = interrupt(
        "Approve release of the promotional email to the customer? (type: approved or rejected): "
    )
    if is_approved == "approved":
        return Command(goto="delivery", update={"release": "approved"})
    else:
        return Command(goto=END, update={"release": "rejected"})
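# Because this node returns Command(goto=...), it handles its own routing; no conditional
# edge out of "human_approval" is declared below, only the static edges into it and from
# "delivery" to END.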
# Add the node to the graph in an appropriate location
# and connect it to the relevant nodes.
graph_builder.add_node("human_approval", human_approval)
graph_builder.add_edge(START, "write_email")
graph_builder.add_edge("write_email", "human_approval")
graph_builder.add_edge("delivery", END)
checkpointer = MemorySaver()
graph = graph_builder.compile(checkpointer=checkpointer)
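# A checkpointer is required for interrupt()/resume: each thread_id below keeps its own saved
# state. MemorySaver holds it in process memory only; for anything beyond a demo, a persistent
# checkpointer (e.g. SqliteSaver) would be the usual swap, a choice left to the deployment.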
def email(persona, campaign, history):
    thread_config = {"configurable": {"thread_id": campaign}}
    for event in graph.stream({"persona": persona}, config=thread_config):
        for value in event.values():
            return "Assistant: ", value, "Value: ", graph.get_state(thread_config).values
def feedback(deliver, campaign, history):
    thread_config = {"configurable": {"thread_id": campaign}}
    for event in graph.stream(Command(resume=deliver), config=thread_config):
        for value in event.values():
            return "Assistant: ", value, "Value: ", graph.get_state(thread_config).values
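# email() starts a campaign thread and returns as soon as the run pauses at an interrupt;
# feedback() resumes the same thread_id with the human's decision, which human_approval sees
# as the return value of interrupt(). The (value, campaign, history) signatures look like
# chat-UI callbacks (e.g. Gradio), but that wiring is an assumption outside this file.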
'''
from IPython.display import Image, display
try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass
'''
def campaign(user_input: Any, thread_id: str):
    thread_config = {"configurable": {"thread_id": thread_id}}
    for event in graph.stream(user_input, config=thread_config):
        for value in event.values():
            print("Assistant:", value, "Value: ", graph.get_state(thread_config).values)
"""
campaign({"persona": "My mortgage rate is 9%, I cannot afford it anymore, I need to refinance and I'm unemployed right now."}, "MOR")
campaign({"persona": "My credit card limit is too low, I need a card with a bigger limit and a low fee."}, "CARD")
campaign(Command(resume="approved"), "MOR")
"""
while False:  # disabled interactive loop; change to `while True:` to chat with the graph from the terminal
    try:
        user_input = input("User: ")
        if user_input.lower() in ["quit", "exit", "q"]:
            print("Goodbye!")
            break
        campaign(user_input, "MORT")
        # stream_graph_updates(user_input)
    except Exception:
        # fallback if input() is not available
        user_input = "What do you know about LangGraph?"
        print("User: " + user_input)
        campaign(user_input, "MORT")
        # stream_graph_updates(user_input)
        break