import asyncio
import csv
import logging
import os
import re
import sqlite3
from contextlib import closing
from typing import Optional, Tuple

import gradio as gr
import openai
import requests
from gtts import gTTS
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict


# Hand the OpenAI SDK its credential from the environment; the value is None
# when OPENAI_API_KEY is not set.
openai.api_key = os.environ.get("OPENAI_API_KEY")


def init_db_from_csv(csv_path: str = "transactions.csv", db_path: str = "shop.db") -> None:
    """Rebuild the ``transactions`` table of a SQLite database from a CSV file.

    The table is created if it does not exist, then its previous contents are
    replaced by the rows of the CSV. The CSV must have a header row with the
    columns ``date``, ``product`` and ``amount``.

    Args:
        csv_path: Path of the CSV file to load.
        db_path: Path of the SQLite database file to (re)populate.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
        KeyError: If a required column is missing from the CSV.
        ValueError: If an ``amount`` value cannot be converted to ``float``.
    """
    # closing() guarantees the connection is released even when reading or
    # parsing the CSV fails part-way through.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS transactions (date TEXT, product TEXT, amount REAL)"
        )

        # utf-8-sig strips a leading byte-order mark so the first header is
        # read as "date" rather than "\ufeffdate".
        with open(csv_path, newline="", encoding="utf-8-sig") as f:
            rows = [
                (row["date"], row["product"], float(row["amount"]))
                for row in csv.DictReader(f)
            ]

        # All rows are parsed before anything is deleted; the connection
        # context manager commits on success and rolls back on error, so the
        # old data is never lost to a failed reload.
        with conn:
            conn.execute("DELETE FROM transactions")
            conn.executemany(
                "INSERT INTO transactions (date, product, amount) VALUES (?, ?, ?)", rows
            )


# Populate shop.db from the bundled CSV as soon as the module is imported, so
# the database agent has data to query.
init_db_from_csv(csv_path="transactions.csv")


def db_agent(query: str, db_path: str = "shop.db") -> str:
    """Report today's highest-revenue product from the ``transactions`` table.

    Args:
        query: The user's question. It is accepted so this agent has the same
            call shape as the other agents, but it is not consulted: the SQL
            below is fixed.
        db_path: Path of the SQLite database file to query.

    Returns:
        A sentence naming the top product and its revenue, a notice that no
        rows exist for today, or a description of the database error.
    """
    # date('now') alone is evaluated in UTC; the 'localtime' modifier makes
    # "today" follow the machine's local calendar day instead.
    # COALESCE keeps the revenue numeric even if every amount is NULL.
    sql = """
        SELECT product, COALESCE(SUM(amount), 0) AS revenue
        FROM transactions
        WHERE date = date('now', 'localtime')
        GROUP BY product
        ORDER BY revenue DESC
        LIMIT 1
    """
    try:
        # closing() releases the connection on both the success and error path.
        with closing(sqlite3.connect(db_path)) as conn:
            row = conn.execute(sql).fetchone()
    except sqlite3.OperationalError as e:
        return f"Database error: {e}. Please check 'transactions' table in {db_path}."

    if row:
        return f"Top product today: {row[0]} with ₹{row[1]:,.2f}"
    return "No transactions found for today."


def web_search_agent(query: str, timeout: float = 10.0) -> str:
    """Answer a question from a SerpAPI search snippet, else from the LLM alone.

    The first organic result's snippet, when present, is summarized by
    ``llm_agent``. If the search fails or yields no snippet, the raw query is
    sent to ``llm_agent`` instead.

    Args:
        query: The user's question, used verbatim as the search term.
        timeout: Seconds to wait for the search service before giving up.

    Returns:
        The text produced by ``llm_agent``.
    """
    snippet = ""
    try:
        resp = requests.get(
            "https://serpapi.com/search",
            params={"q": query, "api_key": os.getenv("SERPAPI_KEY")},
            timeout=timeout,  # without this a stalled server blocks the request forever
        )
        resp.raise_for_status()
        payload = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # Search is best-effort, so fall through to the plain LLM answer.
        # Only the exception type is logged: the message can embed the request
        # URL, which carries the API key as a query parameter.
        logging.getLogger(__name__).warning(
            "Web search failed (%s); falling back to LLM.", type(exc).__name__
        )
    else:
        results = payload.get("organic_results") if isinstance(payload, dict) else None
        # Guard against a missing key, an empty list, or a non-dict entry.
        if isinstance(results, list) and results and isinstance(results[0], dict):
            snippet = (results[0].get("snippet") or "").strip()

    # The LLM calls sit outside the try block so their errors are not mistaken
    # for search failures and silently retried.
    if snippet:
        return llm_agent(f"Summarize: {snippet}")
    return llm_agent(query)


def llm_agent(query: str) -> str:
    """Send ``query`` to the ``gpt-4o-mini`` chat model and return its reply.

    Args:
        query: Text sent as the user message.

    Returns:
        The first choice's message text with surrounding whitespace removed,
        or an empty string when the message carries no text.
    """
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": query},
        ],
        temperature=0.2,
    )
    # NOTE(review): the message content appears to be optional in the SDK;
    # treating None as "" avoids an AttributeError on .strip() — confirm.
    content = response.choices[0].message.content
    return (content or "").strip()


def stt_agent(audio_path: str) -> str:
    """Transcribe the audio file at ``audio_path`` with the ``whisper-1`` model.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The transcript text with surrounding whitespace removed.
    """
    with open(audio_path, "rb") as audio_stream:
        transcription = openai.audio.transcriptions.create(
            model="whisper-1",
            file=audio_stream,
        )
    spoken_text = transcription.text
    return spoken_text.strip()


