# Spaces:
# Runtime error
# Runtime error
import asyncio
import json
import os
import re
import time
from datetime import datetime
from io import BytesIO
from typing import Dict, List, Any, Union
from urllib.parse import quote

import aiohttp
import httpx
import requests
import whois
from geopy.geocoders import Nominatim
from googlesearch import search as google_search
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from waybackpy import WaybackMachineCDXServerAPI
from webdriver_manager.chrome import ChromeDriverManager
class OSINTEngine:
    """OSINT capabilities for advanced information gathering.

    The coroutine methods wrap synchronous third-party clients (requests,
    geopy, whois, googlesearch, waybackpy).  Those blocking calls are pushed
    to the default thread-pool executor so the event loop stays responsive.

    The instance owns an ``httpx.AsyncClient``; call ``await engine.close()``
    or use ``async with OSINTEngine() as engine:`` to release it.
    """

    # Upper bound, in seconds, for a single outbound HTTP request made here.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Build the headless Chrome options and create the API clients."""
        # NOTE: chrome_options is prepared here but no method of this class
        # starts a browser with it; it is kept for external users.
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--disable-dev-shm-usage')
        self.setup_apis()

    def setup_apis(self):
        """Initialize API clients (geocoder and shared async HTTP client)."""
        self.geolocator = Nominatim(user_agent="intelligent_search")
        self.http_client = httpx.AsyncClient()

    async def close(self):
        """Close the shared ``httpx.AsyncClient`` created by ``setup_apis``."""
        await self.http_client.aclose()

    async def __aenter__(self):
        """Return the engine itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Release the HTTP client when the ``async with`` block exits."""
        await self.close()

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across multiple platforms.

        Args:
            username: Account handle to probe.

        Returns:
            A dict with three keys:
            ``'platforms'`` - one entry per platform whose profile URL
            answered HTTP 200; ``'social_media'`` - always an empty dict
            (nothing in this method fills it); ``'websites'`` - Google
            result URLs, or a one-item list holding the error text if the
            search failed.
        """
        results = {
            'platforms': [],
            'social_media': {},
            'websites': []
        }

        # Percent-encode the handle so characters such as '/', '?' or '#'
        # cannot turn the probe into a request for a different resource.
        handle = quote(username, safe='')

        # Common social media platforms
        platforms = [
            {'name': 'GitHub', 'url': f'https://github.com/{handle}'},
            {'name': 'Twitter', 'url': f'https://twitter.com/{handle}'},
            {'name': 'Instagram', 'url': f'https://instagram.com/{handle}'},
            {'name': 'LinkedIn', 'url': f'https://linkedin.com/in/{handle}'},
            {'name': 'Facebook', 'url': f'https://facebook.com/{handle}'},
            {'name': 'YouTube', 'url': f'https://youtube.com/@{handle}'},
        ]

        # Bound every probe so one unresponsive site cannot stall the batch.
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            checks = [
                self.check_profile(session, entry['url'], entry['name'])
                for entry in platforms
            ]
            platform_results = await asyncio.gather(*checks)
        results['platforms'] = [r for r in platform_results if r is not None]

        # Google search for additional mentions
        search_query = f'"{username}" OR "@{username}" -site:twitter.com -site:facebook.com -site:instagram.com'
        loop = asyncio.get_running_loop()
        try:
            # google_search is synchronous; run it off the event loop.
            results['websites'] = await loop.run_in_executor(
                None,
                lambda: list(google_search(search_query, num_results=5)),
            )
        except Exception as e:
            # Best effort: the failure is reported in-band, not raised.
            results['websites'] = [str(e)]
        return results

    async def check_profile(self, session, url: str, platform: str) -> Union[Dict[str, Any], None]:
        """Check if a profile exists on a platform.

        Args:
            session: Object providing ``get(url)`` as an async context
                manager (an ``aiohttp.ClientSession`` in ``search_username``).
            url: Profile URL to request.
            platform: Human-readable platform name echoed in the result.

        Returns:
            ``{'platform', 'url', 'exists': True}`` when the response status
            is 200; ``None`` for any other status or on a network failure.
        """
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return {
                        'platform': platform,
                        'url': url,
                        'exists': True
                    }
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # An unreachable or timed-out platform counts as "not found".
            # Cancellation and programming errors are deliberately not caught.
            return None
        return None

    async def search_image(self, image_url: str) -> Dict[str, Any]:
        """Image analysis and reverse search.

        Downloads the image, records its format, size and mode, and adds a
        Google Lens link for the same URL.  No reverse search is executed
        here; only the link is built.

        Args:
            image_url: Publicly reachable URL of the image.

        Returns:
            A dict with ``'analysis'``, ``'similar_images'`` and ``'error'``.
            ``'error'`` is ``None`` on success, otherwise the exception text.
        """
        results = {
            'analysis': {},
            'similar_images': [],
            'error': None
        }
        loop = asyncio.get_running_loop()
        try:
            # Download and analyze image.  requests is synchronous, so the
            # call runs in the executor; the timeout prevents hanging forever.
            response = await loop.run_in_executor(
                None,
                lambda: requests.get(image_url, timeout=self.REQUEST_TIMEOUT),
            )
            # Report HTTP errors as such instead of as an image-decoding error.
            response.raise_for_status()

            # Basic image analysis
            with Image.open(BytesIO(response.content)) as img:
                results['analysis'] = {
                    'format': img.format,
                    'size': img.size,
                    'mode': img.mode
                }

            # The image URL is a query-string value, so it must be
            # percent-encoded or its own '?' and '&' corrupt the Lens URL.
            encoded_url = quote(image_url, safe='')
            search_url = f"https://lens.google.com/uploadbyurl?url={encoded_url}"
            results['similar_images'].append({
                'source': 'Google Lens',
                'url': search_url
            })
        except Exception as e:
            results['error'] = str(e)
        return results

    async def gather_personal_info(self, data: Dict[str, str]) -> Dict[str, Any]:
        """Gather personal information from various sources.

        Args:
            data: May contain ``'location'`` and/or ``'domain'``; other keys
                are ignored.

        Returns:
            A dict holding a ``'location'`` and/or ``'domain'`` entry for
            each of those keys present in ``data`` (empty if neither is).
        """
        results = {}
        if 'location' in data:
            results['location'] = await self.analyze_location(data['location'])
        if 'domain' in data:
            # analyze_domain is synchronous, so run it off the loop.
            loop = asyncio.get_running_loop()
            results['domain'] = await loop.run_in_executor(
                None, self.analyze_domain, data['domain']
            )
        return results

    async def analyze_location(self, location: str) -> Union[Dict[str, Any], None]:
        """Analyze location information.

        Args:
            location: Free-form place description passed to the geocoder.

        Returns:
            A dict with ``'address'``, ``'latitude'``, ``'longitude'`` and
            ``'raw'``; ``{'error': message}`` if geocoding raised; ``None``
            if the geocoder returned no match.
        """
        loop = asyncio.get_running_loop()
        try:
            # geocode() is a blocking call; keep it off the event loop.
            location_data = await loop.run_in_executor(
                None, self.geolocator.geocode, location
            )
        except Exception as e:
            return {'error': str(e)}
        if not location_data:
            return None
        return {
            'address': location_data.address,
            'latitude': location_data.latitude,
            'longitude': location_data.longitude,
            'raw': location_data.raw
        }

    def analyze_domain(self, domain: str) -> Dict[str, Any]:
        """Analyze domain information.

        Synchronous: performs the WHOIS lookup in the calling thread.

        Args:
            domain: Domain name to look up.

        Returns:
            A dict with ``'registrar'``, ``'creation_date'``,
            ``'expiration_date'``, ``'last_updated'`` and ``'status'``, or
            ``{'error': message}`` if the lookup raised.
        """
        try:
            domain_info = whois.whois(domain)
            return {
                'registrar': domain_info.registrar,
                'creation_date': domain_info.creation_date,
                'expiration_date': domain_info.expiration_date,
                'last_updated': domain_info.updated_date,
                'status': domain_info.status
            }
        except Exception as e:
            return {'error': str(e)}

    async def search_historical_data(self, url: str, limit: Union[int, None] = None) -> List[Dict[str, Any]]:
        """Search for historical data using Wayback Machine.

        Args:
            url: URL whose archived snapshots are listed.
            limit: Optional maximum number of snapshots to return; ``None``
                (the default) returns every snapshot the API yields.

        Returns:
            One dict per snapshot with ``'timestamp'``, ``'url'``,
            ``'status'`` and ``'mime_type'``.  If an error occurs, an
            ``{'error': message}`` entry is appended after whatever was
            collected up to that point.
        """
        loop = asyncio.get_running_loop()
        # The CDX client iterates synchronously; run it off the loop.
        return await loop.run_in_executor(
            None, self._collect_snapshots, url, limit
        )

    def _collect_snapshots(self, url, limit):
        """Synchronously list Wayback snapshots for ``url`` (see caller)."""
        results = []
        try:
            user_agent = "Mozilla/5.0"
            cdx = WaybackMachineCDXServerAPI(url, user_agent)
            for snapshot in cdx.snapshots():
                if limit is not None and len(results) >= limit:
                    break
                # waybackpy names these attributes 'statuscode'/'mimetype';
                # the underscore spellings are kept only as a fallback.
                # NOTE(review): confirm against the installed waybackpy.
                status = getattr(
                    snapshot, 'statuscode',
                    getattr(snapshot, 'status_code', None),
                )
                mime_type = getattr(
                    snapshot, 'mimetype',
                    getattr(snapshot, 'mime_type', None),
                )
                results.append({
                    'timestamp': snapshot.timestamp,
                    'url': snapshot.archive_url,
                    'status': status,
                    'mime_type': mime_type
                })
        except Exception as e:
            results.append({'error': str(e)})
        return results
# Helper function to create document from gathered information
def create_report(data: Dict[str, Any], template: str = "default") -> str:
    """Create a formatted report from gathered information.

    The report is Markdown: a title, a "Generated on" timestamp taken from
    the local clock, then one ``##`` section per top-level key of ``data``.
    Dict values are rendered as ``* key: value`` bullets, lists as bullets
    (dict items inside a list are expanded to ``* key: value`` bullets),
    and anything else as a plain line.

    Args:
        data: Mapping of section name to content.  Section names are
            converted with ``str()`` before title-casing, so non-string
            keys are accepted.
        template: Report layout; only ``"default"`` exists.

    Returns:
        The report text.

    Raises:
        ValueError: If ``template`` is not ``"default"``.
    """
    # Reject unknown templates up front, before doing any formatting work.
    if template != "default":
        raise ValueError(f"Template '{template}' not found")

    # Collect fragments and join once instead of growing a string with +=.
    parts = [
        "# OSINT Investigation Report\n\n",
        f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
    ]
    for section, content in data.items():
        parts.append(f"## {str(section).title()}\n")
        if isinstance(content, dict):
            parts.extend(f"* {key}: {value}\n" for key, value in content.items())
        elif isinstance(content, list):
            for item in content:
                if isinstance(item, dict):
                    parts.extend(f"* {k}: {v}\n" for k, v in item.items())
                else:
                    parts.append(f"* {item}\n")
        else:
            parts.append(f"{content}\n")
        # Blank line separating this section from the next.
        parts.append("\n")
    return "".join(parts)