fikird committed · Commit 9dd0b76 · 1 Parent(s): 12d42ff

Simplify dependencies and update OSINT engine for better compatibility

Files changed (2):
  1. osint_engine.py +57 -157
  2. requirements.txt +5 -14
osint_engine.py CHANGED
@@ -5,9 +5,7 @@ import time
 import asyncio
 import aiohttp
 import requests
-import instaloader
-import face_recognition
-import numpy as np
+import httpx
 from PIL import Image
 from io import BytesIO
 from typing import Dict, List, Any, Union
@@ -15,14 +13,11 @@ from selenium import webdriver
 from selenium.webdriver.chrome.options import Options
 from selenium.webdriver.chrome.service import Service
 from webdriver_manager.chrome import ChromeDriverManager
-from holehe.core import *
-from sherlock import sherlock
 from geopy.geocoders import Nominatim
 from waybackpy import WaybackMachineCDXServerAPI
-import phonenumbers
-from phonenumbers import geocoder, carrier, timezone
 import whois
 from datetime import datetime
+from googlesearch import search as google_search
 
 class OSINTEngine:
     """OSINT capabilities for advanced information gathering"""
@@ -36,120 +31,89 @@ class OSINTEngine:
 
     def setup_apis(self):
         """Initialize API clients"""
-        self.instagram = instaloader.Instaloader()
         self.geolocator = Nominatim(user_agent="intelligent_search")
+        self.http_client = httpx.AsyncClient()
 
     async def search_username(self, username: str) -> Dict[str, Any]:
         """Search for username across multiple platforms"""
-        results = {}
-
-        # Sherlock search
-        sherlock_results = await self.sherlock_search(username)
-        results['platforms'] = sherlock_results
+        results = {
+            'platforms': [],
+            'social_media': {},
+            'websites': []
+        }
 
-        # Email search
-        email_results = await self.search_email(f"{username}@gmail.com")
-        results['email'] = email_results
-
-        return results
-
-    async def sherlock_search(self, username: str) -> List[Dict[str, str]]:
-        """Search username using Sherlock"""
-        results = []
-        sites = sherlock.site_data()
+        # Common social media platforms
+        platforms = [
+            {'name': 'GitHub', 'url': f'https://github.com/{username}'},
+            {'name': 'Twitter', 'url': f'https://twitter.com/{username}'},
+            {'name': 'Instagram', 'url': f'https://instagram.com/{username}'},
+            {'name': 'LinkedIn', 'url': f'https://linkedin.com/in/{username}'},
+            {'name': 'Facebook', 'url': f'https://facebook.com/{username}'},
+            {'name': 'YouTube', 'url': f'https://youtube.com/@{username}'},
+        ]
 
         async with aiohttp.ClientSession() as session:
             tasks = []
-            for site_name, site_data in sites.items():
-                task = self.check_username(session, username, site_name, site_data)
+            for platform in platforms:
+                task = self.check_profile(session, platform['url'], platform['name'])
                 tasks.append(task)
 
-            results = await asyncio.gather(*tasks)
-            return [r for r in results if r is not None]
+            platform_results = await asyncio.gather(*tasks)
+            results['platforms'] = [r for r in platform_results if r is not None]
+
+        # Google search for additional mentions
+        try:
+            search_query = f'"{username}" OR "@{username}" -site:twitter.com -site:facebook.com -site:instagram.com'
+            web_results = list(google_search(search_query, num_results=5))
+            results['websites'] = web_results
+        except Exception as e:
+            results['websites'] = [str(e)]
+
+        return results
 
-    async def check_username(self, session, username: str, site_name: str, site_data: Dict) -> Dict[str, str]:
-        """Check username on a specific platform"""
-        url = site_data.get('url', '').format(username=username)
-        if not url:
-            return None
-
+    async def check_profile(self, session, url: str, platform: str) -> Dict[str, str]:
+        """Check if a profile exists on a platform"""
         try:
            async with session.get(url) as response:
                 if response.status == 200:
                     return {
-                        'platform': site_name,
+                        'platform': platform,
                         'url': url,
-                        'found': True
+                        'exists': True
                     }
         except:
             pass
         return None
 
-    async def search_email(self, email: str) -> Dict[str, Any]:
-        """Search for email presence on various platforms"""
-        results = {}
-        modules = get_functions()
-
-        for module in modules:
-            try:
-                out = await module(email)
-                if out:
-                    results[module.__name__] = out
-            except:
-                continue
-
-        return results
-
     async def search_image(self, image_url: str) -> Dict[str, Any]:
-        """Reverse image search and face recognition"""
-        results = {}
+        """Image analysis and reverse search"""
+        results = {
+            'analysis': {},
+            'similar_images': [],
+            'error': None
+        }
 
         try:
-            # Download image
+            # Download and analyze image
            response = requests.get(image_url)
             img = Image.open(BytesIO(response.content))
 
-            # Convert to face_recognition format
-            img_array = np.array(img)
-            face_locations = face_recognition.face_locations(img_array)
-            face_encodings = face_recognition.face_encodings(img_array, face_locations)
-
-            results['faces_found'] = len(face_locations)
-            results['face_locations'] = face_locations
-
-            # Perform reverse image search
-            results['reverse_search'] = await self.reverse_image_search(image_url)
-
-        except Exception as e:
-            results['error'] = str(e)
-
-        return results
-
-    async def reverse_image_search(self, image_url: str) -> List[Dict[str, str]]:
-        """Perform reverse image search"""
-        results = []
-
-        try:
-            driver = webdriver.Chrome(
-                service=Service(ChromeDriverManager().install()),
-                options=self.chrome_options
-            )
+            # Basic image analysis
+            results['analysis'] = {
+                'format': img.format,
+                'size': img.size,
+                'mode': img.mode
+            }
 
-            # Google Images
+            # Perform reverse image search using Google Lens
             search_url = f"https://lens.google.com/uploadbyurl?url={image_url}"
-            driver.get(search_url)
-            time.sleep(3)
-
-            # Extract results (simplified)
-            results.append({
+            results['similar_images'].append({
                 'source': 'Google Lens',
-                'url': driver.current_url
+                'url': search_url
             })
 
-            driver.quit()
-
         except Exception as e:
-            results.append({'error': str(e)})
+            results['error'] = str(e)
 
         return results
 
@@ -157,9 +121,6 @@ class OSINTEngine:
         """Gather personal information from various sources"""
         results = {}
 
-        if 'phone' in data:
-            results['phone'] = self.analyze_phone_number(data['phone'])
-
         if 'location' in data:
             results['location'] = await self.analyze_location(data['location'])
 
@@ -168,20 +129,6 @@
 
         return results
 
-    def analyze_phone_number(self, phone: str) -> Dict[str, Any]:
-        """Analyze phone number information"""
-        try:
-            number = phonenumbers.parse(phone)
-            return {
-                'valid': phonenumbers.is_valid_number(number),
-                'type': phonenumbers.number_type(number),
-                'country': geocoder.description_for_number(number, "en"),
-                'carrier': carrier.name_for_number(number, "en"),
-                'timezone': timezone.time_zones_for_number(number)
-            }
-        except Exception as e:
-            return {'error': str(e)}
-
     async def analyze_location(self, location: str) -> Dict[str, Any]:
         """Analyze location information"""
         try:
@@ -211,47 +158,6 @@ class OSINTEngine:
         except Exception as e:
             return {'error': str(e)}
 
-    async def search_social_media(self, username: str, platform: str = None) -> Dict[str, Any]:
-        """Search for user information on social media platforms"""
-        results = {}
-
-        if platform:
-            platforms = [platform]
-        else:
-            platforms = ['instagram', 'twitter', 'reddit']
-
-        for platform in platforms:
-            try:
-                if platform == 'instagram':
-                    results['instagram'] = await self.search_instagram(username)
-                elif platform == 'twitter':
-                    results['twitter'] = await self.search_twitter(username)
-                elif platform == 'reddit':
-                    results['reddit'] = await self.search_reddit(username)
-            except Exception as e:
-                results[platform] = {'error': str(e)}
-
-        return results
-
-    async def search_instagram(self, username: str) -> Dict[str, Any]:
-        """Search Instagram for user information"""
-        try:
-            profile = instaloader.Profile.from_username(self.instagram.context, username)
-            return {
-                'username': profile.username,
-                'full_name': profile.full_name,
-                'biography': profile.biography,
-                'followers': profile.followers,
-                'following': profile.followees,
-                'is_private': profile.is_private,
-                'is_verified': profile.is_verified,
-                'external_url': profile.external_url,
-                'posts_count': profile.mediacount,
-                'profile_pic_url': profile.profile_pic_url
-            }
-        except Exception as e:
-            return {'error': str(e)}
-
     async def search_historical_data(self, url: str) -> List[Dict[str, Any]]:
         """Search for historical data using Wayback Machine"""
         results = []
@@ -273,16 +179,6 @@ class OSINTEngine:
 
         return results
 
-    def solve_captcha(self, image_url: str) -> str:
-        """Solve CAPTCHA using OCR (simplified version)"""
-        try:
-            response = requests.get(image_url)
-            img = Image.open(BytesIO(response.content))
-            # Add your CAPTCHA solving logic here
-            return "CAPTCHA solution placeholder"
-        except Exception as e:
-            return f"Error: {str(e)}"
-
 # Helper function to create document from gathered information
 def create_report(data: Dict[str, Any], template: str = "default") -> str:
     """Create a formatted report from gathered information"""
@@ -297,7 +193,11 @@ def create_report(data: Dict[str, Any], template: str = "default") -> str:
                 report += f"* {key}: {value}\n"
         elif isinstance(content, list):
             for item in content:
-                report += f"* {item}\n"
+                if isinstance(item, dict):
+                    for k, v in item.items():
+                        report += f"* {k}: {v}\n"
+                else:
+                    report += f"* {item}\n"
         else:
             report += f"{content}\n"
        report += "\n"
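
For reference, a minimal sketch of how the slimmed-down engine might be exercised after this change. It assumes OSINTEngine() can be constructed with no arguments and that its constructor calls setup_apis(); neither appears in this diff, so treat the driver below as hypothetical.

    # Hypothetical driver script, not part of this commit.
    import asyncio

    from osint_engine import OSINTEngine, create_report

    async def main():
        engine = OSINTEngine()  # assumes a no-argument constructor that wires up setup_apis()
        username_hits = await engine.search_username("octocat")                # profile checks + Google mentions
        history = await engine.search_historical_data("https://example.com")  # Wayback Machine snapshots
        # create_report now handles lists of dicts, which both results exercise
        print(create_report({
            "Username Search": username_hits,
            "Historical Data": history,
        }))

    if __name__ == "__main__":
        asyncio.run(main())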
requirements.txt CHANGED
@@ -1,5 +1,5 @@
 gradio==4.14.0
-torch==2.1.0
+torch==2.1.0+cpu
 transformers==4.35.2
 duckduckgo-search==3.9.3
 beautifulsoup4==4.12.2
@@ -11,18 +11,9 @@ protobuf==4.25.1
 pillow==10.1.0
 selenium==4.15.2
 webdriver-manager==4.0.1
-socid-extractor==0.0.24
-holehe==1.61
-sherlock3==0.1
-python-magic==0.4.27
-face-recognition==1.3.0
-opencv-python-headless==4.8.1.78
-googlesearch-python==1.2.3
-instaloader==4.10.1
-tweepy==4.14.0
-praw==7.7.1
-geopy==2.4.1
-phonenumbers==8.13.24
-python-whois==0.8.0
 aiohttp==3.9.1
+python-whois==0.8.0
+geopy==2.4.1
+httpx==0.25.2
+googlesearch-python==1.2.3
 waybackpy==3.0.6
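
One compatibility note on the torch pin: 2.1.0+cpu is a local-version build that is normally served from the PyTorch CPU wheel index rather than plain PyPI, so a requirements.txt that pins it would typically also carry an index hint along these lines (a hedged sketch, not part of this commit):

    # Hypothetical addition: lets pip resolve the +cpu build from the PyTorch CPU index
    --extra-index-url https://download.pytorch.org/whl/cpu
    torch==2.1.0+cpu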