# Page-header residue captured with this file: "Spaces: Runtime error"
# NOTE: the original import order is kept on purpose. ``from holehe.core
# import *`` can rebind any name imported before it, and the imports that
# follow it rebind whatever it exported, so regrouping could change which
# binding wins.
import os
import re
import json
import time
import asyncio
import aiohttp
import requests
import instaloader
import face_recognition
import numpy as np
from PIL import Image
from io import BytesIO
from typing import Dict, List, Any, Optional, Union
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from holehe.core import *
from sherlock import sherlock
from geopy.geocoders import Nominatim
from waybackpy import WaybackMachineCDXServerAPI
import phonenumbers
from phonenumbers import geocoder, carrier, timezone
import whois
from datetime import datetime
from urllib.parse import quote
class OSINTEngine:
    """OSINT capabilities for advanced information gathering.

    Bundles lookups for usernames, e-mail addresses, images, phone numbers,
    locations, domains, social-media profiles and archived pages. Most
    methods report a failure as an ``'error'`` entry in their result instead
    of raising.
    """

    # Seconds to wait on blocking ``requests`` downloads before giving up.
    REQUEST_TIMEOUT = 15

    # Platform names that ``search_social_media`` will try to dispatch.
    SOCIAL_PLATFORMS = ('instagram', 'twitter', 'reddit')

    def __init__(self):
        """Build the headless Chrome options and initialise the API clients."""
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')
        self.chrome_options.add_argument('--no-sandbox')
        self.chrome_options.add_argument('--disable-dev-shm-usage')
        self.setup_apis()

    def setup_apis(self):
        """Initialize API clients (Instaloader and the Nominatim geocoder)."""
        self.instagram = instaloader.Instaloader()
        self.geolocator = Nominatim(user_agent="intelligent_search")

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for a username across multiple platforms.

        Returns:
            A dict with ``'platforms'`` (hits from ``sherlock_search``) and
            ``'email'`` (the ``search_email`` result).
        """
        results = {}
        results['platforms'] = await self.sherlock_search(username)
        # The e-mail probe is built as "<username>@gmail.com"; no other
        # provider is tried.
        results['email'] = await self.search_email(f"{username}@gmail.com")
        return results

    async def sherlock_search(self, username: str) -> List[Dict[str, str]]:
        """Probe every known site for ``username`` concurrently.

        Returns:
            The non-``None`` results of ``check_username``, one per site that
            answered with HTTP 200.
        """
        # NOTE(review): assumes sherlock.site_data() returns a mapping of
        # site name -> dict with a 'url' template -- confirm against the
        # installed sherlock version.
        sites = sherlock.site_data()
        async with aiohttp.ClientSession() as session:
            checks = [
                self.check_username(session, username, site_name, site_data)
                for site_name, site_data in sites.items()
            ]
            outcomes = await asyncio.gather(*checks)
        return [hit for hit in outcomes if hit is not None]

    @staticmethod
    def _format_profile_url(template: str, username: str) -> Optional[str]:
        """Fill a site URL template, or return ``None`` if it cannot be filled.

        Both positional (``{}``) and named (``{username}``) placeholders are
        supported; ``str.format`` ignores whichever argument is unused.
        """
        if not template:
            return None
        try:
            return template.format(username, username=username) or None
        except (IndexError, KeyError, ValueError):
            # Malformed template or an unknown placeholder name.
            return None

    async def check_username(self, session, username: str, site_name: str,
                             site_data: Dict) -> Optional[Dict[str, Any]]:
        """Check a username on a specific platform.

        Args:
            session: Object whose ``get(url)`` is an async context manager
                yielding a response with a ``status`` attribute.
            username: Name to look up.
            site_name: Label stored under ``'platform'`` in the result.
            site_data: Site description; its ``'url'`` entry is the template.

        Returns:
            ``{'platform', 'url', 'found'}`` when the site answers 200,
            otherwise ``None`` (including on any request failure).
        """
        url = self._format_profile_url(site_data.get('url', ''), username)
        if url is None:
            return None
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return {
                        'platform': site_name,
                        'url': url,
                        'found': True,
                    }
        except Exception:
            # Deliberately best-effort: one unreachable site must not abort
            # the whole scan. Unlike a bare ``except``, this lets
            # asyncio.CancelledError propagate.
            pass
        return None

    async def search_email(self, email: str) -> Dict[str, Any]:
        """Search for e-mail presence on various platforms.

        Returns:
            A dict keyed by module function name, holding every truthy
            module result. Modules that raise are skipped.
        """
        results = {}
        # NOTE(review): holehe's published examples pass the module list to
        # get_functions() and call each module as (email, client, out) --
        # confirm this call shape against the installed holehe version.
        modules = get_functions()
        for module in modules:
            try:
                out = await module(email)
            except Exception:
                # Best-effort per module; cancellation still propagates.
                continue
            if out:
                results[module.__name__] = out
        return results

    async def search_image(self, image_url: str) -> Dict[str, Any]:
        """Detect faces in an image and run a reverse image search.

        Returns:
            A dict with ``'faces_found'``, ``'face_locations'`` and
            ``'reverse_search'``; on failure an ``'error'`` message is added
            and the keys filled so far are kept.
        """
        results = {}
        try:
            response = requests.get(image_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            # Normalise RGBA / palette / greyscale images to 8-bit RGB before
            # handing the array to face_recognition.
            img = Image.open(BytesIO(response.content)).convert('RGB')
            face_locations = face_recognition.face_locations(np.array(img))
            results['faces_found'] = len(face_locations)
            results['face_locations'] = face_locations
            results['reverse_search'] = await self.reverse_image_search(image_url)
        except Exception as e:
            results['error'] = str(e)
        return results

    async def reverse_image_search(self, image_url: str) -> List[Dict[str, str]]:
        """Perform a reverse image search through Google Lens.

        Returns:
            A one-element list: either ``{'source', 'url'}`` with the URL the
            browser ended on, or ``{'error': message}``.
        """
        results = []
        try:
            driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=self.chrome_options
            )
            try:
                # Percent-encode the image URL so its own '?', '&' and '#'
                # cannot corrupt the Lens query string.
                search_url = (
                    "https://lens.google.com/uploadbyurl?url="
                    f"{quote(image_url, safe='')}"
                )
                driver.get(search_url)
                # Same 3 s pause as before, without blocking the event loop.
                await asyncio.sleep(3)
                # Extract results (simplified)
                results.append({
                    'source': 'Google Lens',
                    'url': driver.current_url
                })
            finally:
                # Always release the browser, even when navigation fails.
                driver.quit()
        except Exception as e:
            results.append({'error': str(e)})
        return results

    async def gather_personal_info(self, data: Dict[str, str]) -> Dict[str, Any]:
        """Gather personal information from various sources.

        Only the ``'phone'``, ``'location'`` and ``'domain'`` keys of
        ``data`` are inspected; each present key yields a same-named entry.
        """
        results = {}
        if 'phone' in data:
            results['phone'] = self.analyze_phone_number(data['phone'])
        if 'location' in data:
            results['location'] = await self.analyze_location(data['location'])
        if 'domain' in data:
            results['domain'] = self.analyze_domain(data['domain'])
        return results

    def analyze_phone_number(self, phone: str) -> Dict[str, Any]:
        """Analyze phone number information.

        Returns:
            ``'valid'``, ``'type'``, ``'country'``, ``'carrier'`` and
            ``'timezone'`` as reported by ``phonenumbers``, or
            ``{'error': message}`` if parsing or a lookup fails.
        """
        try:
            # No default region is passed to parse().
            number = phonenumbers.parse(phone)
            return {
                'valid': phonenumbers.is_valid_number(number),
                'type': phonenumbers.number_type(number),
                'country': geocoder.description_for_number(number, "en"),
                'carrier': carrier.name_for_number(number, "en"),
                'timezone': timezone.time_zones_for_number(number)
            }
        except Exception as e:
            return {'error': str(e)}

    async def analyze_location(self, location: str) -> Optional[Dict[str, Any]]:
        """Geocode a free-text location.

        Returns:
            Address, latitude, longitude and the raw geocoder payload;
            ``None`` when the geocoder finds nothing; ``{'error': message}``
            when the lookup raises.
        """
        try:
            location_data = self.geolocator.geocode(location)
        except Exception as e:
            return {'error': str(e)}
        if not location_data:
            return None
        return {
            'address': location_data.address,
            'latitude': location_data.latitude,
            'longitude': location_data.longitude,
            'raw': location_data.raw
        }

    def analyze_domain(self, domain: str) -> Dict[str, Any]:
        """Analyze domain information via WHOIS.

        Returns:
            Registrar, creation/expiration/update dates and status, or
            ``{'error': message}`` if the lookup fails.
        """
        try:
            domain_info = whois.whois(domain)
            return {
                'registrar': domain_info.registrar,
                'creation_date': domain_info.creation_date,
                'expiration_date': domain_info.expiration_date,
                'last_updated': domain_info.updated_date,
                'status': domain_info.status
            }
        except Exception as e:
            return {'error': str(e)}

    async def search_social_media(self, username: str,
                                  platform: Optional[str] = None) -> Dict[str, Any]:
        """Search for user information on social media platforms.

        Args:
            username: Account name to look up.
            platform: A single platform to query; when omitted, every entry
                of ``SOCIAL_PLATFORMS`` is queried.

        Returns:
            A dict keyed by platform. Platforms outside ``SOCIAL_PLATFORMS``
            are ignored. A listed platform with no ``search_<platform>``
            method on this object gets an ``{'error': ...}`` entry, as does
            any handler that raises.
        """
        results = {}
        requested = [platform] if platform else list(self.SOCIAL_PLATFORMS)
        for name in requested:
            if name not in self.SOCIAL_PLATFORMS:
                continue
            # Look the handler up dynamically so a missing one is reported
            # explicitly instead of surfacing as an AttributeError.
            handler = getattr(self, f"search_{name}", None)
            if handler is None:
                results[name] = {
                    'error': f"No search handler implemented for platform '{name}'"
                }
                continue
            try:
                results[name] = await handler(username)
            except Exception as e:
                results[name] = {'error': str(e)}
        return results

    async def search_instagram(self, username: str) -> Dict[str, Any]:
        """Search Instagram for user information.

        Returns:
            Selected public profile fields, or ``{'error': message}`` if the
            profile cannot be loaded.
        """
        try:
            profile = instaloader.Profile.from_username(self.instagram.context, username)
            return {
                'username': profile.username,
                'full_name': profile.full_name,
                'biography': profile.biography,
                'followers': profile.followers,
                'following': profile.followees,
                'is_private': profile.is_private,
                'is_verified': profile.is_verified,
                'external_url': profile.external_url,
                'posts_count': profile.mediacount,
                'profile_pic_url': profile.profile_pic_url
            }
        except Exception as e:
            return {'error': str(e)}

    async def search_historical_data(self, url: str) -> List[Dict[str, Any]]:
        """Search for historical data using the Wayback Machine.

        Returns:
            One dict per snapshot (timestamp, archive URL, status, MIME
            type). If iteration fails, an ``{'error': message}`` item is
            appended after the snapshots collected so far.
        """
        results = []
        try:
            user_agent = "Mozilla/5.0"
            cdx = WaybackMachineCDXServerAPI(url, user_agent)
            for snapshot in cdx.snapshots():
                results.append({
                    'timestamp': snapshot.timestamp,
                    'url': snapshot.archive_url,
                    'status': snapshot.status_code,
                    'mime_type': snapshot.mime_type
                })
        except Exception as e:
            results.append({'error': str(e)})
        return results

    def solve_captcha(self, image_url: str) -> str:
        """Solve CAPTCHA using OCR (simplified version).

        Only downloads and opens the image; no solving is implemented yet.

        Returns:
            A fixed placeholder string, or ``"Error: <message>"`` if the
            download or image decoding fails.
        """
        try:
            response = requests.get(image_url, timeout=self.REQUEST_TIMEOUT)
            # Opening the payload checks that it is a readable image.
            Image.open(BytesIO(response.content))
            # Add your CAPTCHA solving logic here
            return "CAPTCHA solution placeholder"
        except Exception as e:
            return f"Error: {str(e)}"
# Helper function to create a report document from gathered information
def create_report(data: Dict[str, Any], template: str = "default") -> str:
    """Render gathered information as a Markdown-style text report.

    Args:
        data: Mapping of section name to content. A dict renders as
            ``* key: value`` bullets, a list as one bullet per item, and
            anything else as a single line.
        template: Report layout; only ``"default"`` exists.

    Returns:
        The report text: a title, a "Generated on" timestamp, then one
        ``##`` section per entry of ``data``.

    Raises:
        ValueError: If ``template`` is not ``"default"``.
    """
    if template != "default":
        raise ValueError(f"Template '{template}' not found")

    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    pieces = [
        "# OSINT Investigation Report\n\n",
        f"Generated on: {stamp}\n\n",
    ]
    for section, content in data.items():
        pieces.append(f"## {section.title()}\n")
        if isinstance(content, dict):
            pieces.extend(f"* {key}: {value}\n" for key, value in content.items())
        elif isinstance(content, list):
            pieces.extend(f"* {item}\n" for item in content)
        else:
            pieces.append(f"{content}\n")
        # Blank line closes every section.
        pieces.append("\n")
    return "".join(pieces)