# Spaces: Runtime error
"""
OSINT engine for comprehensive information gathering.
"""
from typing import Dict, List, Any, Optional | |
import asyncio | |
import json | |
from dataclasses import dataclass | |
import holehe.core as holehe | |
from sherlock import sherlock | |
import face_recognition | |
import numpy as np | |
from PIL import Image | |
import io | |
import requests | |
from geopy.geocoders import Nominatim | |
from geopy.exc import GeocoderTimedOut | |
import whois | |
from datetime import datetime | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
@dataclass
class PersonInfo:
    """Details collected about a single person.

    Only ``name`` is required; every other field defaults to ``None``.
    The ``@dataclass`` decorator generates the keyword-accepting
    ``__init__`` so the class can be built as ``PersonInfo(name=..., age=...)``.
    """

    name: str
    age: Optional[int] = None
    location: Optional[str] = None
    gender: Optional[str] = None
    # None means "nothing collected yet"; to_dict() reports it as [].
    social_profiles: Optional[List[Dict[str, str]]] = None
    # Same convention as social_profiles: None is reported as [].
    images: Optional[List[str]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary.

        ``social_profiles`` and ``images`` are replaced by an empty list
        when they are ``None`` (or empty), so consumers always get a list.
        """
        return {
            "name": self.name,
            "age": self.age,
            "location": self.location,
            "gender": self.gender,
            "social_profiles": self.social_profiles or [],
            "images": self.images or [],
        }
class OSINTEngine:
    """Gather open-source information about usernames, people, images,
    locations and domains.

    The lookup coroutines return plain dictionaries; on failure they return
    (or include) an error description instead of raising, so one failing
    lookup does not abort the caller.

    NOTE(review): the network/CPU calls made by these coroutines
    (sherlock, face_recognition, geopy, whois) are invoked synchronously,
    so they presumably block the event loop while they run -- confirm
    whether that is acceptable for the callers.
    """

    def __init__(self):
        # Geocoder used by search_location().
        self.geolocator = Nominatim(user_agent="intelligent_search_engine")
        # Platform names handed to sherlock in search_username().
        self.known_platforms = [
            "Twitter", "Instagram", "Facebook", "LinkedIn", "GitHub",
            "Reddit", "YouTube", "TikTok", "Pinterest", "Snapchat",
            "Twitch", "Medium", "Dev.to", "Stack Overflow",
        ]

    @staticmethod
    def _username_variants(name: str) -> List[str]:
        """Return the distinct, non-empty username candidates for *name*."""
        # Collapse runs of whitespace and trim the ends so that stray
        # spaces do not produce candidates such as "John__Doe" or "_John".
        cleaned = " ".join(name.split())
        candidates = (
            cleaned.replace(" ", ""),
            cleaned.replace(" ", "_"),
            cleaned.replace(" ", "."),
            cleaned.lower().replace(" ", ""),
        )
        # dict.fromkeys drops duplicates while keeping first-seen order; a
        # single-word lowercase name would otherwise be searched four times.
        return [candidate for candidate in dict.fromkeys(candidates) if candidate]

    @staticmethod
    def _extract_hits(raw_results: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Turn a ``platform -> result`` mapping into a list of positive hits."""
        hits = []
        for platform, data in raw_results.items():
            # Skip malformed entries instead of failing on ``.get``.
            if not isinstance(data, dict) or not data.get("exists", False):
                continue
            hits.append({
                "platform": platform,
                "url": data.get("url", ""),
                "confidence": data.get("confidence", "high"),
            })
        return hits

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across multiple platforms.

        Returns a dictionary with ``"username"`` and ``"found_on"`` (a list
        of ``{"platform", "url", "confidence"}`` dictionaries).  When a
        provider fails, the failure is recorded under an additional
        ``"errors"`` key and the remaining provider is still consulted.
        """
        merged: Dict[str, Any] = {}
        errors: List[Dict[str, str]] = []

        # Use holehe for email-based search.
        # NOTE(review): the address is only a guess built from the username,
        # and ``holehe.check_email`` may not exist in the installed holehe
        # release -- confirm against the library's documentation.
        try:
            email = f"{username}@gmail.com"
            holehe_results = await holehe.check_email(email)
            merged.update(holehe_results or {})
        except asyncio.CancelledError:
            # Never swallow task cancellation.
            raise
        except Exception as e:
            errors.append({"source": "holehe", "error": str(e)})

        # Use sherlock for username search; its entries are merged last, so
        # they win over holehe entries for the same platform key.
        # NOTE(review): confirm this call matches sherlock's signature.
        try:
            sherlock_results = sherlock.sherlock(
                username, self.known_platforms, verbose=False
            )
            merged.update(sherlock_results or {})
        except Exception as e:
            errors.append({"source": "sherlock", "error": str(e)})

        response: Dict[str, Any] = {
            "username": username,
            "found_on": self._extract_hits(merged),
        }
        if errors:
            response["errors"] = errors
        return response

    async def search_person(self, name: str, location: Optional[str] = None,
                            age: Optional[int] = None,
                            gender: Optional[str] = None) -> "PersonInfo":
        """Search for information about a person.

        Builds a ``PersonInfo`` from the given attributes and fills its
        ``social_profiles`` with the hits found for each distinct username
        variant derived from *name*.
        """
        person = PersonInfo(
            name=name,
            age=age,
            location=location,
            gender=gender,
        )
        # Start from an empty list so the attribute is always a list.
        person.social_profiles = []
        for username in self._username_variants(name):
            results = await self.search_username(username)
            person.social_profiles.extend(results.get("found_on", []))
        return person

    async def analyze_image(self, image_data: bytes) -> Dict[str, Any]:
        """Analyze an image for faces.

        Returns ``{"faces_found": int, "faces": [...]}`` where each face is
        ``{"location": {"top", "right", "bottom", "left"}}``, or
        ``{"error": message}`` when the image cannot be processed.
        """
        try:
            image = face_recognition.load_image_file(io.BytesIO(image_data))
            face_locations = face_recognition.face_locations(image)
            # Only the bounding boxes are reported, so face encodings are
            # not computed.
            faces = [
                {
                    "location": {
                        "top": box[0],
                        "right": box[1],
                        "bottom": box[2],
                        "left": box[3],
                    }
                }
                for box in face_locations
            ]
            return {"faces_found": len(face_locations), "faces": faces}
        except Exception as e:
            return {"error": str(e)}

    async def search_location(self, location: str) -> Dict[str, Any]:
        """Gather information about a location.

        Returns the geocoded address, latitude, longitude and raw payload,
        or ``{"error": message}`` when the lookup finds nothing, times out
        or fails.
        """
        try:
            # Geocode the location with a 10 second timeout.
            location_data = self.geolocator.geocode(location, timeout=10)
            if not location_data:
                return {"error": "Location not found"}
            return {
                "address": location_data.address,
                "latitude": location_data.latitude,
                "longitude": location_data.longitude,
                "raw": location_data.raw,
            }
        except GeocoderTimedOut:
            return {"error": "Geocoding service timed out"}
        except Exception as e:
            return {"error": str(e)}

    async def analyze_domain(self, domain: str) -> Dict[str, Any]:
        """Analyze a domain using its WHOIS record.

        Returns the registrar, creation/expiration/update dates, status and
        name servers exactly as exposed by the WHOIS result object, or
        ``{"error": message}`` when the lookup fails.
        """
        try:
            w = whois.whois(domain)
            return {
                "registrar": w.registrar,
                "creation_date": w.creation_date,
                "expiration_date": w.expiration_date,
                "last_updated": w.updated_date,
                "status": w.status,
                "name_servers": w.name_servers,
            }
        except Exception as e:
            return {"error": str(e)}