# ISE / search_engine.py
import asyncio
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
# Note: newer duckduckgo_search releases drop the `ddg` helper in favour of
# DDGS().text(); this module assumes a version that still ships `ddg`.
from duckduckgo_search import ddg
from langchain.embeddings import HuggingFaceEmbeddings
from transformers import pipeline

class ModelManager:
"""Manages AI models for text processing"""
def __init__(self):
# Initialize with smaller, CPU-friendly models
self.summarizer = pipeline(
"summarization",
model="facebook/bart-base",
device=-1 # Use CPU
)
        # Embeddings are initialized for downstream use; the summarization
        # path above does not depend on them.
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
def generate_summary(self, text: str, max_length: int = 150) -> str:
"""Generate a concise summary of the text"""
if not text or len(text.split()) < 50:
return text
try:
            summary = self.summarizer(
                text,
                max_length=max_length,
                min_length=30,
                do_sample=False,
                truncation=True  # BART inputs are capped, so truncate long text
            )[0]['summary_text']
            return summary
except Exception as e:
print(f"Error in summarization: {e}")
return text[:500] + "..."
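
# Usage sketch for ModelManager (the `article_text` variable is hypothetical,
# not defined in this module):
#
#     mm = ModelManager()
#     print(mm.generate_summary(article_text, max_length=120))
#
# Inputs under ~50 words come back unchanged, and any pipeline failure falls
# back to the first 500 characters of the input.
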
class ContentProcessor:
"""Processes and analyzes different types of content"""
def __init__(self):
self.model_manager = ModelManager()
def process_content(self, content: str) -> Dict[str, Any]:
"""Process content and generate insights"""
if not content:
return {"summary": "", "insights": []}
try:
summary = self.model_manager.generate_summary(content)
return {
"summary": summary,
"insights": [] # Simplified for CPU deployment
}
except Exception as e:
print(f"Error processing content: {e}")
return {"summary": content[:500] + "...", "insights": []}
class OSINTEngine:
    """Main OSINT engine class (thin wrapper around the external implementation)"""
    def __init__(self):
        # Imported lazily so this module still loads when osint_engine is absent
        from osint_engine import OSINTEngine as ExternalOSINT
        self.engine = ExternalOSINT()
async def search_username(self, query: str) -> Dict[str, Any]:
"""Search for usernames"""
return await self.engine.search_username(query)
async def search_image(self, query: str) -> Dict[str, Any]:
"""Search for images"""
return await self.engine.search_image(query)
async def search_social_media(self, query: str, platform: str) -> Dict[str, Any]:
"""Search for social media profiles"""
results = await self.engine.search_username(query)
        if platform:
            matches = [
                r for r in results.get('platforms', [])
                if r.get('platform', '').lower() == platform.lower()
            ]
            return {platform: matches}
        return results
    async def gather_personal_info(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Gather personal information"""
        return await self.engine.gather_personal_info(params)
async def search_historical_data(self, query: str) -> Dict[str, Any]:
"""Search for historical data"""
return await self.engine.search_historical_data(query)
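
# The wrapper methods above are coroutines, so synchronous callers need an
# event loop; the username below is illustrative only:
#
#     engine = OSINTEngine()
#     hits = asyncio.run(engine.search_username("example_user"))
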
class WebSearchEngine:
"""Main search engine class"""
def __init__(self):
self.processor = ContentProcessor()
self.session = requests.Session()
self.request_delay = 1.0
self.last_request_time = 0
        self.osint_engine = OSINTEngine()  # OSINT lookups via the external engine
def is_valid_url(self, url: str) -> bool:
"""Check if URL is valid for crawling"""
try:
parsed = urlparse(url)
return bool(parsed.netloc and parsed.scheme in ['http', 'https'])
        except Exception:
            return False
def get_metadata(self, soup: BeautifulSoup) -> Dict[str, str]:
"""Extract metadata from page"""
metadata = {}
# Get title
title = soup.find('title')
if title:
metadata['title'] = title.text.strip()
# Get meta description
desc = soup.find('meta', attrs={'name': 'description'})
if desc:
metadata['description'] = desc.get('content', '')
# Get publication date
date = soup.find('meta', attrs={'property': 'article:published_time'})
if date:
metadata['published_date'] = date.get('content', '').split('T')[0]
return metadata
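
    # get_metadata returns only the keys it could extract, e.g. (values
    # illustrative, not taken from a real page):
    #
    #     {"title": "Page Title",
    #      "description": "Meta description text",
    #      "published_date": "2023-01-15"}
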
    def process_url(self, url: str) -> Optional[Dict[str, Any]]:
"""Process a single URL"""
if not self.is_valid_url(url):
return None
try:
            # Rate limiting: wait out only the remainder of the delay window
            elapsed = time.time() - self.last_request_time
            if elapsed < self.request_delay:
                time.sleep(self.request_delay - elapsed)
            response = self.session.get(url, timeout=10)
            self.last_request_time = time.time()
if response.status_code != 200:
return None
            soup = BeautifulSoup(response.text, 'lxml')  # requires the lxml package
metadata = self.get_metadata(soup)
# Extract main content (simplified)
content = ' '.join([p.text for p in soup.find_all('p')])
processed = self.processor.process_content(content)
return {
'url': url,
'title': metadata.get('title', url),
'summary': processed['summary'],
'published_date': metadata.get('published_date', '')
}
except Exception as e:
print(f"Error processing URL {url}: {e}")
return None
def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
"""Perform search and process results"""
try:
# Perform DuckDuckGo search
            # Perform DuckDuckGo search; older `ddg` returns None on failure
            search_results = ddg(query, max_results=max_results) or []
            results = []
            for result in search_results:
                # ddg results expose the URL under the 'href' key
                processed = self.process_url(result.get('href', ''))
                if processed:
                    results.append(processed)
return results[:max_results]
except Exception as e:
print(f"Error in search: {e}")
return []
async def advanced_search(self, query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
"""Perform advanced search based on type"""
results = {}
try:
if search_type == "web":
results["web"] = self.search(query, kwargs.get("max_results", 5))
elif search_type == "username":
results["osint"] = await self.osint_engine.search_username(query)
elif search_type == "image":
results["image"] = await self.osint_engine.search_image(query)
elif search_type == "social":
results["social"] = await self.osint_engine.search_social_media(
query,
kwargs.get("platform")
)
elif search_type == "personal":
results["personal"] = await self.osint_engine.gather_personal_info(kwargs)
elif search_type == "historical":
results["historical"] = await self.osint_engine.search_historical_data(query)
except Exception as e:
results["error"] = str(e)
return results
# Main search function (note: constructs a fresh engine, and therefore
# reloads the models, on every call)
def search(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
"""Main search function"""
engine = WebSearchEngine()
return engine.search(query, max_results)
# Main advanced search function
async def advanced_search(query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
"""Main advanced search function"""
engine = WebSearchEngine()
return await engine.advanced_search(query, search_type, **kwargs)
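
# Minimal smoke test: a sketch assuming network access, a duckduckgo_search
# version that still ships `ddg`, and an importable osint_engine module. The
# query strings below are illustrative only.
if __name__ == "__main__":
    for hit in search("open source intelligence tools", max_results=3):
        print(hit["title"], "->", hit["url"])

    osint_results = asyncio.run(advanced_search("example_user", search_type="username"))
    print(osint_results)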