|
import gradio as gr |
|
import faiss |
|
import json |
|
import numpy as np |
|
import openai |
|
import os |
|
import spacy |
|
from sentence_transformers import CrossEncoder |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load the small English spaCy model; on a fresh environment the model
# package is missing and spacy.load raises OSError, so download it once
# and retry.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # Model not installed: fetch it via spaCy's CLI helper, then load again.
    from spacy.cli import download

    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")
|
|
|
|
|
# Read .env and configure the OpenAI API key from the environment.
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

# Pre-built FAISS index plus its metadata file; metadata is assumed to be
# row-aligned with the index (entry i describes vector i) — see retrieve_top_k.
index = faiss.read_index("faiss.index")
with open("faiss_metadata.json", "r", encoding="utf-8") as f:
    metadata = json.load(f)

# Cross-encoder used to re-rank the FAISS candidates (see re_rank_candidates).
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")
|
|
|
|
|
def get_embedding(text):
    """Embed *text* with OpenAI's ada-002 model.

    Args:
        text: The string to embed.

    Returns:
        np.ndarray: float32 vector of the embedding.
    """
    response = openai.embeddings.create(
        model="text-embedding-ada-002",
        input=[text],
    )
    # Use attribute access on the typed response object; ``.dict()`` is a
    # deprecated pydantic-v1 accessor on openai>=1.x response models and
    # needlessly serializes the whole payload.
    return np.array(response.data[0].embedding, dtype=np.float32)
|
|
|
|
|
def extract_keywords_llm(text):
    """Extract keywords from *text* via a few-shot GPT-3.5 prompt.

    The (Swedish) prompt asks the model for a JSON list of keywords,
    including acronyms and technical terms. If the reply is not a JSON
    list, the raw reply is split on commas instead.

    Returns:
        list[str]: lowercased, stripped, non-empty keyword strings.
    """
    prompt = (
        "Här är några exempel:\n"
        "Exempel 1:\n"
        "Fråga: \"Hur fungerar BECCS enligt Stockholm Exergi?\"\n"
        "Förväntad nyckelordslista: [\"beccs\", \"bio energy\", \"carbon capture\", \"lagring\"]\n\n"
        "Exempel 2:\n"
        "Fråga: \"Hur ändrar jag postadress på en kund?\"\n"
        "Förväntad nyckelordslista: [\"postadress\", \"kund\", \"adressändring\"]\n\n"
        "Extrahera de viktigaste nyckelorden från frågan nedan, inkludera även akronymer och facktermer. "
        "Returnera svaret som en JSON-lista.\n"
        f"Fråga: \"{text}\""
    )
    response = openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=100,
        temperature=0.0,
    )
    answer_text = response.choices[0].message.content.strip()
    # Narrow exception: json.loads raises ValueError (JSONDecodeError) on
    # malformed input; the previous broad ``except Exception`` duplicated
    # the comma-split fallback in two branches.
    try:
        keywords = json.loads(answer_text)
    except ValueError:
        keywords = None
    if isinstance(keywords, list):
        return [kw.strip().lower() for kw in keywords if isinstance(kw, str) and kw.strip()]
    # Fallback: reply was not a JSON list — treat it as comma-separated text.
    return [kw.strip().lower() for kw in answer_text.split(",") if kw.strip()]
|
|
|
|
|
def extract_keywords_spacy(text):
    """Extract keyword candidates from *text* with spaCy.

    Collects named-entity strings plus every noun, proper noun and
    adjective token, lowercased and de-duplicated.
    """
    doc = nlp(text)
    entity_terms = {ent.text.lower() for ent in doc.ents}
    content_terms = {
        token.text.lower()
        for token in doc
        if token.pos_ in ("NOUN", "PROPN", "ADJ")
    }
    return list(entity_terms | content_terms)
|
|
|
|
|
def extract_keywords_combined(text):
    """Union of LLM- and spaCy-extracted keywords (order not guaranteed)."""
    merged = {*extract_keywords_llm(text), *extract_keywords_spacy(text)}
    return list(merged)
