fikir ashenafi committed on
Commit
f6b5fd2
·
verified ·
1 Parent(s): 5bc1e89

Update engines/osint.py

Browse files
Files changed (1) hide show
  1. engines/osint.py +146 -159
engines/osint.py CHANGED
@@ -1,202 +1,189 @@
1
  """
2
- OSINT engine for comprehensive information gathering.
3
  """
4
  from typing import Dict, List, Any, Optional
5
  import asyncio
6
  import json
7
- from dataclasses import dataclass
8
- import holehe.core as holehe
9
- import subprocess
10
- import tempfile
11
- import os
12
- import face_recognition
13
- import numpy as np
14
- from PIL import Image
15
- import io
16
  import requests
17
- from geopy.geocoders import Nominatim
18
- from geopy.exc import GeocoderTimedOut
19
  import whois
20
- from datetime import datetime
 
21
  from tenacity import retry, stop_after_attempt, wait_exponential
22
-
23
- @dataclass
24
- class PersonInfo:
25
- name: str
26
- age: Optional[int] = None
27
- location: Optional[str] = None
28
- gender: Optional[str] = None
29
- social_profiles: List[Dict[str, str]] = None
30
- images: List[str] = None
31
-
32
- def to_dict(self) -> Dict[str, Any]:
33
- return {
34
- "name": self.name,
35
- "age": self.age,
36
- "location": self.location,
37
- "gender": self.gender,
38
- "social_profiles": self.social_profiles or [],
39
- "images": self.images or []
40
- }
41
 
42
  class OSINTEngine:
43
  def __init__(self):
44
- self.geolocator = Nominatim(user_agent="intelligent_search_engine")
 
45
  self.known_platforms = [
46
- "Twitter", "Instagram", "Facebook", "LinkedIn", "GitHub",
47
- "Reddit", "YouTube", "TikTok", "Pinterest", "Snapchat",
48
- "Twitch", "Medium", "Dev.to", "Stack Overflow"
49
  ]
50
 
51
- @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
52
  async def search_username(self, username: str) -> Dict[str, Any]:
53
- """Search for username across multiple platforms."""
54
  results = {
55
- "username": username,
56
- "found_on": []
 
57
  }
58
 
59
- # Create a temporary file for sherlock results
60
- with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') as tmp:
61
- tmp_path = tmp.name
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
 
 
63
  try:
64
- # Run sherlock as a subprocess
65
- process = subprocess.Popen(
66
- ["sherlock", username, "--output", tmp_path],
67
- stdout=subprocess.PIPE,
68
- stderr=subprocess.PIPE
69
- )
70
- stdout, stderr = process.communicate()
 
 
 
 
 
 
71
 
72
- # Read results from the temporary file
73
- if os.path.exists(tmp_path):
74
- with open(tmp_path, 'r') as f:
75
- for line in f:
76
- if "[+]" in line: # Found profile
77
- platform = line.split("[+]")[1].split(":")[0].strip()
78
- url = line.split(":")[-1].strip()
79
- results["found_on"].append({
80
- "platform": platform,
81
- "url": url
82
- })
83
- elif "[-]" in line: # Not found
84
- platform = line.split("[-]")[1].split(":")[0].strip()
85
- results["found_on"].append({
86
- "platform": platform,
87
- "url": ""
88
- })
89
-
90
- # Clean up temp file
91
- os.unlink(tmp_path)
92
  except Exception as e:
93
- print(f"Error running sherlock: {e}")
94
-
95
- # Use holehe for email-based search
96
- email = f"{username}@gmail.com" # Example email
97
- holehe_results = await holehe.check_email(email)
98
-
99
- # Combine results
100
- for platform, data in holehe_results.items():
101
- if data.get("exists", False):
102
- results["found_on"].append({
103
- "platform": platform,
104
- "url": data.get("url", ""),
105
- "confidence": data.get("confidence", "high")
106
- })
107
 
108
  return results
109
 
110
- async def search_person(self, name: str, location: Optional[str] = None,
111
- age: Optional[int] = None, gender: Optional[str] = None) -> PersonInfo:
112
- """Search for information about a person."""
113
- person = PersonInfo(
114
- name=name,
115
- age=age,
116
- location=location,
117
- gender=gender
118
- )
119
-
120
- # Initialize social profiles list
121
- person.social_profiles = []
122
-
123
- # Search for social media profiles
124
- username_variants = [
125
- name.replace(" ", ""),
126
- name.replace(" ", "_"),
127
- name.replace(" ", "."),
128
- name.lower().replace(" ", "")
129
- ]
130
-
131
- for username in username_variants:
132
- results = await self.search_username(username)
133
- person.social_profiles.extend(results.get("found_on", []))
134
-
135
- return person
136
-
137
  @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
