"""
Advanced RAG-based search engine with multi-source intelligence.
"""
from typing import List, Dict, Any, Optional
import asyncio
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from duckduckgo_search import DDGS
from googlesearch import search as gsearch
import requests
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential
import json
import time
from datetime import datetime, timedelta
import hashlib
from urllib.parse import urlparse
import re
class SearchEngine:
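    """Multi-source web search engine with RAG-style retrieval, result caching, and rate limiting."""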
def __init__(self):
self.embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-mpnet-base-v2"
)
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50
)
self.cache = {}
self.cache_ttl = timedelta(hours=24)
self.search_delay = 2 # seconds between searches
self.last_search_time = datetime.min
def _get_cache_key(self, query: str, **kwargs) -> str:
"""Generate cache key from query and kwargs."""
cache_data = {
"query": query,
**kwargs
}
return hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest()
def _get_cached_result(self, cache_key: str) -> Optional[Dict[str, Any]]:
"""Get result from cache if valid."""
if cache_key in self.cache:
result, timestamp = self.cache[cache_key]
if datetime.now() - timestamp < self.cache_ttl:
return result
del self.cache[cache_key]
return None
def _set_cached_result(self, cache_key: str, result: Dict[str, Any]):
"""Store result in cache."""
self.cache[cache_key] = (result, datetime.now())
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def search_web(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
"""Perform web search using multiple search engines."""
results = []
# Respect rate limiting
time_since_last = datetime.now() - self.last_search_time
if time_since_last.total_seconds() < self.search_delay:
await asyncio.sleep(self.search_delay - time_since_last.total_seconds())
# DuckDuckGo Search
try:
            with DDGS() as ddgs:
                # DDGS.text returns "href"/"title"/"body" keys; normalize them to the
                # "link"/"title"/"snippet" keys expected by process_search_results.
                ddg_results = [
                    {"link": r.get("href"), "title": r.get("title"), "snippet": r.get("body")}
                    for r in ddgs.text(query, max_results=max_results)
                ]
                results.extend(ddg_results)
except Exception as e:
print(f"DuckDuckGo search error: {e}")
# Google Search
try:
google_results = gsearch(query, num_results=max_results)
results.extend([{"link": url, "title": url} for url in google_results])
except Exception as e:
print(f"Google search error: {e}")
self.last_search_time = datetime.now()
return results[:max_results]
def _clean_html(self, html: str) -> str:
"""Clean HTML content."""
# Remove script and style elements
html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL)
# Remove comments
html = re.sub(r'<!--.*?-->', '', html, flags=re.DOTALL)
# Remove remaining tags
html = re.sub(r'<[^>]+>', ' ', html)
# Clean whitespace
html = re.sub(r'\s+', ' ', html).strip()
return html
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def fetch_content(self, url: str) -> Optional[str]:
"""Fetch and extract content from a webpage."""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
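            # Note: requests is synchronous, so this call blocks the event loop
            # for the duration of the HTTP request.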
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
# Extract main content
soup = BeautifulSoup(response.text, "html.parser")
# Remove unwanted elements
for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
element.decompose()
            # Try to find the main content, preferring semantic containers
            main_content = (
                soup.find("article")
                or soup.find("main")
                or soup.find("div", class_=re.compile(r"content|article|post|entry"))
            )
# Use body if no main content found
if not main_content:
main_content = soup.body
# Extract text
if main_content:
text = self._clean_html(str(main_content))
else:
text = self._clean_html(response.text)
return text
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
"""Extract metadata from webpage."""
metadata = {
"url": url,
"domain": urlparse(url).netloc,
"title": None,
"description": None,
"published_date": None,
"author": None,
"keywords": None
}
# Extract title
if soup.title:
metadata["title"] = soup.title.string
# Extract meta tags
        for meta in soup.find_all("meta"):
            name = meta.get("name", "").lower()
            prop = meta.get("property", "").lower()
            content = meta.get("content")
            if name == "description" or prop == "og:description":
                metadata["description"] = content
            elif name == "author":
                metadata["author"] = content
            elif name == "keywords":
                metadata["keywords"] = content
            elif name == "published_time" or prop == "article:published_time":
                metadata["published_date"] = content
return metadata
async def process_search_results(self, query: str) -> Dict[str, Any]:
"""Process search results and create a RAG-based answer."""
cache_key = self._get_cache_key(query)
cached_result = self._get_cached_result(cache_key)
if cached_result:
return cached_result
# Perform web search
search_results = await self.search_web(query)
# Fetch content from search results
documents = []
metadata_list = []
for result in search_results:
url = result.get("link")
if not url:
continue
content = await self.fetch_content(url)
if content:
# Split content into chunks
chunks = self.text_splitter.split_text(content)
# Store metadata
metadata = {
"source": url,
"title": result.get("title", url),
**result
}
metadata_list.append(metadata)
# Create documents
for chunk in chunks:
doc = Document(
page_content=chunk,
metadata=metadata
)
documents.append(doc)
if not documents:
return {
"answer": "I couldn't find any relevant information.",
"sources": [],
"metadata": []
}
# Create vector store
vectorstore = FAISS.from_documents(documents, self.embeddings)
        # Retrieve the most relevant chunks directly from the vector store;
        # answer synthesis below simply concatenates the top chunks, so no
        # LLM-backed chain is needed here.
        retriever = vectorstore.as_retriever()
        relevant_docs = retriever.get_relevant_documents(query)
# Extract unique sources and content
sources = []
content = []
used_metadata = []
for doc in relevant_docs[:5]: # Limit to top 5 most relevant docs
source = doc.metadata["source"]
if source not in sources:
sources.append(source)
content.append(doc.page_content)
# Find corresponding metadata
for meta in metadata_list:
if meta["source"] == source:
used_metadata.append(meta)
break
result = {
"answer": "\n\n".join(content),
"sources": sources,
"metadata": used_metadata
}
# Cache the result
self._set_cached_result(cache_key, result)
return result
async def search(self, query: str) -> Dict[str, Any]:
"""Main search interface."""
try:
return await self.process_search_results(query)
except Exception as e:
return {
"answer": f"An error occurred: {str(e)}",
"sources": [],
"metadata": []
}
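

# Minimal usage sketch (not part of the original module): running this file
# directly performs a single search and prints the synthesized answer with its
# sources. The example query is arbitrary.
if __name__ == "__main__":
    async def _demo():
        engine = SearchEngine()
        result = await engine.search("latest developments in retrieval-augmented generation")
        print(result["answer"][:500])
        print("Sources:", result["sources"])

    asyncio.run(_demo())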