|
|
|
|
|
def retrieve_top_k(question, k=20):
    """Return metadata entries for the *k* nearest neighbours of *question*.

    Args:
        question: User question to embed and search with.
        k: Number of candidates to fetch from the FAISS index.

    Returns:
        list: metadata entries aligned with the returned FAISS indices.
    """
    embedding = get_embedding(question)
    query = embedding.reshape(1, -1)  # FAISS expects a (n_queries, dim) batch
    distances, indices = index.search(query, k)
    # FAISS pads with -1 when fewer than k vectors match; a raw -1 would
    # silently index the *last* metadata entry, so exclude it explicitly.
    return [metadata[i] for i in indices[0] if 0 <= i < len(metadata)]
|
|
|
|
|
def re_rank_candidates(question, candidates, top_n=3):
    """Re-rank *candidates* with the cross-encoder plus a keyword bonus.

    Each candidate gets the cross-encoder relevance score for
    (question, candidate text), boosted by +0.1 per extracted keyword
    found in the text (+0.3 for "beccs", a high-signal domain term).

    Args:
        question: The user question.
        candidates: Metadata dicts with at least a "text" key.
        top_n: How many top-scoring candidates to return (default 3,
            preserving the previous hard-coded behavior).

    Returns:
        list: the top_n candidates, best first.
    """
    keywords = extract_keywords_combined(question)
    pairs = [(question, cand["text"]) for cand in candidates]
    scores = reranker.predict(pairs)

    boosted_scores = []
    for cand, score in zip(candidates, scores):
        text_lower = cand["text"].lower()
        bonus = 0.0
        for kw in keywords:
            if kw not in text_lower:
                continue
            # "beccs" gets a larger domain-specific boost than other terms.
            bonus += 0.3 if kw == "beccs" else 0.1
        boosted_scores.append(score + bonus)

    ranked = sorted(zip(candidates, boosted_scores), key=lambda pair: pair[1], reverse=True)
    return [cand for cand, _ in ranked[:top_n]]
|
|
|
|
|
def build_prompt(question, docs):
    """Assemble the LLM prompt: instructions, article context, the question.

    Each doc contributes a "[Artikel: <id> - <rubrik>]" header followed by
    its text. Questions containing "steg för steg", "detaljerad" or "guide"
    get an extra instruction requesting a step-by-step answer.
    """
    sections = [
        f"\n[Artikel: {doc.get('id', 'Okänt-ID')} - {doc.get('rubrik', '')}]\n{doc.get('text', '')}\n"
        for doc in docs
    ]
    context = "".join(sections)

    guide_terms = ("steg för steg", "detaljerad", "guide")
    wants_guide = any(term in question.lower() for term in guide_terms)
    additional_instruction = (
        "Ge en detaljerad steg-för-steg-guide baserad på informationen ovan."
        if wants_guide
        else ""
    )

    return (
        "Du är en hjälpsam supportagent hos Stockholm Exergi. "
        "Svara endast om du hittar tydlig och direkt information i texterna nedan. "
        "Om informationen inte finns, skriv exakt: 'Ingen information finns.'\n\n"
        "Viktig instruktion: I ditt svar, ange alltid Offentligt artikelnummer "
        "för de artiklar du använde.\n\n"
        f"{context}\n"
        f"Fråga: {question}\n"
        f"{additional_instruction}\n"
        "Svar:"
    )
