""" | |
Service for dark web content operations. | |
""" | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from sqlalchemy.future import select | |
from sqlalchemy import func, or_, text | |
from datetime import datetime | |
from typing import List, Optional, Dict, Any, Union | |
from src.models.dark_web_content import DarkWebContent, DarkWebMention, ContentType, ContentStatus | |
from src.models.threat import Threat, ThreatCategory, ThreatSeverity, ThreatStatus | |
from src.api.schemas import PaginationParams | |
async def create_content(
    db: AsyncSession,
    url: str,
    content: str,
    title: Optional[str] = None,
    content_type: ContentType = ContentType.OTHER,
    content_status: ContentStatus = ContentStatus.NEW,
    source_name: Optional[str] = None,
    source_type: Optional[str] = None,
    language: Optional[str] = None,
    relevance_score: float = 0.0,
    sentiment_score: float = 0.0,
    entity_data: Optional[str] = None,
) -> DarkWebContent:
    """
    Create a new dark web content entry.

    Args:
        db: Database session
        url: URL of the content
        content: Text content
        title: Title of the content
        content_type: Type of content
        content_status: Status of content
        source_name: Name of the source
        source_type: Type of source
        language: Language of the content
        relevance_score: Relevance score (0-1)
        sentiment_score: Sentiment score (-1 to 1)
        entity_data: JSON string of extracted entities

    Returns:
        DarkWebContent: Created content
    """
    # Extract domain from URL if possible
    domain = None
    if url:
        try:
            domain = urlparse(url).netloc
        except ValueError:
            # Malformed URLs (e.g., invalid IPv6 brackets) are stored without a domain
            domain = None

    db_content = DarkWebContent(
        url=url,
        domain=domain,
        title=title,
        content=content,
        content_type=content_type,
        content_status=content_status,
        source_name=source_name,
        source_type=source_type,
        language=language,
        scraped_at=datetime.utcnow(),
        relevance_score=relevance_score,
        sentiment_score=sentiment_score,
        entity_data=entity_data,
    )
    db.add(db_content)
    await db.commit()
    await db.refresh(db_content)
    return db_content


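# Usage sketch (illustrative, not part of the service): how a scraper might
# persist a page. `async_session_factory` is a hypothetical sessionmaker; the
# real application may obtain its AsyncSession differently.
#
#     async with async_session_factory() as session:
#         page = await create_content(
#             db=session,
#             url="http://example.onion/forum/thread/1",
#             content="raw page text ...",
#             title="Forum thread",
#             source_name="example_forum",
#             relevance_score=0.8,
#         )
#         print(page.id, page.domain)  # domain is parsed from the URL

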
async def get_content_by_id(db: AsyncSession, content_id: int) -> Optional[DarkWebContent]:
    """
    Get dark web content by ID.

    Args:
        db: Database session
        content_id: Content ID

    Returns:
        Optional[DarkWebContent]: Content or None if not found
    """
    result = await db.execute(select(DarkWebContent).filter(DarkWebContent.id == content_id))
    return result.scalars().first()


async def get_contents(
    db: AsyncSession,
    pagination: PaginationParams,
    content_type: Optional[List[ContentType]] = None,
    content_status: Optional[List[ContentStatus]] = None,
    source_name: Optional[str] = None,
    search_query: Optional[str] = None,
    from_date: Optional[datetime] = None,
    to_date: Optional[datetime] = None,
) -> List[DarkWebContent]:
    """
    Get dark web contents with filtering and pagination.

    Args:
        db: Database session
        pagination: Pagination parameters
        content_type: Filter by content type
        content_status: Filter by content status
        source_name: Filter by source name
        search_query: Search in title and content
        from_date: Filter by scraped_at >= from_date
        to_date: Filter by scraped_at <= to_date

    Returns:
        List[DarkWebContent]: List of dark web contents
    """
    query = select(DarkWebContent)

    # Apply filters
    if content_type:
        query = query.filter(DarkWebContent.content_type.in_(content_type))
    if content_status:
        query = query.filter(DarkWebContent.content_status.in_(content_status))
    if source_name:
        query = query.filter(DarkWebContent.source_name == source_name)
    if search_query:
        search_filter = or_(
            DarkWebContent.title.ilike(f"%{search_query}%"),
            DarkWebContent.content.ilike(f"%{search_query}%"),
        )
        query = query.filter(search_filter)
    if from_date:
        query = query.filter(DarkWebContent.scraped_at >= from_date)
    if to_date:
        query = query.filter(DarkWebContent.scraped_at <= to_date)

    # Order newest first, then apply pagination
    query = query.order_by(DarkWebContent.scraped_at.desc())
    query = query.offset((pagination.page - 1) * pagination.size).limit(pagination.size)

    result = await db.execute(query)
    return result.scalars().all()


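# Usage sketch (illustrative): fetching the second page of recent content
# matching a keyword. `PaginationParams(page=2, size=25)` assumes the schema
# exposes `page` and `size` fields, as the offset math above implies.
#
#     contents = await get_contents(
#         db=session,
#         pagination=PaginationParams(page=2, size=25),
#         search_query="ransomware",
#         from_date=datetime(2024, 1, 1),
#     )

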
async def count_contents(
    db: AsyncSession,
    content_type: Optional[List[ContentType]] = None,
    content_status: Optional[List[ContentStatus]] = None,
    source_name: Optional[str] = None,
    search_query: Optional[str] = None,
    from_date: Optional[datetime] = None,
    to_date: Optional[datetime] = None,
) -> int:
    """
    Count dark web contents with filtering.

    Args:
        db: Database session
        content_type: Filter by content type
        content_status: Filter by content status
        source_name: Filter by source name
        search_query: Search in title and content
        from_date: Filter by scraped_at >= from_date
        to_date: Filter by scraped_at <= to_date

    Returns:
        int: Count of dark web contents
    """
    query = select(func.count(DarkWebContent.id))

    # Apply filters (same as in get_contents)
    if content_type:
        query = query.filter(DarkWebContent.content_type.in_(content_type))
    if content_status:
        query = query.filter(DarkWebContent.content_status.in_(content_status))
    if source_name:
        query = query.filter(DarkWebContent.source_name == source_name)
    if search_query:
        search_filter = or_(
            DarkWebContent.title.ilike(f"%{search_query}%"),
            DarkWebContent.content.ilike(f"%{search_query}%"),
        )
        query = query.filter(search_filter)
    if from_date:
        query = query.filter(DarkWebContent.scraped_at >= from_date)
    if to_date:
        query = query.filter(DarkWebContent.scraped_at <= to_date)

    result = await db.execute(query)
    # scalar_one() guarantees an int result, matching the declared return type
    return result.scalar_one()


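# Usage sketch (illustrative): the list and count helpers are meant to be
# combined into a paginated API response. The response dict shape here is
# hypothetical; the real response schema lives in src.api.schemas.
#
#     total = await count_contents(db=session, search_query="ransomware")
#     items = await get_contents(
#         db=session,
#         pagination=PaginationParams(page=1, size=25),
#         search_query="ransomware",
#     )
#     response = {"total": total, "page": 1, "size": 25, "items": items}

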
async def create_mention(
    db: AsyncSession,
    content_id: int,
    keyword: str,
    keyword_category: Optional[str] = None,
    context: Optional[str] = None,
    snippet: Optional[str] = None,
    mention_type: Optional[str] = None,
    confidence: float = 0.0,
    is_verified: bool = False,
) -> DarkWebMention:
    """
    Create a new dark web mention.

    Args:
        db: Database session
        content_id: ID of the content where the mention was found
        keyword: Keyword that was mentioned
        keyword_category: Category of the keyword
        context: Text surrounding the mention
        snippet: Extract of text containing the mention
        mention_type: Type of mention
        confidence: Confidence score (0-1)
        is_verified: Whether the mention is verified

    Returns:
        DarkWebMention: Created mention
    """
    db_mention = DarkWebMention(
        content_id=content_id,
        keyword=keyword,
        keyword_category=keyword_category,
        context=context,
        snippet=snippet,
        mention_type=mention_type,
        confidence=confidence,
        is_verified=is_verified,
    )
    db.add(db_mention)
    await db.commit()
    await db.refresh(db_mention)
    return db_mention


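# Usage sketch (illustrative): recording a keyword hit against a stored page.
# `page` is the DarkWebContent from the create_content sketch above; the
# keyword and category values are hypothetical examples.
#
#     mention = await create_mention(
#         db=session,
#         content_id=page.id,
#         keyword="acme-corp",
#         keyword_category="organization",
#         snippet="... credentials for acme-corp employees ...",
#         confidence=0.9,
#     )

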
async def get_mention_by_id(db: AsyncSession, mention_id: int) -> Optional[DarkWebMention]:
    """
    Get dark web mention by ID.

    Args:
        db: Database session
        mention_id: Mention ID

    Returns:
        Optional[DarkWebMention]: Mention or None if not found
    """
    result = await db.execute(select(DarkWebMention).filter(DarkWebMention.id == mention_id))
    return result.scalars().first()


async def get_mentions(
    db: AsyncSession,
    pagination: PaginationParams,
    keyword: Optional[str] = None,
    content_id: Optional[int] = None,
    is_verified: Optional[bool] = None,
    from_date: Optional[datetime] = None,
    to_date: Optional[datetime] = None,
) -> List[DarkWebMention]:
    """
    Get dark web mentions with filtering and pagination.

    Args:
        db: Database session
        pagination: Pagination parameters
        keyword: Filter by keyword
        content_id: Filter by content ID
        is_verified: Filter by verification status
        from_date: Filter by created_at >= from_date
        to_date: Filter by created_at <= to_date

    Returns:
        List[DarkWebMention]: List of dark web mentions
    """
    query = select(DarkWebMention)

    # Apply filters
    if keyword:
        query = query.filter(DarkWebMention.keyword.ilike(f"%{keyword}%"))
    if content_id:
        query = query.filter(DarkWebMention.content_id == content_id)
    if is_verified is not None:
        query = query.filter(DarkWebMention.is_verified == is_verified)
    if from_date:
        query = query.filter(DarkWebMention.created_at >= from_date)
    if to_date:
        query = query.filter(DarkWebMention.created_at <= to_date)

    # Order newest first, then apply pagination
    query = query.order_by(DarkWebMention.created_at.desc())
    query = query.offset((pagination.page - 1) * pagination.size).limit(pagination.size)

    result = await db.execute(query)
    return result.scalars().all()


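# Usage sketch (illustrative): listing unverified mentions of a keyword so an
# analyst can triage them.
#
#     pending = await get_mentions(
#         db=session,
#         pagination=PaginationParams(page=1, size=50),
#         keyword="acme-corp",
#         is_verified=False,
#     )

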
async def create_threat_from_content(
    db: AsyncSession,
    content_id: int,
    title: str,
    description: str,
    severity: ThreatSeverity,
    category: ThreatCategory,
    confidence_score: float = 0.0,
) -> Threat:
    """
    Create a threat from dark web content.

    Args:
        db: Database session
        content_id: ID of the content
        title: Threat title
        description: Threat description
        severity: Threat severity
        category: Threat category
        confidence_score: Confidence score (0-1)

    Returns:
        Threat: Created threat

    Raises:
        ValueError: If no content exists with the given ID
    """
    # Get the content
    content = await get_content_by_id(db, content_id)
    if not content:
        raise ValueError(f"Content with ID {content_id} not found")
    # Create the threat; the import is local, typically done to avoid a
    # circular dependency between the threat and dark web services
    from src.api.services.threat_service import create_threat

    threat = await create_threat(
        db=db,
        title=title,
        description=description,
        severity=severity,
        category=category,
        status=ThreatStatus.NEW,
        source_url=content.url,
        source_name=content.source_name,
        source_type=content.source_type,
        confidence_score=confidence_score,
    )
    return threat
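

# Usage sketch (illustrative): promoting a scraped page to a threat record.
# ThreatSeverity.HIGH and ThreatCategory.DATA_BREACH are assumed enum members;
# substitute the values actually defined in src.models.threat.
#
#     threat = await create_threat_from_content(
#         db=session,
#         content_id=page.id,
#         title="Credential dump mentioning acme-corp",
#         description="Dark web forum post offering employee credentials.",
#         severity=ThreatSeverity.HIGH,         # assumed member
#         category=ThreatCategory.DATA_BREACH,  # assumed member
#         confidence_score=0.75,
#     )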