# Spaces: Runtime error
# (Page-status text captured together with this file; it is not part of the program.)
from typing import Dict, List, Any | |
import requests | |
from bs4 import BeautifulSoup | |
from duckduckgo_search import ddg | |
from transformers import pipeline | |
from langchain.embeddings import HuggingFaceEmbeddings | |
import time | |
import json | |
import os | |
from urllib.parse import urlparse | |
import asyncio | |
class ModelManager:
    """Manages AI models for text processing.

    Construction loads a summarization pipeline and a sentence-embedding
    model, so creating an instance is expensive; reuse instances where
    possible.
    """

    def __init__(self):
        # Initialize with smaller, CPU-friendly models
        self.summarizer = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=-1,  # Use CPU
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

    def generate_summary(self, text: str, max_length: int = 150) -> str:
        """Generate a concise summary of the text.

        Args:
            text: The text to summarize.
            max_length: Upper bound passed to the summarizer for the
                length of the generated summary.

        Returns:
            The generated summary. Empty input and input shorter than 50
            whitespace-separated words are returned unchanged. If the
            summarizer raises, the first 500 characters of ``text`` are
            returned instead, followed by "..." only when something was
            actually cut off.
        """
        if not text or len(text.split()) < 50:
            return text
        try:
            output = self.summarizer(
                text,
                max_length=max_length,
                # Never ask for a minimum longer than the requested maximum.
                min_length=min(30, max_length),
                do_sample=False,
                # Let the tokenizer cut over-long inputs instead of having
                # the model fail on them and ending up in the fallback.
                truncation=True,
            )
            return output[0]['summary_text']
        except Exception as e:
            # Best effort: summarization failures must not break callers.
            print(f"Error in summarization: {e}")
            if len(text) > 500:
                return text[:500] + "..."
            return text
class ContentProcessor:
    """Processes and analyzes different types of content."""

    def __init__(self):
        self.model_manager = ModelManager()

    def process_content(self, content: str) -> Dict[str, Any]:
        """Process content and generate insights.

        Args:
            content: Raw text to process.

        Returns:
            A dict with two keys: ``"summary"`` (the summary produced by
            the model manager, or a truncated copy of ``content`` if
            summarization raised) and ``"insights"`` (always an empty
            list). Empty content yields an empty summary.
        """
        if not content:
            return {"summary": "", "insights": []}
        try:
            summary = self.model_manager.generate_summary(content)
        except Exception as e:
            print(f"Error processing content: {e}")
            # Only mark the fallback as truncated when text was cut off.
            if len(content) > 500:
                fallback = content[:500] + "..."
            else:
                fallback = content
            return {"summary": fallback, "insights": []}
        return {
            "summary": summary,
            "insights": [],  # Simplified for CPU deployment
        }
