""" Service for alert operations. """ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from sqlalchemy import func, or_, and_ from datetime import datetime from typing import List, Optional, Dict, Any, Union from src.models.alert import Alert, AlertStatus, AlertCategory from src.models.threat import ThreatSeverity from src.api.schemas import PaginationParams async def create_alert( db: AsyncSession, title: str, description: str, severity: ThreatSeverity, category: AlertCategory, source_url: Optional[str] = None, threat_id: Optional[int] = None, mention_id: Optional[int] = None, ) -> Alert: """ Create a new alert. Args: db: Database session title: Alert title description: Alert description severity: Alert severity category: Alert category source_url: Source URL for the alert threat_id: ID of related threat mention_id: ID of related dark web mention Returns: Alert: Created alert """ db_alert = Alert( title=title, description=description, severity=severity, status=AlertStatus.NEW, category=category, generated_at=datetime.utcnow(), source_url=source_url, is_read=False, threat_id=threat_id, mention_id=mention_id, ) db.add(db_alert) await db.commit() await db.refresh(db_alert) return db_alert async def get_alert_by_id(db: AsyncSession, alert_id: int) -> Optional[Alert]: """ Get alert by ID. Args: db: Database session alert_id: Alert ID Returns: Optional[Alert]: Alert or None if not found """ result = await db.execute(select(Alert).filter(Alert.id == alert_id)) return result.scalars().first() async def get_alerts( db: AsyncSession, pagination: PaginationParams, severity: Optional[List[ThreatSeverity]] = None, status: Optional[List[AlertStatus]] = None, category: Optional[List[AlertCategory]] = None, is_read: Optional[bool] = None, search_query: Optional[str] = None, from_date: Optional[datetime] = None, to_date: Optional[datetime] = None, ) -> List[Alert]: """ Get alerts with filtering and pagination. Args: db: Database session pagination: Pagination parameters severity: Filter by severity status: Filter by status category: Filter by category is_read: Filter by read status search_query: Search in title and description from_date: Filter by generated_at >= from_date to_date: Filter by generated_at <= to_date Returns: List[Alert]: List of alerts """ query = select(Alert) # Apply filters if severity: query = query.filter(Alert.severity.in_(severity)) if status: query = query.filter(Alert.status.in_(status)) if category: query = query.filter(Alert.category.in_(category)) if is_read is not None: query = query.filter(Alert.is_read == is_read) if search_query: search_filter = or_( Alert.title.ilike(f"%{search_query}%"), Alert.description.ilike(f"%{search_query}%") ) query = query.filter(search_filter) if from_date: query = query.filter(Alert.generated_at >= from_date) if to_date: query = query.filter(Alert.generated_at <= to_date) # Apply pagination query = query.order_by(Alert.generated_at.desc()) query = query.offset((pagination.page - 1) * pagination.size).limit(pagination.size) result = await db.execute(query) return result.scalars().all() async def count_alerts( db: AsyncSession, severity: Optional[List[ThreatSeverity]] = None, status: Optional[List[AlertStatus]] = None, category: Optional[List[AlertCategory]] = None, is_read: Optional[bool] = None, search_query: Optional[str] = None, from_date: Optional[datetime] = None, to_date: Optional[datetime] = None, ) -> int: """ Count alerts with filtering. Args: db: Database session severity: Filter by severity status: Filter by status category: Filter by category is_read: Filter by read status search_query: Search in title and description from_date: Filter by generated_at >= from_date to_date: Filter by generated_at <= to_date Returns: int: Count of alerts """ query = select(func.count(Alert.id)) # Apply filters (same as in get_alerts) if severity: query = query.filter(Alert.severity.in_(severity)) if status: query = query.filter(Alert.status.in_(status)) if category: query = query.filter(Alert.category.in_(category)) if is_read is not None: query = query.filter(Alert.is_read == is_read) if search_query: search_filter = or_( Alert.title.ilike(f"%{search_query}%"), Alert.description.ilike(f"%{search_query}%") ) query = query.filter(search_filter) if from_date: query = query.filter(Alert.generated_at >= from_date) if to_date: query = query.filter(Alert.generated_at <= to_date) result = await db.execute(query) return result.scalar() async def update_alert_status( db: AsyncSession, alert_id: int, status: AlertStatus, action_taken: Optional[str] = None, ) -> Optional[Alert]: """ Update alert status. Args: db: Database session alert_id: Alert ID status: New status action_taken: Description of action taken Returns: Optional[Alert]: Updated alert or None if not found """ alert = await get_alert_by_id(db, alert_id) if not alert: return None alert.status = status if action_taken: alert.action_taken = action_taken if status == AlertStatus.RESOLVED: alert.resolved_at = datetime.utcnow() alert.updated_at = datetime.utcnow() await db.commit() await db.refresh(alert) return alert async def mark_alert_as_read( db: AsyncSession, alert_id: int, ) -> Optional[Alert]: """ Mark alert as read. Args: db: Database session alert_id: Alert ID Returns: Optional[Alert]: Updated alert or None if not found """ alert = await get_alert_by_id(db, alert_id) if not alert: return None alert.is_read = True alert.updated_at = datetime.utcnow() await db.commit() await db.refresh(alert) return alert async def assign_alert( db: AsyncSession, alert_id: int, user_id: int, ) -> Optional[Alert]: """ Assign alert to a user. Args: db: Database session alert_id: Alert ID user_id: User ID to assign to Returns: Optional[Alert]: Updated alert or None if not found """ alert = await get_alert_by_id(db, alert_id) if not alert: return None alert.assigned_to_id = user_id alert.status = AlertStatus.ASSIGNED alert.updated_at = datetime.utcnow() await db.commit() await db.refresh(alert) return alert async def get_alert_counts_by_severity( db: AsyncSession, from_date: Optional[datetime] = None, to_date: Optional[datetime] = None, ) -> Dict[str, int]: """ Get count of alerts by severity. Args: db: Database session from_date: Filter by generated_at >= from_date to_date: Filter by generated_at <= to_date Returns: Dict[str, int]: Mapping of severity to count """ result = {} for severity in ThreatSeverity: query = select(func.count(Alert.id)).filter(Alert.severity == severity) if from_date: query = query.filter(Alert.generated_at >= from_date) if to_date: query = query.filter(Alert.generated_at <= to_date) count_result = await db.execute(query) count = count_result.scalar() or 0 result[severity.value] = count return result