Spaces:
Running
Running
File size: 5,234 Bytes
4e0ee33 a38c567 4e0ee33 07b2aa1 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 4e0ee33 16d6410 a38c567 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import os
import getpass
import faiss
import numpy as np
import warnings
import logging
# Suppress warnings
logging.getLogger("pdfminer").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")
from google import genai
from google.genai import types
from sentence_transformers import SentenceTransformer
from transformers import pipeline
from langchain_community.document_loaders import(
UnstructuredPDFLoader,
TextLoader,
CSVLoader,
JSONLoader,
UnstructuredPowerPointLoader,
UnstructuredExcelLoader,
UnstructuredXMLLoader,
UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
def authenticate():
    """Create a Google Generative AI client from an API key.

    The key is read from the ``GOOGLE_API_KEY`` environment variable; when
    that variable is unset or empty the user is prompted for it without echo.
    """
    key = os.environ.get("GOOGLE_API_KEY") or getpass.getpass("Enter your API Key: ")
    return genai.Client(api_key=key)
def load_documents_gradio(uploaded_files):
    """Load every supported uploaded file into a flat list of documents.

    Args:
        uploaded_files: Iterable of uploads. Each item is either a path
            string or an object exposing the path as ``.name``. ``None`` is
            treated as "no files".

    Returns:
        The concatenated results of each loader's ``load()`` call, in upload
        order. Files with an unsupported extension are reported on stdout
        and skipped.
    """
    # Suffix -> loader factory. Every entry is a callable taking the path so
    # that loaders needing extra arguments (JSON) fit the same table.
    loader_factories = {
        ".pdf": lambda path: UnstructuredPDFLoader(path),
        ".txt": lambda path: TextLoader(path),
        ".csv": lambda path: CSVLoader(path),
        # JSONLoader has a required jq_schema argument; "." selects the whole
        # document, and text_content=False lets non-string JSON be serialised
        # instead of rejected.
        ".json": lambda path: JSONLoader(path, jq_schema=".", text_content=False),
        ".pptx": lambda path: UnstructuredPowerPointLoader(path),
        ".xlsx": lambda path: UnstructuredExcelLoader(path),
        ".xml": lambda path: UnstructuredXMLLoader(path),
        ".docx": lambda path: UnstructuredWordDocumentLoader(path),
    }

    docs = []
    for file in uploaded_files or []:
        # Accept both plain path strings and file-like objects with .name.
        file_path = file if isinstance(file, str) else file.name
        lowered = file_path.lower()
        make_loader = next(
            (
                factory
                for suffix, factory in loader_factories.items()
                if lowered.endswith(suffix)
            ),
            None,
        )
        if make_loader is None:
            print(f'Unsupported File Type: {file_path}')
            continue
        docs.extend(make_loader(file_path).load())
    return docs
def split_documents(docs, chunk_size=500, chunk_overlap=100):
    """Break documents into overlapping chunks with RecursiveCharacterTextSplitter.

    ``chunk_size`` and ``chunk_overlap`` are forwarded unchanged to the
    splitter; the split documents it produces are returned.
    """
    settings = {"chunk_size": chunk_size, "chunk_overlap": chunk_overlap}
    return RecursiveCharacterTextSplitter(**settings).split_documents(docs)
def build_vectorstore(docs, embedding_model_name="all-MiniLM-L6-v2"):
    """Embed the non-empty document chunks and index them in a flat L2 FAISS index.

    Returns a dict holding the index, the stripped chunk texts, the embedding
    model, the raw embeddings and the chunk count. Raises ``ValueError`` when
    no chunk contains any non-whitespace text.
    """
    texts = []
    for doc in docs:
        stripped = doc.page_content.strip()
        if stripped:
            texts.append(stripped)

    if not texts:
        raise ValueError("No valid text found in the documents.")

    chunk_count = len(texts)
    print(f"No. of Chunks: {chunk_count}")

    model = SentenceTransformer(embedding_model_name)
    embeddings = model.encode(texts)
    print(embeddings.shape)

    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    # FAISS stores float32 vectors, so convert before adding.
    index.add(np.array(embeddings).astype("float32"))

    return dict(
        index=index,
        texts=texts,
        embedding_model=model,
        embeddings=embeddings,
        chunks=chunk_count,
    )
def retrieve_context(query, store, k=6):
    """Return up to ``k`` stored texts closest to ``query`` in the store's index.

    Args:
        query: The query string to embed.
        store: Dict with at least ``"embedding_model"`` (has ``encode``),
            ``"index"`` (has ``search``) and ``"texts"``.
        k: Maximum number of chunks to return; clamped to the number of texts.

    Returns:
        A list of texts ordered as the index returned them. Empty when the
        store has no texts or ``k`` is not positive.
    """
    texts = store["texts"]
    k = min(k, len(texts))
    if k <= 0:
        return []

    # Cast explicitly so the index always receives float32, matching
    # retrieve_context_approx.
    query_vec = np.asarray(store["embedding_model"].encode([query]), dtype="float32")
    _, indices = store["index"].search(query_vec, k)

    # A negative label means "no result"; without the filter it would
    # silently pick texts[-1].
    return [texts[i] for i in indices[0] if i >= 0]
def retrieve_context_approx(query, store, k=6):
    """Return up to ``k`` stored texts near ``query`` using an IVF-flat FAISS index.

    A fresh ``IndexIVFFlat`` is built from ``store["embeddings"]`` on every
    call, trained, filled and then searched.

    Args:
        query: The query string to embed.
        store: Dict with ``"index"`` (its ``d`` gives the vector dimension),
            ``"embeddings"``, ``"embedding_model"`` and ``"texts"``.
        k: Maximum number of chunks to return; clamped to the number of texts.

    Returns:
        A list of texts ordered as the index returned them. May be shorter
        than ``k`` when the probed cells hold fewer vectors; empty when the
        store has no texts or ``k`` is not positive.
    """
    texts = store["texts"]
    k = min(k, len(texts))
    if k <= 0:
        return []

    # Convert once; both train() and add() need float32 input.
    vectors = np.ascontiguousarray(store["embeddings"], dtype="float32")

    # k-means training needs at least as many points as cells, so a small
    # store cannot use the full 50 cells.
    ncells = max(1, min(50, len(vectors)))
    dim = store["index"].d
    quantizer = faiss.IndexFlatL2(dim)
    ann_index = faiss.IndexIVFFlat(quantizer, dim, ncells)
    ann_index.nprobe = min(10, ncells)

    if not ann_index.is_trained:
        ann_index.train(vectors)
    ann_index.add(vectors)

    query_vec = np.asarray(store["embedding_model"].encode([query]), dtype="float32")
    _, indices = ann_index.search(query_vec, k)

    # IVF search pads missing neighbours with -1; without the filter those
    # would silently pick texts[-1].
    return [texts[i] for i in indices[0] if i >= 0]
def build_prompt(context_chunks, query):
    """Assemble the Gemini prompt: instructions, joined context, then the question."""
    instructions = (
        "You are a highly knowledgeable and helpful assistant. "
        "Use the following context to generate a **detailed and step-by-step** "
        "answer to the user's question. "
        "Include explanations, examples, and reasoning wherever helpful."
    )
    joined_context = "\n".join(context_chunks)
    sections = [
        instructions,
        "Context:",
        joined_context,
        f"Question: {query}",
        "Answer:",
    ]
    return "\n".join(sections)
def ask_gemini(prompt, client, model="gemini-2.0-flash"):
    """Send ``prompt`` to a Gemini model and return the generated text.

    Args:
        prompt: The full prompt string.
        client: A client exposing ``models.generate_content``.
        model: Name of the model to call; defaults to the previously
            hard-coded ``"gemini-2.0-flash"``.

    Returns:
        The response text, or an empty string when the response carries no
        text.
    """
    response = client.models.generate_content(
        model=model,
        contents=[prompt],
        # Fixed seed and moderate temperature, as in the original call.
        config=types.GenerateContentConfig(max_output_tokens=2048, temperature=0.5, seed=42),
    )
    # NOTE(review): response.text is presumably None when no text part is
    # returned (e.g. a blocked response); normalise so callers always get str.
    return response.text or ""
# Speech2Text:
# Loaded speech-recognition pipelines keyed by model, so repeated calls reuse
# the already-loaded weights instead of reloading them per request.
_TRANSCRIBER_CACHE = {}


def transcribe(audio, model="openai/whisper-base.en"):
    """Transcribe an audio clip to text with a speech-recognition pipeline.

    Args:
        audio: ``(sampling_rate, samples)`` tuple, or ``None``. Samples with
            more than one dimension are averaged over axis 1 to get mono
            (assumes shape ``(n_samples, n_channels)`` — TODO confirm against
            the caller).
        model: Model identifier passed to ``pipeline``.

    Returns:
        The ``"text"`` field of the pipeline result.

    Raises:
        ValueError: If ``audio`` is ``None`` or contains no samples.
    """
    if audio is None:
        raise ValueError("No audio detected!")

    sr, y = audio  # sampling rate (Hz) and amplitude array
    y = np.asarray(y)
    if y.size == 0:
        raise ValueError("No audio detected!")

    if y.ndim > 1:  # down-mix multi-channel audio to mono
        y = y.mean(axis=1)
    y = y.astype(np.float32)

    # Peak-normalise into [-1, 1]; skip for all-zero (silent) input, where
    # dividing by the peak would produce NaNs.
    peak = np.max(np.abs(y))
    if peak > 0:
        y /= peak

    transcriber = _TRANSCRIBER_CACHE.get(model)
    if transcriber is None:
        transcriber = pipeline("automatic-speech-recognition", model=model)
        _TRANSCRIBER_CACHE[model] = transcriber

    result = transcriber({"sampling_rate": sr, "raw": y})
    return result["text"]