"""
Advanced RAG-based search engine with multi-source intelligence.
"""
from typing import List, Dict, Any, Optional
import asyncio
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from duckduckgo_search import DDGS
from googlesearch import search as gsearch
import requests
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential
import json
import time
from datetime import datetime, timedelta
import hashlib
from urllib.parse import urlparse
import re


class SearchEngine:
    def __init__(self):
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-mpnet-base-v2"
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        self.cache = {}
        self.cache_ttl = timedelta(hours=24)
        self.search_delay = 2  # seconds between searches
        self.last_search_time = datetime.min

    def _get_cache_key(self, query: str, **kwargs) -> str:
        """Generate cache key from query and kwargs."""
        cache_data = {
            "query": query,
            **kwargs
        }
        return hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest()

    def _get_cached_result(self, cache_key: str) -> Optional[Dict[str, Any]]:
        """Get result from cache if still valid; evict expired entries."""
        if cache_key in self.cache:
            result, timestamp = self.cache[cache_key]
            if datetime.now() - timestamp < self.cache_ttl:
                return result
            del self.cache[cache_key]
        return None

    def _set_cached_result(self, cache_key: str, result: Dict[str, Any]):
        """Store result in cache."""
        self.cache[cache_key] = (result, datetime.now())

    async def search_web(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
        """Perform web search using multiple search engines."""
        results = []
        # Respect rate limiting between consecutive searches
        time_since_last = datetime.now() - self.last_search_time
        if time_since_last.total_seconds() < self.search_delay:
            await asyncio.sleep(self.search_delay - time_since_last.total_seconds())
        # DuckDuckGo search: normalize each hit so every entry exposes a "link" key,
        # which is what downstream processing expects
        try:
            with DDGS() as ddgs:
                for r in ddgs.text(query, max_results=max_results):
                    results.append({
                        "link": r.get("href") or r.get("link"),
                        "title": r.get("title", ""),
                        "snippet": r.get("body", "")
                    })
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")
        # Google search (returns plain URLs, so the title falls back to the URL)
        try:
            google_results = gsearch(query, num_results=max_results)
            results.extend([{"link": url, "title": url} for url in google_results])
        except Exception as e:
            print(f"Google search error: {e}")
        self.last_search_time = datetime.now()
        return results[:max_results]

    def _clean_html(self, html: str) -> str:
        """Clean HTML content."""
        # Remove script and style elements
        html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
        html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL)
        # Remove comments
        html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
        # Remove remaining tags
        html = re.sub(r'<[^>]+>', ' ', html)
        # Clean whitespace
        html = re.sub(r'\s+', ' ', html).strip()
        return html

    async def fetch_content(self, url: str) -> Optional[str]:
        """Fetch and extract the main textual content from a webpage."""
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            # Run the blocking HTTP request in a worker thread so it does not
            # stall the event loop
            response = await asyncio.to_thread(
                requests.get, url, headers=headers, timeout=10
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            # Remove unwanted elements
            for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
                element.decompose()
            # Try to find the main content: prefer <article>, then <main>,
            # then a div with a common content class name, then <body>
            main_content = (
                soup.find("article")
                or soup.find("main")
                or soup.find("div", class_=re.compile(r"content|article|post|entry"))
                or soup.body
            )
            # Extract text
            if main_content:
                return self._clean_html(str(main_content))
            return self._clean_html(response.text)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

    def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Extract metadata from webpage."""
        metadata = {
            "url": url,
            "domain": urlparse(url).netloc,
            "title": None,
            "description": None,
            "published_date": None,
            "author": None,
            "keywords": None
        }
        # Extract title
        if soup.title and soup.title.string:
            metadata["title"] = soup.title.string.strip()
        # Extract meta tags (both name= and property= variants)
        for meta in soup.find_all("meta"):
            name = meta.get("name", "").lower()
            prop = meta.get("property", "").lower()
            content = meta.get("content")
            if name == "description" or prop == "og:description":
                metadata["description"] = content
            elif name == "author":
                metadata["author"] = content
            elif name == "keywords":
                metadata["keywords"] = content
            elif name == "published_time" or prop == "article:published_time":
                metadata["published_date"] = content
        return metadata

    async def process_search_results(self, query: str) -> Dict[str, Any]:
        """Process search results and create a RAG-based answer."""
        cache_key = self._get_cache_key(query)
        cached_result = self._get_cached_result(cache_key)
        if cached_result:
            return cached_result
        # Perform web search
        search_results = await self.search_web(query)
        # Fetch content from search results
        documents = []
        metadata_list = []
        for result in search_results:
            url = result.get("link")
            if not url:
                continue
            content = await self.fetch_content(url)
            if not content:
                continue
            # Split content into chunks
            chunks = self.text_splitter.split_text(content)
            # Store metadata
            metadata = {
                "source": url,
                "title": result.get("title", url),
                **result
            }
            metadata_list.append(metadata)
            # Create one document per chunk, all sharing the page metadata
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
        if not documents:
            return {
                "answer": "I couldn't find any relevant information.",
                "sources": [],
                "metadata": []
            }
        # Create vector store and retrieve the most relevant chunks directly;
        # answer synthesis is a simple concatenation of the top chunks
        vectorstore = FAISS.from_documents(documents, self.embeddings)
        relevant_docs = vectorstore.similarity_search(query, k=5)
        # Extract unique sources and content
        sources = []
        content = []
        used_metadata = []
        for doc in relevant_docs:
            source = doc.metadata["source"]
            if source not in sources:
                sources.append(source)
                content.append(doc.page_content)
                # Find corresponding metadata
                for meta in metadata_list:
                    if meta["source"] == source:
                        used_metadata.append(meta)
                        break
        result = {
            "answer": "\n\n".join(content),
            "sources": sources,
            "metadata": used_metadata
        }
        # Cache the result
        self._set_cached_result(cache_key, result)
        return result

    async def search(self, query: str) -> Dict[str, Any]:
        """Main search interface."""
        try:
            return await self.process_search_results(query)
        except Exception as e:
            return {
                "answer": f"An error occurred: {str(e)}",
                "sources": [],
                "metadata": []
            }
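

# Minimal usage sketch (assumption: this entry point is not part of the original
# module; it is only an illustration of how the class could be driven when the
# file is run as a standalone script). It calls SearchEngine.search() from a
# synchronous context with asyncio.run() and prints a truncated answer plus sources.
if __name__ == "__main__":
    import sys

    async def _demo() -> None:
        engine = SearchEngine()
        query = sys.argv[1] if len(sys.argv) > 1 else "what is retrieval augmented generation"
        result = await engine.search(query)
        print(result["answer"][:500])
        print("Sources:")
        for src in result["sources"]:
            print(" -", src)

    asyncio.run(_demo())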