import os
import getpass
import faiss
import numpy as np
import warnings
import logging

# Suppress noisy pdfminer logs and generic warnings
logging.getLogger("pdfminer").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")

from google import genai
from google.genai import types
from sentence_transformers import SentenceTransformer
from transformers import pipeline
from langchain_community.document_loaders import (
    UnstructuredPDFLoader,
    TextLoader,
    CSVLoader,
    JSONLoader,
    UnstructuredPowerPointLoader,
    UnstructuredExcelLoader,
    UnstructuredXMLLoader,
    UnstructuredWordDocumentLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter


def authenticate():
    """Authenticates with the Google Generative AI API using an API key."""
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        api_key = getpass.getpass("Enter your API Key: ")
    client = genai.Client(api_key=api_key)
    return client


def load_documents_gradio(uploaded_files):
    """Loads each uploaded file with the loader matching its extension."""
    docs = []
    for file in uploaded_files:
        file_path = file.name
        # Detect type and load accordingly
        if file_path.lower().endswith(".pdf"):
            docs.extend(UnstructuredPDFLoader(file_path).load())
        elif file_path.lower().endswith(".txt"):
            docs.extend(TextLoader(file_path).load())
        elif file_path.lower().endswith(".csv"):
            docs.extend(CSVLoader(file_path).load())
        elif file_path.lower().endswith(".json"):
            # JSONLoader requires a jq schema; "." selects the whole document
            docs.extend(
                JSONLoader(file_path, jq_schema=".", text_content=False).load()
            )
        elif file_path.lower().endswith(".pptx"):
            docs.extend(UnstructuredPowerPointLoader(file_path).load())
        elif file_path.lower().endswith(".xlsx"):
            docs.extend(UnstructuredExcelLoader(file_path).load())
        elif file_path.lower().endswith(".xml"):
            docs.extend(UnstructuredXMLLoader(file_path).load())
        elif file_path.lower().endswith(".docx"):
            docs.extend(UnstructuredWordDocumentLoader(file_path).load())
        else:
            print(f"Unsupported File Type: {file_path}")
    return docs


def split_documents(docs, chunk_size=500, chunk_overlap=100):
    """Splits documents into smaller chunks using RecursiveCharacterTextSplitter."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    return splitter.split_documents(docs)


def build_vectorstore(docs, embedding_model_name="all-MiniLM-L6-v2"):
    """Builds a FAISS vector store from the document chunks."""
    texts = [doc.page_content.strip() for doc in docs if doc.page_content.strip()]
    if not texts:
        raise ValueError("No valid text found in the documents.")
    print(f"No. of Chunks: {len(texts)}")

    model = SentenceTransformer(embedding_model_name)
    embeddings = model.encode(texts)
    print(embeddings.shape)

    # Exact (brute-force) L2 index over the chunk embeddings
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(np.array(embeddings).astype("float32"))
    return {
        "index": index,
        "texts": texts,
        "embedding_model": model,
        "embeddings": embeddings,
        "chunks": len(texts),
    }


def retrieve_context(query, store, k=6):
    """Retrieves the top-k context chunks most similar to the query."""
    query_vec = np.array(store["embedding_model"].encode([query])).astype("float32")
    k = min(k, len(store["texts"]))
    distances, indices = store["index"].search(query_vec, k)
    return [store["texts"][i] for i in indices[0]]


def retrieve_context_approx(query, store, k=6):
    """Retrieves context chunks using approximate nearest-neighbor (IVF) search.

    Note: the IVF index is retrained and refilled on every call; for repeated
    queries, build it once and cache it in the store instead.
    """
    embeddings = np.array(store["embeddings"]).astype("float32")
    d = store["index"].d
    # IVF training needs at least as many vectors as cells
    ncells = min(50, len(store["texts"]))
    quantizer = faiss.IndexFlatL2(d)
    ivf_index = faiss.IndexIVFFlat(quantizer, d, ncells)
    ivf_index.nprobe = 10  # number of cells probed per query
    if not ivf_index.is_trained:
        ivf_index.train(embeddings)
    ivf_index.add(embeddings)

    query_vec = np.array(store["embedding_model"].encode([query])).astype("float32")
    k = min(k, len(store["texts"]))
    _, indices = ivf_index.search(query_vec, k)
    return [store["texts"][i] for i in indices[0]]


def build_prompt(context_chunks, query):
    """Builds the prompt for the Gemini API using context and query."""
    context = "\n".join(context_chunks)
    return f"""You are a highly knowledgeable and helpful assistant. Use the following context to generate a **detailed and step-by-step** answer to the user's question. Include explanations, examples, and reasoning wherever helpful.

Context:
{context}

Question: {query}

Answer:"""


def ask_gemini(prompt, client):
    """Calls the Gemini API with the given prompt and returns the response."""
    response = client.models.generate_content(
        model="gemini-2.0-flash",  # Or your preferred model
        contents=[prompt],
        config=types.GenerateContentConfig(
            max_output_tokens=2048, temperature=0.5, seed=42
        ),
    )
    return response.text


# Speech2Text:
def transcribe(audio, model="openai/whisper-base.en"):
    """Transcribes a (sampling_rate, waveform) tuple via a Whisper pipeline."""
    if audio is None:
        raise ValueError("No audio detected!")
    transcriber = pipeline("automatic-speech-recognition", model=model)
    sr, y = audio  # Sampling rate (Hz) and amplitude array
    if y.ndim > 1:  # Convert stereo (2 channels, L & R) to mono (1 channel)
        y = y.mean(1)
    y = y.astype(np.float32)
    peak = np.max(np.abs(y))
    if peak > 0:  # Guard against division by zero on silent input
        y /= peak  # Normalize amplitudes to the range [-1, 1]
    result = transcriber({"sampling_rate": sr, "raw": y})
    return result["text"]
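

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module API): wires the functions
# above into a minimal end-to-end RAG run. The file name "sample.pdf", the
# example query, and the SimpleNamespace stand-in for a Gradio upload object
# (load_documents_gradio only reads its `.name` attribute) are assumptions for
# demonstration; in the real app, Gradio supplies the uploaded-file objects.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    client = authenticate()

    # Mimic a Gradio upload pointing at an assumed local file
    uploads = [SimpleNamespace(name="sample.pdf")]
    docs = load_documents_gradio(uploads)
    chunks = split_documents(docs)
    store = build_vectorstore(chunks)

    query = "What is this document about?"
    context_chunks = retrieve_context(query, store)  # or retrieve_context_approx
    prompt = build_prompt(context_chunks, query)
    print(ask_gemini(prompt, client))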