class OSINTEngine:
    """Main OSINT engine class.

    Thin async wrapper that forwards every call to the ``OSINTEngine``
    provided by the ``osint_engine`` module.
    """

    def __init__(self):
        # Imported here rather than at file level, so the dependency is
        # only required once an engine is actually constructed.
        from osint_engine import OSINTEngine as ExternalOSINT
        self.engine = ExternalOSINT()

    async def search_username(self, query: str) -> Dict[str, Any]:
        """Search for usernames."""
        return await self.engine.search_username(query)

    async def search_image(self, query: str) -> Dict[str, Any]:
        """Search for images."""
        return await self.engine.search_image(query)

    async def search_social_media(self, query: str, platform: str) -> Dict[str, Any]:
        """Search for social media profiles.

        Args:
            query: Username to look up.
            platform: Platform name to filter on (case-insensitive). When
                falsy, the unfiltered username-search result is returned.

        Returns:
            With a platform: ``{platform: [matching entries]}``, where the
            entries come from the ``'platforms'`` list of the username
            search. Without one: the raw username-search result.
        """
        results = await self.engine.search_username(query)
        if not platform:
            return results

        wanted = platform.lower()
        # NOTE(review): assumes the external engine returns a dict with a
        # 'platforms' list of dicts, each carrying a 'platform' name.
        # Anything that does not fit that shape is skipped rather than
        # aborting the whole lookup with KeyError/TypeError.
        entries = results.get('platforms') if isinstance(results, dict) else None
        matches = [
            entry
            for entry in (entries or [])
            if isinstance(entry, dict)
            and str(entry.get('platform', '')).lower() == wanted
        ]
        return {platform: matches}

    async def gather_personal_info(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """Gather personal information.

        ``kwargs`` is passed to the external engine as a single dict
        argument; it is not unpacked into keyword arguments.
        """
        return await self.engine.gather_personal_info(kwargs)

    async def search_historical_data(self, query: str) -> Dict[str, Any]:
        """Search for historical data."""
        return await self.engine.search_historical_data(query)
class WebSearchEngine:
    """Main search engine class.

    Combines DuckDuckGo web search, page fetching and summarization, and
    the OSINT lookups exposed through ``advanced_search``.
    """

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 1.0  # minimum seconds between page fetches
        self.last_request_time = 0
        self.osint_engine = OSINTEngine()  # Add OSINT engine

    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling.

        Returns:
            True only for strings that parse to an ``http``/``https`` URL
            with a network location; False for everything else, including
            non-string input and unparsable URLs.
        """
        if not isinstance(url, str):
            return False
        try:
            parsed = urlparse(url)
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. a broken IPv6 host).
            return False
        return bool(parsed.netloc) and parsed.scheme in ('http', 'https')

    def get_metadata(self, soup: "BeautifulSoup") -> Dict[str, str]:
        """Extract metadata from page.

        Returns:
            A dict holding whichever of ``'title'``, ``'description'`` and
            ``'published_date'`` could be found in the parsed page.
        """
        metadata = {}

        # Get title
        title = soup.find('title')
        if title:
            metadata['title'] = title.text.strip()

        # Get meta description
        desc = soup.find('meta', attrs={'name': 'description'})
        if desc:
            metadata['description'] = desc.get('content', '')

        # Get publication date; keep only the part before the 'T' separator
        date = soup.find('meta', attrs={'property': 'article:published_time'})
        if date:
            metadata['published_date'] = date.get('content', '').split('T')[0]

        return metadata

    def _seconds_until_next_request(self, now: float) -> float:
        """Return how long to wait at ``now`` to honour ``request_delay``."""
        elapsed = now - self.last_request_time
        return max(0.0, self.request_delay - elapsed)

    @staticmethod
    def _extract_result_url(result: Any) -> Any:
        """Return the URL stored in one search result, or None if absent.

        ``ddg()`` results carry the URL under ``'href'``; ``'link'`` is
        still accepted for results shaped the way this code used to expect.
        """
        if not isinstance(result, dict):
            return None
        return result.get('href') or result.get('link') or None

    def process_url(self, url: str) -> Dict[str, Any]:
        """Process a single URL.

        Fetches the page (rate limited), extracts its metadata and
        paragraph text, and summarizes the text.

        Returns:
            A dict with ``'url'``, ``'title'``, ``'summary'`` and
            ``'published_date'``, or None when the URL is invalid, the
            response status is not 200, or any step raises.
        """
        if not self.is_valid_url(url):
            return None
        try:
            # Rate limiting: wait only for the part of the delay that has
            # not already elapsed since the previous request.
            wait = self._seconds_until_next_request(time.time())
            if wait > 0:
                time.sleep(wait)
            try:
                response = self.session.get(url, timeout=10)
            finally:
                # Failed requests count towards the rate limit as well.
                self.last_request_time = time.time()

            if response.status_code != 200:
                return None

            soup = BeautifulSoup(response.text, 'lxml')
            metadata = self.get_metadata(soup)

            # Extract main content (simplified)
            content = ' '.join(p.text for p in soup.find_all('p'))
            processed = self.processor.process_content(content)

            return {
                'url': url,
                # Fall back to the URL for a missing or empty title.
                'title': metadata.get('title') or url,
                'summary': processed['summary'],
                'published_date': metadata.get('published_date', ''),
            }
        except Exception as e:
            # Best effort: one failing page must not abort a whole search.
            print(f"Error processing URL {url}: {e}")
            return None

    def search(self, query: str, max_results: int = 5) -> List[Dict[str, Any]]:
        """Perform search and process results.

        Args:
            query: Search terms passed to DuckDuckGo.
            max_results: Maximum number of results to request and return.

        Returns:
            Processed page dicts (see ``process_url``) for every result
            that could be fetched; an empty list if the search itself fails.
        """
        try:
            # Perform DuckDuckGo search; normalise an empty/None result.
            search_results = ddg(query, max_results=max_results) or []
            results = []
            for result in search_results:
                url = self._extract_result_url(result)
                if not url:
                    continue  # skip malformed results instead of aborting
                processed = self.process_url(url)
                if processed:
                    results.append(processed)
            return results[:max_results]
        except Exception as e:
            print(f"Error in search: {e}")
            return []

    async def advanced_search(self, query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
        """Perform advanced search based on type.

        Args:
            query: Search terms, username or other lookup key.
            search_type: One of ``"web"``, ``"username"``, ``"image"``,
                ``"social"``, ``"personal"`` or ``"historical"``.
            **kwargs: Extra options: ``max_results`` for web search,
                ``platform`` for social search; the whole dict is handed
                to the personal-info lookup.

        Returns:
            A dict with one key named after the kind of search performed.
            If the lookup raises, the dict holds an ``"error"`` message
            instead. An unrecognised ``search_type`` yields an empty dict.
        """
        results = {}
        try:
            if search_type == "web":
                # NOTE(review): search() is synchronous (blocking HTTP and
                # sleep), so it blocks the event loop while it runs.
                results["web"] = self.search(query, kwargs.get("max_results", 5))
            elif search_type == "username":
                results["osint"] = await self.osint_engine.search_username(query)
            elif search_type == "image":
                results["image"] = await self.osint_engine.search_image(query)
            elif search_type == "social":
                results["social"] = await self.osint_engine.search_social_media(
                    query,
                    kwargs.get("platform"),
                )
            elif search_type == "personal":
                results["personal"] = await self.osint_engine.gather_personal_info(kwargs)
            elif search_type == "historical":
                results["historical"] = await self.osint_engine.search_historical_data(query)
        except Exception as e:
            results["error"] = str(e)
        return results
# Shared engine used by the module-level helpers below. Constructing a
# WebSearchEngine loads the summarization and embedding models, so it is
# created once on first use instead of once per call. Reusing it also keeps
# the rate-limit state across calls.
_shared_engine = None


def _get_engine() -> "WebSearchEngine":
    """Return the shared WebSearchEngine, creating it on first use."""
    global _shared_engine
    if _shared_engine is None:
        _shared_engine = WebSearchEngine()
    return _shared_engine


# Main search function
def search(query: str, max_results: int = 5) -> List[Dict[str, Any]]:
    """Main search function.

    Args:
        query: Search terms.
        max_results: Maximum number of processed results to return.

    Returns:
        The list produced by ``WebSearchEngine.search``.
    """
    return _get_engine().search(query, max_results)


# Main advanced search function
async def advanced_search(query: str, search_type: str = "web", **kwargs) -> Dict[str, Any]:
    """Main advanced search function.

    Args:
        query: Search terms or lookup key.
        search_type: Kind of search to run; forwarded unchanged.
        **kwargs: Extra options forwarded unchanged.

    Returns:
        The dict produced by ``WebSearchEngine.advanced_search``.
    """
    return await _get_engine().advanced_search(query, search_type, **kwargs)