"""
Web scraping and processing utilities.
"""
from typing import Dict, Any, List, Optional
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urlparse, urljoin
from tenacity import retry, stop_after_attempt, wait_exponential

class WebUtils:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry_error_callback=lambda retry_state: None,
    )
    def fetch_url(self, url: str, timeout: int = 10) -> Optional[str]:
        """Fetch content from a URL, returning None if every attempt fails."""
        try:
            response = self.session.get(url, timeout=timeout)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            # Re-raise so the retry decorator can retry transient failures;
            # swallowing the exception here would make the retry policy a no-op.
            print(f"Error fetching {url}: {e}")
            raise
    
    def extract_text(self, html: str) -> str:
        """Extract clean text from HTML content."""
        soup = BeautifulSoup(html, "html.parser")
        
        # Remove unwanted elements
        for element in soup(["script", "style", "nav", "footer", "header"]):
            element.decompose()
        
        # Get text and clean it
        text = soup.get_text(separator="\n", strip=True)
        
        # Remove excessive newlines
        text = re.sub(r"\n\s*\n", "\n\n", text)
        
        return text.strip()
    
    def extract_metadata(self, html: str, url: str) -> Dict[str, Any]:
        """Extract metadata from HTML content."""
        soup = BeautifulSoup(html, "html.parser")
        
        metadata = {
            "url": url,
            "title": None,
            "description": None,
            "keywords": None,
            "author": None,
            "published_date": None
        }
        
        # Extract title
        metadata["title"] = (
            soup.title.string if soup.title else None
        )
        
        # Extract meta tags (checks both name= and property= attributes so
        # OpenGraph-style tags such as article:published_time are picked up)
        for tag in soup.find_all("meta"):
            name = (tag.get("name") or tag.get("property") or "").lower()
            content = tag.get("content")

            if name == "description":
                metadata["description"] = content
            elif name == "keywords":
                metadata["keywords"] = content
            elif name == "author":
                metadata["author"] = content
            elif name in ("article:published_time", "published_time", "publication_date"):
                metadata["published_date"] = content
        
        return metadata
    
    def extract_links(self, html: str, base_url: str) -> List[str]:
        """Extract all links from HTML content."""
        soup = BeautifulSoup(html, "html.parser")
        links = []
        
        for link in soup.find_all("a"):
            href = link.get("href")
            if href:
                # Convert relative URLs to absolute
                absolute_url = urljoin(base_url, href)
                # Only include http(s) URLs
                if absolute_url.startswith(("http://", "https://")):
                    links.append(absolute_url)
        
        return list(set(links))  # Remove duplicates
    
    def is_valid_url(self, url: str) -> bool:
        """Check if a URL is valid."""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except Exception:
            return False
    
    def clean_url(self, url: str) -> str:
        """Clean and normalize a URL by removing common tracking parameters."""
        parsed = urlparse(url)
        
        # Keep only query parameters that do not look like tracking parameters
        query_params = []
        if parsed.query:
            for param in parsed.query.split("&"):
                key = param.split("=")[0].lower()
                if not any(track in key for track in ["utm_", "ref_", "source", "campaign"]):
                    query_params.append(param)
        
        # Rebuild the URL without tracking parameters (the fragment is dropped)
        cleaned = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        if query_params:
            cleaned += "?" + "&".join(query_params)
        
        return cleaned
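

# Minimal usage sketch, not part of the original module: it shows how WebUtils
# might be exercised end to end. The URL below is a placeholder assumption, and
# running this block performs a live network request.
if __name__ == "__main__":
    utils = WebUtils()
    page_url = "https://example.com/"  # hypothetical target URL
    html = utils.fetch_url(page_url)
    if html is not None:
        print(utils.extract_metadata(html, page_url))
        print(utils.extract_text(html)[:500])
        print(utils.extract_links(html, page_url)[:10])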