# NOTE: removed non-Python paste artifacts ("Spaces:", "Runtime error") left by a table/console export.
# Standard library
import asyncio
import base64
import io
import json
import os
import re
import time
from datetime import datetime
from io import BytesIO
from typing import Any, Dict, List, Optional, Union

# Third-party
import aiohttp
import httpx
import requests
import whois
from geopy.geocoders import Nominatim
from googlesearch import search as google_search
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from waybackpy import WaybackMachineCDXServerAPI
from webdriver_manager.chrome import ChromeDriverManager
class OSINTEngine:
    """OSINT capabilities for advanced information gathering.

    Combines username/profile enumeration across social platforms, basic
    image analysis, geolocation and WHOIS lookups, and Wayback Machine
    history queries.  Several person/location/domain methods return
    simulated placeholder data rather than querying live sources.
    """

    def __init__(self):
        # Headless Chrome configuration for any selenium-driven scraping.
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--disable-dev-shm-usage')
        self.setup_apis()
        # aiohttp session is created lazily by initialize().
        self.session = None
        # Username URL templates; "{}" is substituted with the username.
        self.platforms = {
            "twitter": "https://twitter.com/{}",
            "instagram": "https://instagram.com/{}",
            "facebook": "https://facebook.com/{}",
            "linkedin": "https://linkedin.com/in/{}",
            "github": "https://github.com/{}",
            "reddit": "https://reddit.com/user/{}",
            "youtube": "https://youtube.com/@{}",
            "tiktok": "https://tiktok.com/@{}",
            "pinterest": "https://pinterest.com/{}",
            "snapchat": "https://snapchat.com/add/{}",
            "twitch": "https://twitch.tv/{}",
            "medium": "https://medium.com/@{}",
            "devto": "https://dev.to/{}",
            "stackoverflow": "https://stackoverflow.com/users/{}"
        }

    def setup_apis(self):
        """Initialize API clients (geocoder and async HTTP client)."""
        self.geolocator = Nominatim(user_agent="intelligent_search")
        self.http_client = httpx.AsyncClient()

    async def initialize(self):
        """Create the shared aiohttp session on first use."""
        if not self.session:
            self.session = aiohttp.ClientSession()

    async def close(self):
        """Release network resources (aiohttp session and httpx client)."""
        if self.session:
            await self.session.close()
            self.session = None
        # Fix: the httpx.AsyncClient created in setup_apis() was never
        # closed, leaking connections; aclose() is safe to call repeatedly.
        await self.http_client.aclose()

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across multiple platforms.

        Returns a dict with 'platforms' (profiles that answered HTTP 200),
        'social_media' (reserved, currently unused) and 'websites'
        (Google results mentioning the username, best-effort).
        """
        results = {
            'platforms': [],
            'social_media': {},
            'websites': []
        }
        # Common social media platforms
        platforms = [
            {'name': 'GitHub', 'url': f'https://github.com/{username}'},
            {'name': 'Twitter', 'url': f'https://twitter.com/{username}'},
            {'name': 'Instagram', 'url': f'https://instagram.com/{username}'},
            {'name': 'LinkedIn', 'url': f'https://linkedin.com/in/{username}'},
            {'name': 'Facebook', 'url': f'https://facebook.com/{username}'},
            {'name': 'YouTube', 'url': f'https://youtube.com/@{username}'},
        ]
        # Probe all profiles concurrently over one short-lived session.
        async with aiohttp.ClientSession() as session:
            tasks = [self.check_profile(session, p['url'], p['name']) for p in platforms]
            platform_results = await asyncio.gather(*tasks)
            results['platforms'] = [r for r in platform_results if r is not None]
        # Google search for additional mentions
        try:
            search_query = f'"{username}" OR "@{username}" -site:twitter.com -site:facebook.com -site:instagram.com'
            web_results = list(google_search(search_query, num_results=5))
            results['websites'] = web_results
        except Exception as e:
            # Best-effort: surface the error text instead of failing the call.
            results['websites'] = [str(e)]
        return results

    async def check_profile(self, session, url: str, platform: str) -> Optional[Dict[str, str]]:
        """Return a descriptor dict if *url* answers HTTP 200, else None."""
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return {
                        'platform': platform,
                        'url': url,
                        'exists': True
                    }
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Fix: narrowed from a bare "except:" which would also swallow
            # CancelledError and programming errors.
            pass
        return None

    async def check_username(self, username: str, platform: str = "all") -> List[Dict]:
        """Check one named platform (or "all") for *username*.

        Unknown platform names are silently skipped; connection failures
        are reported per-platform rather than raised.
        """
        await self.initialize()
        results = []
        platforms_to_check = [platform] if platform != "all" else self.platforms.keys()
        for platform_name in platforms_to_check:
            if platform_name in self.platforms:
                url = self.platforms[platform_name].format(username)
                try:
                    async with self.session.get(url) as response:
                        exists = response.status == 200
                        results.append({
                            "platform": platform_name,
                            "url": url,
                            "exists": exists
                        })
                except (aiohttp.ClientError, asyncio.TimeoutError):
                    # Fix: narrowed from a bare "except:".
                    results.append({
                        "platform": platform_name,
                        "url": url,
                        "exists": False,
                        "error": "Connection failed"
                    })
        return results

    async def search_image(self, image_url: str) -> Dict[str, Any]:
        """Image analysis and reverse search.

        Downloads the image, records format/size/mode, and builds a
        Google Lens reverse-search URL.  Errors land in 'error'.
        """
        results = {
            'analysis': {},
            'similar_images': [],
            'error': None
        }
        try:
            # Download and analyze image.
            # NOTE(review): requests is blocking inside an async method;
            # consider self.http_client instead.  Timeout guards against hangs.
            response = requests.get(image_url, timeout=15)
            img = Image.open(BytesIO(response.content))
            # Basic image analysis
            results['analysis'] = {
                'format': img.format,
                'size': img.size,
                'mode': img.mode
            }
            # Perform reverse image search using Google Lens
            search_url = f"https://lens.google.com/uploadbyurl?url={image_url}"
            results['similar_images'].append({
                'source': 'Google Lens',
                'url': search_url
            })
        except Exception as e:
            results['error'] = str(e)
        return results

    async def gather_personal_info(self, data: Dict[str, str]) -> Dict[str, Any]:
        """Gather personal information from various sources.

        Dispatches on the keys present in *data* ('location', 'domain').
        """
        results = {}
        if 'location' in data:
            results['location'] = await self.analyze_location(data['location'])
        if 'domain' in data:
            results['domain'] = self.analyze_domain(data['domain'])
        return results

    async def analyze_location(self, location: str) -> Optional[Dict[str, Any]]:
        """Geocode *location*; returns address/coords, an error dict, or None."""
        try:
            location_data = self.geolocator.geocode(location)
            if location_data:
                return {
                    'address': location_data.address,
                    'latitude': location_data.latitude,
                    'longitude': location_data.longitude,
                    'raw': location_data.raw
                }
        except Exception as e:
            return {'error': str(e)}
        # Geocoder found nothing.
        return None

    def analyze_domain(self, domain: str) -> Dict[str, Any]:
        """Return WHOIS registration details for *domain* (or an error dict)."""
        try:
            domain_info = whois.whois(domain)
            return {
                'registrar': domain_info.registrar,
                'creation_date': domain_info.creation_date,
                'expiration_date': domain_info.expiration_date,
                'last_updated': domain_info.updated_date,
                'status': domain_info.status
            }
        except Exception as e:
            return {'error': str(e)}

    async def search_historical_data(self, url: str) -> List[Dict[str, Any]]:
        """Search for historical data using Wayback Machine.

        Returns one dict per snapshot; on failure a single error dict.
        """
        results = []
        try:
            user_agent = "Mozilla/5.0"
            cdx = WaybackMachineCDXServerAPI(url, user_agent)
            for snapshot in cdx.snapshots():
                results.append({
                    'timestamp': snapshot.timestamp,
                    'url': snapshot.archive_url,
                    'status': snapshot.status_code,
                    'mime_type': snapshot.mime_type
                })
        except Exception as e:
            results.append({'error': str(e)})
        return results

    async def search_person(self, name: str, location: Optional[str] = None) -> List[Dict]:
        """Search for a person across simulated data sources.

        NOTE: returns placeholder/simulated records, not live results.
        """
        await self.initialize()
        results = []
        # Format search query
        query = f"{name}"
        if location:
            query += f" {location}"
        # Simulate searching various sources
        sources = ["social_media", "news", "public_records", "professional"]
        for source in sources:
            # Simulate different data sources
            if source == "social_media":
                profile = {
                    "name": name,
                    "location": location,
                    "source": "Social Media",
                    "profile_image": "https://example.com/profile.jpg",
                    "social_links": [
                        {"platform": "LinkedIn", "url": f"https://linkedin.com/in/{name.lower().replace(' ', '-')}"},
                        {"platform": "Twitter", "url": f"https://twitter.com/{name.lower().replace(' ', '')}"}
                    ],
                    "occupation": "Professional",
                    "last_seen": datetime.now().strftime("%Y-%m-%d")
                }
                results.append(profile)
            elif source == "news":
                news = {
                    "name": name,
                    "source": "News Articles",
                    "mentions": [
                        {
                            "title": f"Article about {name}",
                            "url": "https://example.com/news",
                            "date": "2023-01-01"
                        }
                    ]
                }
                results.append(news)
            elif source == "public_records":
                record = {
                    "name": name,
                    "source": "Public Records",
                    "location": location,
                    "age_range": "25-35",
                    "possible_relatives": ["Jane Doe", "John Doe Sr."],
                    "previous_locations": ["New York, NY", "Los Angeles, CA"]
                }
                results.append(record)
            elif source == "professional":
                prof = {
                    "name": name,
                    "source": "Professional Records",
                    "education": ["University Example"],
                    "work_history": ["Company A", "Company B"],
                    "skills": ["Leadership", "Management"]
                }
                results.append(prof)
        return results

    async def get_person_details(self, person_id: str) -> Dict:
        """Get detailed information about a specific person.

        NOTE: returns simulated placeholder data keyed on *person_id*.
        """
        await self.initialize()
        # Simulate gathering detailed information
        details = {
            "personal": {
                "name": person_id,
                "age_range": "25-35",
                "locations": ["Current City, Country", "Previous City, Country"],
                "education": ["University Name", "High School Name"],
                "occupation": "Current Occupation"
            },
            "social_media": {
                "profiles": [
                    {
                        "platform": "LinkedIn",
                        "url": f"https://linkedin.com/in/{person_id}",
                        "last_active": "2023-01-01"
                    },
                    {
                        "platform": "Twitter",
                        "url": f"https://twitter.com/{person_id}",
                        "last_active": "2023-01-01"
                    }
                ]
            },
            "contact": {
                "email_pattern": "j***@example.com",
                "phone_pattern": "+1 (***) ***-**89"
            },
            "images": [
                {
                    "url": "https://example.com/profile1.jpg",
                    "source": "LinkedIn",
                    "date": "2023-01-01"
                }
            ],
            "activities": {
                "recent_posts": [
                    {
                        "platform": "Twitter",
                        "content": "Example post content",
                        "date": "2023-01-01"
                    }
                ],
                "mentions": [
                    {
                        "source": "News Article",
                        "title": "Article Title",
                        "url": "https://example.com/article",
                        "date": "2023-01-01"
                    }
                ]
            }
        }
        return details

    async def analyze_image(self, image_path: str) -> Dict:
        """Analyze an image (local path or URL) and return information about it."""
        try:
            # Open from disk if the path exists, otherwise fetch it as a URL.
            img = Image.open(
                image_path if os.path.exists(image_path)
                else io.BytesIO(requests.get(image_path, timeout=15).content)
            )
            analysis = {
                "format": img.format,
                "size": f"{img.size[0]}x{img.size[1]}",
                "mode": img.mode,
                "metadata": {},
            }
            # Extract EXIF data if available.
            # Fix: call _getexif() once (was called twice) and guard for
            # image formats whose objects lack the private accessor.
            exif = img._getexif() if hasattr(img, '_getexif') else None
            if exif:
                # Numeric keys are standard EXIF tag IDs.
                analysis["metadata"] = {
                    "datetime": exif.get(306, "Unknown"),
                    "make": exif.get(271, "Unknown"),
                    "model": exif.get(272, "Unknown"),
                    "software": exif.get(305, "Unknown")
                }
            return analysis
        except Exception as e:
            return {"error": str(e)}

    async def find_similar_images(self, image_url: str) -> List[Dict]:
        """Find similar images (simulated placeholder results)."""
        # Simulate finding similar images
        return [
            {
                "url": "https://example.com/similar1.jpg",
                "similarity": 0.95,
                "source": "Website A"
            },
            {
                "url": "https://example.com/similar2.jpg",
                "similarity": 0.85,
                "source": "Website B"
            }
        ]

    async def get_location_info(self, location: str) -> Dict:
        """Get information about a location (simulated placeholder data)."""
        # Simulate location information retrieval
        return {
            "name": location,
            "coordinates": {"lat": 40.7128, "lng": -74.0060},
            "country": "United States",
            "timezone": "America/New_York",
            "population": "8.4 million",
            "weather": "Sunny, 72°F"
        }

    async def get_domain_info(self, domain: str) -> Dict:
        """Get information about a domain (simulated placeholder data)."""
        # Simulate domain information retrieval
        return {
            "domain": domain,
            "registrar": "Example Registrar",
            "creation_date": "2020-01-01",
            "expiration_date": "2024-01-01",
            "nameservers": ["ns1.example.com", "ns2.example.com"],
            "ip_address": "192.0.2.1",
            "location": "United States"
        }
# Helper function to create document from gathered information
def create_report(data: Dict[str, Any], template: str = "default") -> str:
    """Create a formatted Markdown report from gathered information.

    Args:
        data: Mapping of section name -> content. Dict content becomes
            "* key: value" bullets, list content one bullet per item
            (dict items expand to one bullet per key), anything else is
            emitted verbatim.
        template: Report layout; only "default" is implemented.

    Returns:
        The report as a Markdown string.

    Raises:
        ValueError: If *template* is not a known template name.
    """
    if template != "default":
        raise ValueError(f"Template '{template}' not found")
    report = "# OSINT Investigation Report\n\n"
    report += f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
    for section, content in data.items():
        report += f"## {section.title()}\n"
        if isinstance(content, dict):
            for key, value in content.items():
                report += f"* {key}: {value}\n"
        elif isinstance(content, list):
            for item in content:
                if isinstance(item, dict):
                    for k, v in item.items():
                        report += f"* {k}: {v}\n"
                else:
                    report += f"* {item}\n"
        else:
            report += f"{content}\n"
        report += "\n"
    return report
async def create_report_from_data(data: Dict) -> Dict:
    """Create a formatted report from the gathered data.

    Dispatches on the keys present in *data* ("username", "image_url",
    "location", "domain", "name", "person_id") and aggregates the engine
    results into one dict.  Any exception is returned as {"error": ...}
    rather than raised.
    """
    engine = OSINTEngine()
    try:
        report = {}
        if "username" in data:
            report["platforms"] = await engine.check_username(data["username"], data.get("platform", "all"))
        if "image_url" in data:
            report["analysis"] = await engine.analyze_image(data["image_url"])
            report["similar_images"] = await engine.find_similar_images(data["image_url"])
        if "location" in data:
            report["location"] = await engine.get_location_info(data["location"])
        if "domain" in data:
            report["domain"] = await engine.get_domain_info(data["domain"])
        if "name" in data:
            report["matches"] = await engine.search_person(data["name"], data.get("location"))
        if "person_id" in data:
            report["details"] = await engine.get_person_details(data["person_id"])
        return report
    except Exception as e:
        return {"error": str(e)}
    finally:
        # Fix: close the engine exactly once on every path (the original
        # duplicated the close call in the success and error branches).
        await engine.close()