from typing import Dict, List, Any, Optional

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import ddg  # legacy helper; newer duckduckgo_search versions expose DDGS instead
from transformers import pipeline
from langchain.embeddings import HuggingFaceEmbeddings
import time
import json
import os
from urllib.parse import urlparse
import asyncio


class ModelManager:
    """Manages AI models for text processing"""

    def __init__(self):
        # Initialize with smaller, CPU-friendly models
        self.summarizer = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=-1  # Use CPU
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

    def generate_summary(self, text: str, max_length: int = 150) -> str:
        """Generate a concise summary of the text"""
        if not text or len(text.split()) < 50:
            return text
        try:
            summary = self.summarizer(
                text,
                max_length=max_length,
                min_length=30,
                do_sample=False
            )[0]['summary_text']
            return summary
        except Exception as e:
            print(f"Error in summarization: {e}")
            return text[:500] + "..."


class ContentProcessor:
    """Processes and analyzes different types of content"""

    def __init__(self):
        self.model_manager = ModelManager()

    def process_content(self, content: str) -> Dict[str, Any]:
        """Process content and generate insights"""
        if not content:
            return {"summary": "", "insights": []}
        try:
            summary = self.model_manager.generate_summary(content)
            return {
                "summary": summary,
                "insights": []  # Simplified for CPU deployment
            }
        except Exception as e:
            print(f"Error processing content: {e}")
            return {"summary": content[:500] + "...", "insights": []}


class OSINTEngine:
    """Main OSINT engine class"""

    def __init__(self):
        from osint_engine import OSINTEngine as ExternalOSINT
        self.engine = ExternalOSINT()

    async def search_username(self, query: str) -> Dict[str, Any]:
        """Search for usernames"""
        return await self.engine.search_username(query)

    async def search_image(self, query: str) -> Dict[str, Any]:
        """Search for images"""
        return await self.engine.search_image(query)

    async def search_social_media(self, query: str, platform: Optional[str] = None) -> Dict[str, Any]:
        """Search for social media profiles"""
        results = await self.engine.search_username(query)
        if platform:
            return {
                platform: [
                    r for r in results.get('platforms', [])
                    if r['platform'].lower() == platform.lower()
                ]
            }
        return results

    async def gather_personal_info(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Gather personal information"""
        return await self.engine.gather_personal_info(kwargs)

    async def search_historical_data(self, query: str) -> Dict[str, Any]:
        """Search for historical data"""
        return await self.engine.search_historical_data(query)


class WebSearchEngine:
    """Main search engine class"""

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 1.0
        self.last_request_time = 0
        self.osint_engine = OSINTEngine()  # Add OSINT engine

    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling"""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc and parsed.scheme in ['http', 'https'])
        except Exception:
            return False

    def get_metadata(self, soup: BeautifulSoup) -> Dict[str, str]:
        """Extract metadata from page"""
        metadata = {}

        # Get title
        title = soup.find('title')
        if title:
            metadata['title'] = title.text.strip()

        # Get meta description
        desc = soup.find('meta', attrs={'name': 'description'})
        if desc:
            metadata['description'] = desc.get('content', '')

        # Get publication date
        date = soup.find('meta', attrs={'property': 'article:published_time'})
        if date:
            metadata['published_date'] = date.get('content', '').split('T')[0]

        return metadata

    def process_url(self, url: str) -> Optional[Dict[str, Any]]:
        """Process a single URL"""
        if not self.is_valid_url(url):
            return None

        try:
            # Rate limiting
            current_time = time.time()
            if current_time - self.last_request_time < self.request_delay:
                time.sleep(self.request_delay)

            response = self.session.get(url, timeout=10)
            self.last_request_time = time.time()

            if response.status_code != 200:
                return None

            soup = BeautifulSoup(response.text, 'lxml')
            metadata = self.get_metadata(soup)

            # Extract main content (simplified)
            content = ' '.join([p.text for p in soup.find_all('p')])
            processed = self.processor.process_content(content)

            return {
                'url': url,
                'title': metadata.get('title', url),
                'summary': processed['summary'],
                'published_date': metadata.get('published_date', '')
            }
        except Exception as e:
            print(f"Error processing URL {url}: {e}")
            return None

    def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
        """Perform search and process results"""
        try:
            # Perform DuckDuckGo search
            search_results = ddg(query, max_results=max_results) or []

            results = []
            for result in search_results:
                # Older duckduckgo_search releases key the result URL as 'href';
                # fall back to 'link' for versions that use that name.
                link = result.get('href') or result.get('link')
                if not link:
                    continue
                processed = self.process_url(link)
                if processed:
                    results.append(processed)

            return results[:max_results]
        except Exception as e:
            print(f"Error in search: {e}")
            return []

    async def advanced_search(self, query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
        """Perform advanced search based on type"""
        results = {}
        try:
            if search_type == "web":
                results["web"] = self.search(query, kwargs.get("max_results", 5))
            elif search_type == "username":
                results["osint"] = await self.osint_engine.search_username(query)
            elif search_type == "image":
                results["image"] = await self.osint_engine.search_image(query)
            elif search_type == "social":
                results["social"] = await self.osint_engine.search_social_media(
                    query, kwargs.get("platform")
                )
            elif search_type == "personal":
                results["personal"] = await self.osint_engine.gather_personal_info(kwargs)
            elif search_type == "historical":
                results["historical"] = await self.osint_engine.search_historical_data(query)
        except Exception as e:
            results["error"] = str(e)
        return results


# Main search function
def search(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """Main search function"""
    engine = WebSearchEngine()
    return engine.search(query, max_results)


# Main advanced search function
async def advanced_search(query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
    """Main advanced search function"""
    engine = WebSearchEngine()
    return await engine.advanced_search(query, search_type, **kwargs)
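

# Minimal usage sketch, not part of the module API: it assumes network access,
# that the summarization/embedding models can be downloaded, and that the
# project's local `osint_engine` module is importable (WebSearchEngine
# constructs OSINTEngine in __init__). The query string is illustrative only.
if __name__ == "__main__":
    # Plain web search through the module-level helper
    for item in search("open source intelligence tools", max_results=3):
        print(item['title'], '-', item['url'])

    # Async advanced search, restricted here to the "web" type
    print(asyncio.run(advanced_search("open source intelligence tools",
                                      search_type="web", max_results=3)))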