138
- async def analyze_image(self, image_data: bytes) -> Dict[str, Any]:
139
- """Analyze an image for faces and other identifiable information."""
 
 
 
 
 
 
 
 
140
  try:
141
- # Load image
142
- image = face_recognition.load_image_file(io.BytesIO(image_data))
 
 
 
 
 
 
 
 
 
 
143
 
144
- # Detect faces
145
- face_locations = face_recognition.face_locations(image)
146
- face_encodings = face_recognition.face_encodings(image, face_locations)
 
 
 
147
 
148
- results = {
149
- "faces_found": len(face_locations),
150
- "faces": []
151
- }
152
 
153
- # Analyze each face
154
- for i, (face_encoding, face_location) in enumerate(zip(face_encodings, face_locations)):
155
- face_data = {
156
- "location": {
157
- "top": face_location[0],
158
- "right": face_location[1],
159
- "bottom": face_location[2],
160
- "left": face_location[3]
161
- }
162
- }
163
- results["faces"].append(face_data)
 
 
164
 
165
- return results
166
  except Exception as e:
167
- return {"error": str(e)}
168
-
169
- async def search_location(self, location: str) -> Dict[str, Any]:
170
- """Gather information about a location."""
171
- try:
172
- # Geocode the location
173
- location_data = self.geolocator.geocode(location, timeout=10)
174
-
175
- if not location_data:
176
- return {"error": "Location not found"}
177
-
178
- return {
179
- "address": location_data.address,
180
- "latitude": location_data.latitude,
181
- "longitude": location_data.longitude,
182
- "raw": location_data.raw
183
- }
184
- except GeocoderTimedOut:
185
- return {"error": "Geocoding service timed out"}
186
- except Exception as e:
187
- return {"error": str(e)}
188
 
189
- async def analyze_domain(self, domain: str) -> Dict[str, Any]:
190
- """Analyze a domain for WHOIS and other information."""
 
191
  try:
192
  w = whois.whois(domain)
193
  return {
 
194
  "registrar": w.registrar,
195
  "creation_date": w.creation_date,
196
  "expiration_date": w.expiration_date,
197
- "last_updated": w.updated_date,
198
  "status": w.status,
199
- "name_servers": w.name_servers
 
 
 
 
 
 
 
 
200
  }
201
  except Exception as e:
