from haystack import Pipeline from haystack.components.builders import PromptBuilder from haystack.components.generators.openai import OpenAIGenerator from haystack.components.routers import ConditionalRouter from functions import SQLiteQuery from typing import List import sqlite3 import os from getpass import getpass from dotenv import load_dotenv load_dotenv() if "OPENAI_API_KEY" not in os.environ: os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:") from haystack.components.builders import PromptBuilder from haystack.components.generators import OpenAIGenerator llm = OpenAIGenerator(model="gpt-4o") def rag_pipeline_func(queries: str, session_hash): sql_query = SQLiteQuery(f'data_source_{session_hash}.db') connection = sqlite3.connect(f'data_source_{session_hash}.db') cur=connection.execute('select * from data_source') columns = [i[0] for i in cur.description] cur.close() #Rag Pipeline prompt = PromptBuilder(template="""Please generate an SQL query. The query should answer the following Question: {{question}}; If the question cannot be answered given the provided table and columns, return 'no_answer' The query is to be answered for the table is called 'data_source' with the following Columns: {{columns}}; Answer:""") routes = [ { "condition": "{{'no_answer' not in replies[0]}}", "output": "{{replies}}", "output_name": "sql", "output_type": List[str], }, { "condition": "{{'no_answer' in replies[0]}}", "output": "{{question}}", "output_name": "go_to_fallback", "output_type": str, }, ] router = ConditionalRouter(routes) fallback_prompt = PromptBuilder(template="""User entered a query that cannot be answered with the given table. The query was: {{question}} and the table had columns: {{columns}}. Let the user know why the question cannot be answered""") fallback_llm = OpenAIGenerator(model="gpt-4") conditional_sql_pipeline = Pipeline() conditional_sql_pipeline.add_component("prompt", prompt) conditional_sql_pipeline.add_component("llm", llm) conditional_sql_pipeline.add_component("router", router) conditional_sql_pipeline.add_component("fallback_prompt", fallback_prompt) conditional_sql_pipeline.add_component("fallback_llm", fallback_llm) conditional_sql_pipeline.add_component("sql_querier", sql_query) conditional_sql_pipeline.connect("prompt", "llm") conditional_sql_pipeline.connect("llm.replies", "router.replies") conditional_sql_pipeline.connect("router.sql", "sql_querier.queries") conditional_sql_pipeline.connect("router.go_to_fallback", "fallback_prompt.question") conditional_sql_pipeline.connect("fallback_prompt", "fallback_llm") print("RAG PIPELINE FUNCTION") result = conditional_sql_pipeline.run({"prompt": {"question": queries, "columns": columns}, "router": {"question": queries}, "fallback_prompt": {"columns": columns}}) if 'sql_querier' in result: reply = result['sql_querier']['results'][0] elif 'fallback_llm' in result: reply = result['fallback_llm']['replies'][0] else: reply = result["llm"]["replies"][0] print("reply content") print(reply.content) return {"reply": reply.content}