def tts_agent(text: str, lang: str = 'en') -> str:
    """Synthesize ``text`` to speech with gTTS and save it as an MP3.

    Args:
        text: The text to speak.
        lang: Language code passed to gTTS.

    Returns:
        The path of the written file, always ``response_audio.mp3``.
    """
    destination = "response_audio.mp3"
    gTTS(text=text, lang=lang).save(destination)
    return destination


class State(TypedDict):
    """State dictionary passed between the nodes of the query graph."""

    # The user's question; read by the routing function and by every node.
    query: str
    # The answer text written by the db, web or llm node.
    result: str


# Question words are matched as whole words so that, for example, "whole" or
# "somewhat" no longer count as "who" / "what". Contractions such as "what's"
# still match because the apostrophe is a word boundary.
_WEB_QUERY_RE = re.compile(r"\b(?:who(?:m|se)?|what|when|where)\b")


def route_fn(state: "State") -> str:
    """Choose which agent should handle the query held in ``state``.

    Args:
        state: Graph state; its ``query`` entry is inspected case-insensitively.

    Returns:
        ``"db"`` when the query mentions revenue, ``"web"`` when it contains a
        question word (who/whom/whose/what/when/where), otherwise ``"llm"``.
        A missing or empty query is routed to ``"llm"``.
    """
    q = (state.get("query") or "").lower()
    # A plain substring test is kept here on purpose: it also covers
    # "revenues" and "max revenue".
    if "revenue" in q:
        return "db"
    if _WEB_QUERY_RE.search(q):
        return "web"
    return "llm"


def router_node(state: State) -> dict:
    """Pass-through node: re-emit the incoming query unchanged."""
    incoming_query = state["query"]
    return {"query": incoming_query}


def db_node(state: State) -> dict:
    """Graph node that stores the database agent's answer under ``result``."""
    answer = db_agent(state["query"])
    return {"result": answer}


def web_node(state: State) -> dict:
    """Graph node that stores the web-search agent's answer under ``result``."""
    answer = web_search_agent(state["query"])
    return {"result": answer}


def llm_node(state: State) -> dict:
    """Graph node that stores the LLM agent's answer under ``result``."""
    answer = llm_agent(state["query"])
    return {"result": answer}


# Query graph: START -> router -> one of db / web / llm -> END.
builder = StateGraph(State)

builder.add_node("router", router_node)
builder.add_node("db", db_node)
builder.add_node("web", web_node)
builder.add_node("llm", llm_node)

# A single entry edge into the router. The branch chosen by route_fn hangs off
# the router itself, so the router is no longer a dead-end node that runs
# beside the routing decision instead of before it.
builder.add_edge(START, "router")
builder.add_conditional_edges(
    "router",
    route_fn,
    {"db": "db", "web": "web", "llm": "llm"},
)

# Every agent node finishes the run.
builder.add_edge("db", END)
builder.add_edge("web", END)
builder.add_edge("llm", END)

graph = builder.compile()


# File suffixes that mark the input as a recording rather than typed text.
_AUDIO_SUFFIXES = (".wav", ".mp3")
_NO_INPUT_MESSAGE = "No question received. Please record a question and try again."


def handle_query(audio_or_text: Optional[str]) -> Tuple[str, Optional[str]]:
    """Answer a spoken or typed question through the query graph.

    Input ending in ``.wav`` or ``.mp3`` (any letter case) is treated as the
    path of a recording and transcribed first; anything else is used as the
    question text itself.

    Args:
        audio_or_text: Path of an audio file, the question as text, or
            ``None`` / empty when nothing was submitted.

    Returns:
        A ``(answer_text, answer_audio_path)`` pair. The audio path is ``None``
        for typed questions, for missing input, and when there is no answer
        text to speak. A pair is always returned because the UI click handler
        is wired to two output components.
    """
    # The audio component passes None when the user submits without recording.
    if not audio_or_text or not audio_or_text.strip():
        return _NO_INPUT_MESSAGE, None

    is_audio = audio_or_text.lower().endswith(_AUDIO_SUFFIXES)
    query = stt_agent(audio_or_text) if is_audio else audio_or_text

    # A silent recording transcribes to nothing; do not route an empty query.
    if not query.strip():
        return _NO_INPUT_MESSAGE, None

    state = graph.invoke({"query": query})
    response = state["result"]

    # Speak the answer only for spoken questions, and only if there is text.
    audio_path = tts_agent(response) if is_audio and response.strip() else None
    return response, audio_path


# Gradio front end: one microphone input, a text answer and a spoken answer.
with gr.Blocks() as demo:
    # Built line by line so the Markdown carries no source-code indentation.
    usage_markdown = "\n".join(
        [
            "**Shop Voice-Box Assistant Demo!**",
            "",
            "**Usage Instructions:**",
            # The page has no upload widget; transactions.csv is read from
            # disk at startup, so the text no longer tells users to upload it.
            "- Speak into your microphone and press Submit."
            " Data queries are answered from transactions.csv.",
            "- Sample questions you can ask:",
            "  - What is the max revenue product today?",
            "  - Who invented the light bulb?",
            "  - Tell me a joke about cats.",
        ]
    )
    gr.Markdown(usage_markdown)

    # The input is microphone-only, so the label no longer promises typing.
    inp = gr.Audio(sources=["microphone"], type="filepath", label="Speak your question")
    out_text = gr.Textbox(label="Answer (text)")
    out_audio = gr.Audio(label="Answer (speech)")
    submit = gr.Button("Submit")

    # handle_query receives the recording's file path and fills both outputs.
    submit.click(fn=handle_query, inputs=inp, outputs=[out_text, out_audio])


if __name__ == "__main__":
    # Launch settings gathered in one place: no share link, bind address
    # 0.0.0.0, port 7860.
    launch_options = {
        "share": False,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    demo.launch(**launch_options)