|
|
|
|
|
def chat_rag(question, history, user_profile, cache):
    """Main chat handler for the Gradio UI.

    Routes the user's question to one of: name-memory small talk, the
    per-session answer cache, a step-by-step follow-up on the previous
    answer, or the full RAG pipeline (retrieve -> re-rank -> prompt -> LLM).

    Args:
        question: Raw user input.
        history: Chat history in Gradio "messages" format (list of dicts);
            mutated in place.
        user_profile: Mutable per-session profile (stores e.g. the user's name).
        cache: Per-session answer cache keyed by the normalized question.

    Returns:
        tuple: ("", history, user_profile, cache) — empty string clears the
        input textbox.
    """
    lower_q = question.lower().strip()
    # The original computed question.strip().lower() separately, but that is
    # always identical to lower_q; one normalized form serves as the cache key.
    normalized_q = lower_q

    def _finish(answer):
        # Record the exchange and return the standard 4-tuple for Gradio.
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": answer})
        return "", history, user_profile, cache

    # --- Name memory ("mitt namn är ...") -----------------------------------
    marker = "mitt namn är"
    if marker in lower_q:
        # Bug fix: the marker was matched case-insensitively but then split
        # against the *original* question, so "Mitt namn är Anna" stored
        # "Okänt". Locate the marker in the lowercased text and slice the
        # original so the name keeps its capitalization.
        start = question.lower().find(marker) + len(marker)
        remainder = question[start:].strip()
        name = remainder.split()[0] if remainder else "Okänt"
        user_profile["name"] = name
        return _finish(f"Ok, jag har sparat att ditt namn är {name}.")
    elif "vad heter jag" in lower_q:
        name = user_profile.get("name")
        if name:
            answer = f"Du heter {name}."
        else:
            answer = "Jag har inte fått veta ditt namn än. Skriv 'mitt namn är ...' för att uppdatera."
        return _finish(answer)

    # --- Cache lookup for "guide"-style questions ---------------------------
    qualifies_for_cache = any(term in lower_q for term in ["steg för steg", "detaljerad", "guide"])
    if qualifies_for_cache and normalized_q in cache:
        return _finish(cache[normalized_q])

    # --- Step-by-step follow-up on the previous assistant answer ------------
    if qualifies_for_cache and "kan du ge mig svaret steg för steg" in lower_q:
        if history and history[-1]["role"] == "assistant":
            prev_answer = history[-1]["content"]
            new_prompt = (
                "Utgå från detta tidigare svar:\n\n"
                f"{prev_answer}\n\n"
                "Och ge mig en detaljerad steg-för-steg-guide baserad på informationen ovan."
            )
            response = openai.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": new_prompt}],
                max_tokens=150,
                temperature=0.0
            )
            answer = response.choices[0].message.content
            cache[normalized_q] = answer
            return _finish(answer)
        # No previous assistant turn to expand: fall through to the pipeline.

    # --- Full RAG pipeline --------------------------------------------------
    top_docs = retrieve_top_k(question, k=20)
    best_docs = re_rank_candidates(question, top_docs)
    prompt = build_prompt(question, best_docs)
    response = openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=300,
        temperature=0.0
    )
    answer = response.choices[0].message.content
    # Cite the article ids actually used (set order is arbitrary, as before).
    unique_ids = {doc.get("id", "Okänt-ID") for doc in best_docs}
    if unique_ids:
        references = ", ".join(unique_ids)
        answer += f"\n\n[Information hämtad från Offentligt artikelnummer(n): {references}]"
    if qualifies_for_cache:
        cache[normalized_q] = answer
    return _finish(answer)
|
|
|
|
|
|
|
|
|
# NOTE(review): shared secret is hard-coded in source; consider loading it
# from the environment (the file already uses dotenv) — left unchanged here.
PASSWORD = "agrikaremexergi"


def login(input_password):
    """Gate the main panel behind the shared password.

    Returns a visibility update for the main app panel and a status message.
    """
    authenticated = input_password == PASSWORD
    message = "Inloggning lyckades!" if authenticated else "Fel lösenord, försök igen."
    return gr.update(visible=authenticated), message
|
|
|
|
|
with gr.Blocks() as demo:

    # Login panel: visible on load; gates access to the chat UI.
    with gr.Column(visible=True) as login_panel:
        gr.Markdown("## Logga in")
        password_input = gr.Textbox(label="Ange lösenord", type="password")
        login_button = gr.Button("Logga in")
        login_message = gr.Markdown("")

    # Main chat panel: hidden until login() flips its visibility.
    with gr.Column(visible=False) as main_app_panel:
        gr.Markdown("## 💬 OpenAI RAG-Chat med FAISS + Re-Ranking")
        chatbot = gr.Chatbot(label="RAG-Chat", type="messages")
        msg = gr.Textbox(label="Ställ en fråga...")
        send = gr.Button("Skicka")
        state = gr.State([])        # chat history (messages format)
        profile = gr.State({})      # per-session user profile (e.g. name)
        cache_state = gr.State({})  # per-session answer cache

        # chat_rag returns ("", history, profile, cache): clears the textbox
        # and refreshes chat/profile/cache outputs.
        # NOTE(review): `state` is an input but not an output; chat_rag
        # mutates the history list in place, which appears to keep it in
        # sync — confirm this is intentional.
        send.click(
            fn=chat_rag,
            inputs=[msg, state, profile, cache_state],
            outputs=[msg, chatbot, profile, cache_state]
        )

    # login() toggles main_app_panel visibility and shows a status message.
    login_button.click(
        fn=login,
        inputs=[password_input],
        outputs=[main_app_panel, login_message]
    )

if __name__ == "__main__":
    demo.launch()
|
|