202
  return {"error": str(e)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ OSINT engine for username, person, domain (WHOIS) and social-profile lookups.
3
  """
4
  from typing import Dict, List, Any, Optional
5
  import asyncio
6
  import json
 
 
 
 
 
 
 
 
 
7
  import requests
8
+ from bs4 import BeautifulSoup
 
9
  import whois
10
+ from holehe.core import *
11
+ from geopy.geocoders import Nominatim
12
  from tenacity import retry, stop_after_attempt, wait_exponential
13
+ from duckduckgo_search import DDGS
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
  class OSINTEngine:
16
  def __init__(self):
17
+ self.geolocator = Nominatim(user_agent="ise_search")
18
+ self.holehe_modules = import_submodules("holehe.modules")
19
  self.known_platforms = [
20
+ "twitter.com", "facebook.com", "instagram.com", "linkedin.com",
21
+ "github.com", "youtube.com", "reddit.com", "pinterest.com",
22
+ "medium.com", "tumblr.com", "flickr.com", "vimeo.com"
23
  ]
24
 
 
25
  async def search_username(self, username: str) -> Dict[str, Any]:
26
+ """Search for username across platforms."""
27
  results = {
28
+ "found": [],
29
+ "not_found": [],
30
+ "error": []
31
  }
32
 
33
+ # Manual platform check
34
+ for platform in self.known_platforms:
35
+ try:
36
+ url = f"https://{platform}/{username}"
37
+ response = requests.head(url, timeout=5, allow_redirects=True)
38
+ if response.status_code == 200:
39
+ results["found"].append({
40
+ "platform": platform.split(".")[0].title(),
41
+ "url": url
42
+ })
43
+ else:
44
+ results["not_found"].append(platform.split(".")[0].title())
45
+ except Exception as e:
46
+ results["error"].append({
47
+ "platform": platform,
48
+ "error": str(e)
49
+ })
50
 
51
+ # Run holehe checks
52
  try:
53
+ holehe_results = []
54
+ for module in self.holehe_modules:
55
+ try:
56
+ check_func = getattr(module, "check")
57
+ out = await check_func(username)
58
+ if out and out.get("exists"):
59
+ results["found"].append({
60
+ "platform": out["name"],
61
+ "url": out.get("url", ""),
62
+ "email": out.get("email", "")
63
+ })
64
+ except Exception as e:
65
+ print(f"Error in holehe module {module.__name__}: {e}")
66
 
67
+ results["holehe"] = holehe_results
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  except Exception as e:
69
+ print(f"Error running holehe: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
70
 
71
  return results
72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
74
+ async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
75
+ """Search for person information."""
76
+ results = {
77
+ "basic_info": {},
78
+ "social_profiles": [],
79
+ "locations": [],
80
+ "possible_relatives": [],
81
+ "error": None
82
+ }
83
+
84
  try:
85
+ # Geocode location if provided
86
+ if location:
87
+ try:
88
+ loc = self.geolocator.geocode(location)
89
+ if loc:
90
+ results["locations"].append({
91
+ "address": loc.address,
92
+ "latitude": loc.latitude,
93
+ "longitude": loc.longitude
94
+ })
95
+ except Exception as e:
96
+ print(f"Error geocoding location: {e}")
97
 
98
+ # Basic search query
99
+ search_query = f"{name}"
100
+ if location:
101
+ search_query += f" {location}"
102
+ if age:
103
+ search_query += f" {age} years old"
104
 
105
+ # Use DuckDuckGo for initial search
106
+ with DDGS() as ddgs:
107
+ search_results = [r for r in ddgs.text(search_query, max_results=10)]
 
108
 
109
+ for result in search_results:
110
+ try:
111
+ url = result["link"]
112
+ # Check if URL is from a known social platform
113
+ if any(platform in url.lower() for platform in self.known_platforms):
114
+ platform = next(p for p in self.known_platforms if p in url.lower())
115
+ results["social_profiles"].append({
116
+ "platform": platform.split(".")[0].title(),
117
+ "url": url,
118
+ "title": result.get("title", "")
119
+ })
120
+ except Exception as e:
121
+ print(f"Error processing search result: {e}")
122
 
 
123
  except Exception as e:
124
+ results["error"] = str(e)
125
+
126
+ return results
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
 
128
+ @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
129
+ async def domain_lookup(self, domain: str) -> Dict[str, Any]:
130
+ """Perform WHOIS lookup for a domain."""
131
  try:
132
  w = whois.whois(domain)
133
  return {
134
+ "domain_name": w.domain_name,
135
  "registrar": w.registrar,
136
  "creation_date": w.creation_date,
137
  "expiration_date": w.expiration_date,
138
+ "name_servers": w.name_servers,
139
  "status": w.status,
140
+ "emails": w.emails,
141
+ "dnssec": w.dnssec,
142
+ "name": w.name,
143
+ "org": w.org,
144
+ "address": w.address,
145
+ "city": w.city,
146
+ "state": w.state,
147
+ "zipcode": w.zipcode,
148
+ "country": w.country
149
  }
150
  except Exception as e:
151
  return {"error": str(e)}
152
+
153
+ async def analyze_social_profile(self, url: str) -> Dict[str, Any]:
154
+ """Analyze a social media profile."""
155
+ results = {
156
+ "profile_info": {},
157
+ "recent_activity": [],
158
+ "connections": [],
159
+ "error": None
160
+ }
161
+
162
+ try:
163
+ headers = {
164
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
165
+ }
166
+ response = requests.get(url, headers=headers, timeout=10)
167
+ response.raise_for_status()
168
+
169
+ soup = BeautifulSoup(response.text, "html.parser")
170
+
171
+ # Extract basic profile info
172
+ results["profile_info"]["title"] = soup.title.string if soup.title else None
173
+
174
+ # Extract meta information
175
+ for meta in soup.find_all("meta"):
176
+ property = meta.get("property", "")
177
+ content = meta.get("content", "")
178
+
179
+ if "og:title" in property:
180
+ results["profile_info"]["og_title"] = content
181
+ elif "og:description" in property:
182
+ results["profile_info"]["og_description"] = content
183
+ elif "og:image" in property:
184
+ results["profile_info"]["og_image"] = content
185
+
186
+ except Exception as e:
187
+ results["error"] = str(e)
188
+
189
+ return results