File size: 5,785 Bytes
b12e5fe 0729c66 b12e5fe 52ec688 7598edf 6219b13 52ec688 0729c66 b12e5fe 6219b13 0729c66 dd5aa4f 7598edf dd5aa4f 6219b13 0729c66 7b770ed 52ec688 7b770ed b12e5fe 52ec688 dd5aa4f 0729c66 52ec688 0729c66 dd5aa4f 0729c66 dd5aa4f b12e5fe 7598edf 52ec688 7598edf 52ec688 7598edf b12e5fe 7598edf 52ec688 7598edf 0729c66 7cf958b 6e84209 7cf958b 6e84209 e8bcf17 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
import os
import sqlite3
import requests
import openai
import gradio as gr
import asyncio
from gtts import gTTS
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
import csv
openai.api_key = os.getenv("OPENAI_API_KEY")
def init_db_from_csv(csv_path: str = "transactions.csv") -> None:
conn = sqlite3.connect("shop.db")
cur = conn.cursor()
cur.execute(
"CREATE TABLE IF NOT EXISTS transactions (date TEXT, product TEXT, amount REAL)"
)
with open(csv_path, newline='') as f:
reader = csv.DictReader(f)
rows = [(row["date"], row["product"], float(row["amount"])) for row in reader]
cur.execute("DELETE FROM transactions")
cur.executemany(
"INSERT INTO transactions (date, product, amount) VALUES (?, ?, ?)", rows
)
conn.commit()
conn.close()
init_db_from_csv()
def db_agent(query: str) -> str:
try:
conn = sqlite3.connect("shop.db")
cur = conn.cursor()
cur.execute(
"""
SELECT product, SUM(amount) AS revenue
FROM transactions
WHERE date = date('now')
GROUP BY product
ORDER BY revenue DESC
LIMIT 1
"""
)
row = cur.fetchone()
if row:
return f"Top product today: {row[0]} with ₹{row[1]:,.2f}"
return "No transactions found for today."
except sqlite3.OperationalError as e:
return f"Database error: {e}. Please check 'transactions' table in shop.db."
def web_search_agent(query: str) -> str:
try:
resp = requests.get(
"https://serpapi.com/search",
params={"q": query, "api_key": os.getenv("SERPAPI_KEY")}
)
snippet = resp.json().get("organic_results", [{}])[0].get("snippet", "").strip()
if snippet:
return llm_agent(f"Summarize: {snippet}")
except Exception:
pass
return llm_agent(query)
def llm_agent(query: str) -> str:
response = openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": query},
],
temperature=0.2,
)
return response.choices[0].message.content.strip()
def stt_agent(audio_path: str) -> str:
with open(audio_path, "rb") as afile:
transcript = openai.audio.transcriptions.create(
model="whisper-1",
file=afile
)
return transcript.text.strip()
def tts_agent(text: str, lang: str = 'en') -> str:
tts = gTTS(text=text, lang=lang)
out_path = "response_audio.mp3"
tts.save(out_path)
return out_path
class State(TypedDict):
query: str
result: str
def route_fn(state: State) -> str:
q = state["query"].lower()
if any(k in q for k in ["max revenue", "revenue"]):
return "db"
if any(k in q for k in ["who", "what", "when", "where"]):
return "web"
return "llm"
def router_node(state: State) -> dict:
return {"query": state["query"]}
def db_node(state: State) -> dict:
return {"result": db_agent(state["query"]) }
def web_node(state: State) -> dict:
return {"result": web_search_agent(state["query"]) }
def llm_node(state: State) -> dict:
return {"result": llm_agent(state["query"]) }
builder = StateGraph(State)
builder.add_node("router", router_node)
builder.set_entry_point("router")
builder.set_conditional_entry_point(
route_fn,
path_map={"db": "db", "web": "web", "llm": "llm"}
)
builder.add_node("db", db_node)
builder.add_node("web", web_node)
builder.add_node("llm", llm_node)
builder.add_edge(START, "router")
builder.add_edge("db", END)
builder.add_edge("web", END)
builder.add_edge("llm", END)
graph = builder.compile()
def handle_query(audio_or_text: str):
is_audio = audio_or_text.endswith('.wav') or audio_or_text.endswith('.mp3')
if is_audio:
query = stt_agent(audio_or_text)
else:
query = audio_or_text
state = graph.invoke({"query": query})
response = state["result"]
if is_audio:
audio_path = tts_agent(response)
return response, audio_path
return response
with gr.Blocks() as demo:
gr.Markdown(
"""
**Shop Voice-Box Assistant Demo!**
**Usage Instructions:**
- Speak into your microphone or upload transactions.csv for data queries.
- Sample questions you can ask:
- What is the max revenue product today?
- Who invented the light bulb?
- Tell me a joke about cats.
"""
)
inp = gr.Audio(sources=["microphone"], type="filepath", label="Speak or type your question")
out_text = gr.Textbox(label="Answer (text)")
out_audio = gr.Audio(label="Answer (speech)")
submit = gr.Button("Submit")
submit.click(fn=handle_query, inputs=inp, outputs=[out_text, out_audio])
# with gr.Blocks() as demo:
# gr.Markdown("## Shop Voice-Box Assistant (Speech In/Out)")
# inp = gr.Audio(sources=["microphone"], type="filepath", label="Speak or type your question or upload transactions.csv separately in root")
# out_text = gr.Textbox(label="Answer (text)")
# out_audio = gr.Audio(label="Answer (speech)")
# submit = gr.Button("Submit")
# gr.Examples(
# examples=[
# ["What is the max revenue product today?"],
# ["Who invented the light bulb?"],
# ["Tell me a joke about cats."],
# ],
# inputs=inp,
# outputs=[out_text, out_audio],
# )
# submit.click(fn=handle_query, inputs=inp, outputs=[out_text, out_audio])
if __name__ == "__main__":
demo.launch(share=False, server_name="0.0.0.0", server_port=7860)
|