File size: 5,234 Bytes
4e0ee33
 
 
 
 
 
 
 
 
 
 
 
 
 
a38c567
4e0ee33
 
 
 
 
 
 
 
 
 
 
 
 
 
07b2aa1
 
 
 
 
 
 
4e0ee33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16d6410
 
 
 
 
4e0ee33
 
 
16d6410
 
 
 
4e0ee33
16d6410
4e0ee33
16d6410
 
 
4e0ee33
16d6410
 
4e0ee33
16d6410
 
 
 
 
 
 
4e0ee33
 
 
16d6410
 
 
 
 
4e0ee33
 
 
16d6410
 
 
 
 
 
4e0ee33
16d6410
 
4e0ee33
16d6410
 
 
 
 
4e0ee33
 
 
16d6410
 
 
4e0ee33
16d6410
 
4e0ee33
16d6410
 
4e0ee33
 
 
16d6410
 
 
 
 
 
a38c567
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import os
import getpass
import faiss
import numpy as np
import warnings
import logging

# Quieten noisy dependencies: the "pdfminer" logger only emits records at
# ERROR level or above, and every Python warning raised from this point on
# (including by the imports that follow) is ignored process-wide.
logging.getLogger("pdfminer").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")

from google import genai
from google.genai import types
from sentence_transformers import SentenceTransformer
from transformers import pipeline
from langchain_community.document_loaders import(
    UnstructuredPDFLoader,
    TextLoader,
    CSVLoader,
    JSONLoader,
    UnstructuredPowerPointLoader,
    UnstructuredExcelLoader,
    UnstructuredXMLLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter


def authenticate():
  """Create a Google Generative AI client from an API key.

  The key is taken from the ``GOOGLE_API_KEY`` environment variable. When
  that variable is unset or empty, the user is prompted for the key via
  ``getpass`` instead.

  Returns:
    The ``genai.Client`` built with the resolved key.
  """
  key = os.environ.get("GOOGLE_API_KEY") or getpass.getpass("Enter your API Key: ")
  return genai.Client(api_key=key)


# Lower-case file suffix -> factory that builds the loader for a given path.
# Factories (rather than bare classes) let individual loaders receive the
# extra arguments they need.
_LOADER_FACTORIES = {
  '.pdf': lambda path: UnstructuredPDFLoader(path),
  '.txt': lambda path: TextLoader(path),
  '.csv': lambda path: CSVLoader(path),
  # JSONLoader has no default for ``jq_schema``; '.' selects the whole
  # document and ``text_content=False`` allows non-string JSON values
  # (objects/arrays) to be serialised instead of rejected.
  '.json': lambda path: JSONLoader(path, jq_schema='.', text_content=False),
  '.pptx': lambda path: UnstructuredPowerPointLoader(path),
  '.xlsx': lambda path: UnstructuredExcelLoader(path),
  '.xml': lambda path: UnstructuredXMLLoader(path),
  '.docx': lambda path: UnstructuredWordDocumentLoader(path),
}


def load_documents_gradio(uploaded_files):
  """Load every supported uploaded file into a flat list of documents.

  Args:
    uploaded_files: Iterable of uploads. Each item is either a path
      (``str`` / ``os.PathLike``) or an object whose ``name`` attribute holds
      the path on disk. ``None`` is treated as "no files".

  Returns:
    A list with the documents produced by the loader matching each file's
    suffix (matched case-insensitively), in upload order. Files with an
    unsupported suffix are reported on stdout and skipped.
  """
  docs = []
  for file in uploaded_files or []:
    if isinstance(file, (str, os.PathLike)):
      file_path = os.fspath(file)
    else:
      file_path = file.name

    lowered = file_path.lower()
    factory = next(
      (make for suffix, make in _LOADER_FACTORIES.items() if lowered.endswith(suffix)),
      None,
    )
    if factory is None:
      print(f'Unsupported File Type: {file_path}')
      continue

    docs.extend(factory(file_path).load())
  return docs


def split_documents(docs, chunk_size=500, chunk_overlap=100):
  """Break documents into smaller, overlapping chunks.

  Args:
    docs: Documents to split; handed to the splitter's ``split_documents``.
    chunk_size: Forwarded to ``RecursiveCharacterTextSplitter`` as ``chunk_size``.
    chunk_overlap: Forwarded to ``RecursiveCharacterTextSplitter`` as
      ``chunk_overlap``.

  Returns:
    Whatever ``RecursiveCharacterTextSplitter.split_documents`` returns for
    ``docs``.
  """
  options = {"chunk_size": chunk_size, "chunk_overlap": chunk_overlap}
  chunker = RecursiveCharacterTextSplitter(**options)
  chunks = chunker.split_documents(docs)
  return chunks


def build_vectorstore(docs, embedding_model_name="all-MiniLM-L6-v2"):
  """Embed the non-blank document chunks and index them with FAISS.

  Args:
    docs: Iterable of objects exposing a ``page_content`` string.
    embedding_model_name: Name passed to ``SentenceTransformer``.

  Returns:
    A dict with the keys ``"index"`` (FAISS ``IndexFlatL2``), ``"texts"``
    (stripped chunk strings), ``"embedding_model"`` (the loaded encoder),
    ``"embeddings"`` (the encoder output for ``texts``) and ``"chunks"``
    (number of texts).

  Raises:
    ValueError: If no document has any non-whitespace content.
  """
  # Keep only chunks that still have content once surrounding whitespace is removed.
  texts = []
  for doc in docs:
    cleaned = doc.page_content.strip()
    if cleaned:
      texts.append(cleaned)

  chunk_count = len(texts)
  if chunk_count == 0:
    raise ValueError("No valid text found in the documents.")
  print(f"No. of Chunks: {chunk_count}")

  encoder = SentenceTransformer(embedding_model_name)
  vectors = encoder.encode(texts)
  print(vectors.shape)

  # The index dimensionality is the width of one embedding row.
  flat_index = faiss.IndexFlatL2(vectors.shape[1])
  flat_index.add(np.array(vectors).astype("float32"))

  return dict(
    index=flat_index,
    texts=texts,
    embedding_model=encoder,
    embeddings=vectors,
    chunks=chunk_count,
  )


def retrieve_context(query, store, k=6):
  """Retrieve the top-k context chunks most similar to the query.

  Args:
    query: Query string to embed.
    store: Dict providing ``"embedding_model"`` (object with ``encode``),
      ``"index"`` (object with a FAISS-style ``search(vectors, k)``) and
      ``"texts"`` (list of chunk strings aligned with the index rows).
    k: Maximum number of chunks to return; capped at the number of texts.

  Returns:
    The matching chunk strings in the order the index returned them. An
    empty list when the store has no texts or ``k`` is not positive.
  """
  k = min(k, len(store["texts"]))
  if k <= 0:
    # Nothing to search for; avoid calling the index with a non-positive k.
    return []

  # Cast to float32 so the query matches the dtype the vectors were added with.
  query_vec = np.asarray(store["embedding_model"].encode([query]), dtype="float32")
  _, indices = store["index"].search(query_vec, k)

  # Negative labels mark "no result" slots; indexing texts with them would
  # silently return chunks counted from the end of the list.
  return [store["texts"][i] for i in indices[0] if i >= 0]


def retrieve_context_approx(query, store, k=6, ncells=50, nprobe=10):
  """Retrieve context chunks using approximate nearest-neighbour search.

  A fresh FAISS ``IndexIVFFlat`` is built from ``store["embeddings"]`` on
  every call, trained, filled and then queried.

  Args:
    query: Query string to embed.
    store: Dict providing ``"texts"``, ``"embeddings"``, ``"embedding_model"``
      and ``"index"`` (only its dimensionality ``d`` is read here).
    k: Maximum number of chunks to return; capped at the number of texts.
    ncells: Requested number of IVF cells. Clamped to the number of stored
      vectors, because training needs at least one vector per cell.
    nprobe: Requested number of cells visited per query; clamped to the
      effective cell count.

  Returns:
    The matching chunk strings in the order the index returned them. An
    empty list when the store has no texts or ``k`` is not positive.
  """
  k = min(k, len(store["texts"]))
  if k <= 0:
    return []

  vectors = np.ascontiguousarray(store["embeddings"], dtype="float32")
  dim = store["index"].d

  # With fewer vectors than cells the IVF training step fails, so small
  # document sets must use fewer cells than the default.
  ncells = max(1, min(ncells, len(vectors)))

  quantizer = faiss.IndexFlatL2(dim)
  ivf_index = faiss.IndexIVFFlat(quantizer, dim, ncells)
  ivf_index.nprobe = max(1, min(nprobe, ncells))

  if not ivf_index.is_trained:
    ivf_index.train(vectors)
  ivf_index.add(vectors)

  query_vec = np.asarray(store["embedding_model"].encode([query]), dtype="float32")
  _, indices = ivf_index.search(query_vec, k)

  # The probed cells may hold fewer than k vectors; the missing slots come
  # back as -1 and must not be used to index ``texts``.
  return [store["texts"][i] for i in indices[0] if i >= 0]


def build_prompt(context_chunks, query):
  """Assemble the Gemini prompt from retrieved context and the user query.

  Args:
    context_chunks: Iterable of context strings; joined with newlines.
    query: The user's question, interpolated after ``Question:``.

  Returns:
    The prompt text: fixed instructions, a blank line, the context section,
    a blank line, the question line and a trailing ``Answer:`` cue.
  """
  instructions = (
    "You are a highly knowledgeable and helpful assistant. "
    "Use the following context to generate a **detailed and step-by-step** "
    "answer to the user's question. "
    "Include explanations, examples, and reasoning wherever helpful."
  )
  joined_context = "\n".join(context_chunks)

  # Section lines carry a two-space indent; only the first context line gets
  # it, since the joined context is inserted as a single value.
  prompt_lines = [
    instructions,
    "",
    "  Context:",
    f"  {joined_context}",
    "",
    f"  Question: {query}",
    "  Answer:",
  ]
  return "\n".join(prompt_lines)


def ask_gemini(prompt, client):
  """Send the prompt to the Gemini API and return the reply text.

  Args:
    prompt: Prompt string; sent as the single element of ``contents``.
    client: Object exposing ``models.generate_content`` (e.g. the client
      returned by ``authenticate``).

  Returns:
    The ``text`` attribute of the API response.
  """
  # Fixed generation settings: output cap, sampling temperature and seed.
  generation_config = types.GenerateContentConfig(
    max_output_tokens=2048,
    temperature=0.5,
    seed=42,
  )
  reply = client.models.generate_content(
    model="gemini-2.0-flash",  # Or your preferred model
    contents=[prompt],
    config=generation_config,
  )
  return reply.text

# Speech2Text:
# ASR pipelines already built, keyed by the ``model`` argument, so repeated
# calls reuse the same pipeline instead of constructing a new one each time.
_TRANSCRIBER_CACHE = {}


def transcribe(audio, model="openai/whisper-base.en"):
  """Transcribe recorded audio to text with a speech-recognition pipeline.

  Args:
    audio: ``(sampling_rate, samples)`` pair, where ``samples`` is an
      array-like of amplitudes, either 1-D (mono) or 2-D with channels on
      axis 1. ``None`` means nothing was recorded.
    model: Model identifier passed to ``pipeline("automatic-speech-recognition")``.

  Returns:
    The ``"text"`` field of the pipeline result.

  Raises:
    ValueError: If ``audio`` is ``None`` or contains no samples.
  """
  if audio is None:
    raise ValueError("No audio detected!")

  sr, y = audio
  y = np.asarray(y)
  if y.size == 0:
    # An empty buffer has no peak to normalise by and nothing to transcribe.
    raise ValueError("No audio detected!")

  if y.ndim > 1:  # Down-mix multi-channel audio to mono by averaging channels
    y = y.mean(axis=1)

  y = y.astype(np.float32)
  peak = np.max(np.abs(y))
  if peak > 0:
    # Scale amplitudes into [-1, 1]. All-zero (silent) audio is left as is,
    # since dividing by a zero peak would fill the signal with NaNs.
    y /= peak

  # Input is validated above so that bad audio fails before a model is loaded.
  transcriber = _TRANSCRIBER_CACHE.get(model)
  if transcriber is None:
    transcriber = pipeline("automatic-speech-recognition", model=model)
    _TRANSCRIBER_CACHE[model] = transcriber

  result = transcriber({"sampling_rate": sr, "raw": y})
  return result["text"]