""" | |
Web scraper component for Streamlit frontend. | |
This integrates with the backend scraper service. | |
""" | |
import streamlit as st | |
import pandas as pd | |
import plotly.graph_objects as go | |
import time | |
import re | |
import asyncio | |
import httpx | |
from typing import Dict, Any, List, Optional | |
import json | |
import sys | |
import os | |
# Add the src directory to the path so we can import the services | |
sys.path.append(os.path.abspath('.')) | |
try: | |
from src.services.scraper import WebScraper | |
from src.services.tor_proxy import TorProxyService | |
except ImportError: | |
# Fallback if imports fail - we'll use a simplified version | |
WebScraper = None | |
TorProxyService = None | |

# Check if Tor is running
def is_tor_running() -> bool:
    """Check if the Tor SOCKS proxy is listening on the default port."""
    try:
        # A plain TCP connect is the reliable check here: the SOCKS port does
        # not speak HTTP, so an HTTP GET against it fails even when Tor is up.
        with socket.create_connection(("127.0.0.1", 9050), timeout=3):
            return True
    except OSError:
        return False
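
# Usage sketch (assumptions: Tor's default SOCKS5 port 9050, and the
# httpx[socks] extra installed so httpx can speak SOCKS):
#   if is_tor_running():
#       client = httpx.Client(proxy="socks5://127.0.0.1:9050", timeout=10)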

# Create a scraper instance
async def get_scraper():
    """Get a configured scraper instance."""
    if WebScraper and TorProxyService:
        try:
            tor_proxy = TorProxyService()
            # Check if Tor is accessible
            is_connected = await tor_proxy.check_connection()
            if is_connected:
                return WebScraper(tor_proxy_service=tor_proxy)
        except Exception as e:
            st.error(f"Error connecting to Tor: {e}")
    # If we can't connect to Tor or imports failed, return None
    return None

async def extract_content(url: str, use_tor: bool = False) -> Dict[str, Any]:
    """
    Extract content from a URL using the backend scraper.

    Args:
        url (str): URL to scrape
        use_tor (bool): Whether to use Tor proxy

    Returns:
        Dict[str, Any]: Extracted content
    """
    scraper = await get_scraper()
    if scraper:
        try:
            return await scraper.extract_content(url, use_tor=use_tor)
        except Exception as e:
            st.error(f"Error extracting content: {e}")
            return {
                "url": url,
                "title": "Error extracting content",
                "text_content": f"Failed to extract content: {e}",
                "indicators": {},
                "links": []
            }
    else:
        # Fall back to a plain HTTP fetch if the full scraper is unavailable
        st.warning("Advanced scraping functionality unavailable. Using limited extraction.")
        try:
            with httpx.Client(timeout=10) as client:
                response = client.get(url)
                # Only append an ellipsis when the preview is actually truncated
                text = response.text
                preview = text[:1000] + ("..." if len(text) > 1000 else "")
                return {
                    "url": url,
                    "title": f"Content from {url}",
                    "text_content": preview,
                    "indicators": {},
                    "links": []
                }
        except Exception as e:
            return {
                "url": url,
                "title": "Error fetching content",
                "text_content": f"Failed to fetch content: {e}",
                "indicators": {},
                "links": []
            }
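
# Usage sketch (hypothetical URL, run from synchronous code):
#   data = asyncio.run(extract_content("https://example.com", use_tor=False))
#   print(data["title"], len(data["links"]))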

def render_indicators(indicators: Dict[str, List[str]]):
    """
    Render extracted indicators in a formatted way.

    Args:
        indicators (Dict[str, List[str]]): Dictionary of indicator types and values
    """
    if not indicators:
        st.info("No indicators found in the content.")
        return

    # One (key, tab label, column header, heading, empty message) spec per indicator type
    tab_specs = [
        ("ip_addresses", "IP Addresses", "IP Address", "Extracted IP Addresses", "No IP addresses found."),
        ("email_addresses", "Emails", "Email", "Extracted Email Addresses", "No email addresses found."),
        ("bitcoin_addresses", "Bitcoin", "Bitcoin Address", "Extracted Bitcoin Addresses", "No Bitcoin addresses found."),
        ("urls", "URLs", "URL", "Extracted URLs", "No URLs found."),
        ("onion_urls", "Onion URLs", "Onion URL", "Extracted Onion URLs", "No onion URLs found."),
    ]
    # Create tabs for the different indicator types, with counts in the labels
    tabs = st.tabs([f"{label} ({len(indicators.get(key, []))})" for key, label, *_ in tab_specs])
    for tab, (key, _label, column, heading, empty_msg) in zip(tabs, tab_specs):
        with tab:
            values = indicators.get(key)
            if values:
                st.markdown(f"#### {heading}")
                st.dataframe(pd.DataFrame(values, columns=[column]), use_container_width=True)
            else:
                st.info(empty_msg)

def create_keyword_highlight(text: str, keywords: Optional[List[str]] = None) -> str:
    """
    Highlight keywords in text for display.

    Args:
        text (str): Text content to highlight
        keywords (Optional[List[str]]): Keywords to highlight

    Returns:
        str: HTML with highlighted keywords
    """
    if not text or not keywords:
        return text
    # Escape HTML so any markup in the scraped text is shown literally
    text = text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
    # Highlight keywords, preserving the casing of each match
    for keyword in keywords:
        if not keyword.strip():
            continue
        pattern = re.compile(re.escape(keyword), re.IGNORECASE)
        text = pattern.sub(
            lambda m: f'<span style="background-color: #E74C3C40; padding: 0 2px; '
            f'border-radius: 3px;">{m.group(0)}</span>',
            text,
        )
    return text

def render_web_scraper_ui():
    """Render the web scraper user interface."""
    st.title("Dark Web Intelligence Gathering")

    # Check if Tor is accessible
    if is_tor_running():
        st.success("Tor service is available for .onion sites")
    else:
        st.warning("Tor service not detected. Limited to clearnet sites only.")

    # Create UI layout
    col1, col2 = st.columns([2, 1])
    with col1:
        st.markdown("### Content Extraction & Analysis")
        # URL input
        url = st.text_input(
            "Enter URL to analyze",
            value="https://example.com",
            help="Enter a URL to scrape and analyze. For .onion sites, ensure Tor is configured."
        )
        # Options
        use_tor = st.checkbox(
            "Use Tor proxy",
            value='.onion' in url,
            help="Use Tor proxy for accessing .onion sites or for anonymity"
        )
        # Keyword highlighting
        keywords_input = st.text_area(
            "Keywords to highlight (one per line)",
            value="example\ndata\nbreach",
            help="Enter keywords to highlight in the extracted content"
        )
        keywords = [k.strip() for k in keywords_input.split('\n') if k.strip()]
        # Extract button
        extract_button = st.button("Extract Content")

    with col2:
        st.markdown("### Analysis Options")
        analysis_type = st.radio(
            "Analysis Type",
            ["Text Analysis", "Indicators", "Sentiment Analysis", "Entity Recognition"],
            help="Select the type of analysis to perform on the extracted content"
        )

        st.markdown("### Monitoring")
        monitoring_options = st.multiselect(
            "Add to monitoring list",
            ["IP Addresses", "Email Addresses", "Bitcoin Addresses", "URLs", "Onion URLs"],
            default=["IP Addresses", "URLs"],
            help="Select which indicator types to monitor"
        )
        alert_threshold = st.slider(
            "Alert Threshold",
            min_value=0.0,
            max_value=1.0,
            value=0.7,
            step=0.05,
            help="Set the confidence threshold for alerts"
        )

    # Handle content extraction
    if extract_button:
        with st.spinner("Extracting content..."):
            # Run the async extraction
            content_data = asyncio.run(extract_content(url, use_tor=use_tor))
            # Store results in session state
            st.session_state.extracted_content = content_data
            # Success message
            st.success(f"Content extracted from {url}")

    # Display extracted content if available
    if 'extracted_content' in st.session_state:
        content_data = st.session_state.extracted_content
        # Display content in tabs
        content_tabs = st.tabs(["Extracted Text", "Indicators", "Metadata", "Raw HTML"])

        # Extracted text tab
        with content_tabs[0]:
            st.markdown(f"### {content_data.get('title', 'Extracted Content')}")
            st.info(f"Source: {content_data.get('url')}")
            # Highlight keywords in text
            highlighted_text = create_keyword_highlight(
                content_data.get('text_content', 'No content extracted'),
                keywords
            )
            st.markdown(f"""
            <div style="border: 1px solid #3498DB; border-radius: 5px; padding: 15px;
                        background-color: #1A1A1A; height: 400px; overflow-y: auto;">
                {highlighted_text}
            </div>
            """, unsafe_allow_html=True)

        # Indicators tab
        with content_tabs[1]:
            render_indicators(content_data.get('indicators', {}))

        # Metadata tab
        with content_tabs[2]:
            st.markdown("### Document Metadata")
            metadata = content_data.get('metadata', {})
            if metadata:
                for key, value in metadata.items():
                    if value:
                        st.markdown(f"**{key}:** {value}")
            else:
                st.info("No metadata available")

        # Raw HTML tab
        with content_tabs[3]:
            st.markdown("### Raw HTML")
            with st.expander("Show Raw HTML"):
                st.code(content_data.get('html_content', 'No HTML content available'), language="html")

    # Additional informational UI elements
    st.markdown("---")
    st.markdown("### About Dark Web Intelligence")
    st.markdown("""
    This tool allows you to extract and analyze content from both clearnet and dark web sites.
    For .onion sites, make sure Tor is properly configured.

    **Features:**
    - Extract and analyze content from any URL
    - Highlight keywords of interest
    - Identify indicators of compromise (IoCs)
    - Add indicators to a monitoring list
    """)