Spaces:
Running
Running
"""
Search History and Trends Service

This service manages search history, saved searches, and trend analysis.
"""
import logging | |
import json | |
from datetime import datetime, timedelta | |
from typing import List, Dict, Any, Optional, Tuple, Union | |
from sqlalchemy import func, desc, and_, or_, text | |
from sqlalchemy.future import select | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from sqlalchemy.orm import selectinload | |
from src.models.search_history import SearchHistory, SearchResult, SavedSearch, TrendTopic | |
from src.models.dark_web_content import DarkWebContent | |
from src.models.user import User | |
# Module-level logger, named after this module so its output can be
# configured or filtered independently of other modules.
logger = logging.getLogger(name=__name__)
async def add_search_history(
    db: AsyncSession,
    query: str,
    user_id: Optional[int] = None,
    result_count: int = 0,
    category: Optional[str] = None,
    is_saved: bool = False,
    notes: Optional[str] = None,
    tags: Optional[str] = None
) -> SearchHistory:
    """
    Add a new search history entry and feed its query into trend tracking.

    Args:
        db: Database session
        query: Search query
        user_id: ID of the user who performed the search (optional)
        result_count: Number of results returned
        category: Category of the search
        is_saved: Whether this is a saved search
        notes: Optional notes
        tags: Optional tags (comma-separated)
    Returns:
        The created SearchHistory object, refreshed after all commits
    """
    search_history = SearchHistory(
        query=query,
        user_id=user_id,
        result_count=result_count,
        category=category,
        is_saved=is_saved,
        notes=notes,
        tags=tags
    )
    db.add(search_history)
    await db.commit()

    # Update trend data. update_trend_data() commits again. If the session
    # uses the default expire_on_commit=True, that commit expires every
    # loaded object, including search_history. So refresh *after* it, so
    # callers receive a fully loaded object rather than one whose attribute
    # access would need implicit (lazy) IO.
    await update_trend_data(db, query, category)
    await db.refresh(search_history)
    return search_history
async def add_search_result(
    db: AsyncSession,
    search_id: int,
    url: str,
    title: Optional[str] = None,
    snippet: Optional[str] = None,
    source: Optional[str] = None,
    relevance_score: float = 0.0,
    content_id: Optional[int] = None
) -> SearchResult:
    """
    Persist a single result row attached to an existing search.

    Args:
        db: Database session
        search_id: ID of the parent SearchHistory entry
        url: URL of the result
        title: Title of the result
        snippet: Text snippet from the result
        source: Source of the result
        relevance_score: Relevance of the result to the search query
        content_id: ID of the matching content row in our database, if any
    Returns:
        The newly committed SearchResult, refreshed from the database
    """
    columns = {
        "search_id": search_id,
        "url": url,
        "title": title,
        "snippet": snippet,
        "source": source,
        "relevance_score": relevance_score,
        "content_id": content_id,
    }
    record = SearchResult(**columns)

    db.add(record)
    await db.commit()
    # Reload server-generated values (e.g. the primary key).
    await db.refresh(record)
    return record
async def get_search_history(
    db: AsyncSession,
    skip: int = 0,
    limit: int = 100,
    user_id: Optional[int] = None,
    query_filter: Optional[str] = None,
    date_from: Optional[datetime] = None,
    date_to: Optional[datetime] = None,
    category: Optional[str] = None,
    is_saved: Optional[bool] = None,
    include_results: bool = False
) -> List[SearchHistory]:
    """
    Get search history with filtering options, newest first.

    Args:
        db: Database session
        skip: Number of items to skip
        limit: Maximum number of items to return
        user_id: Filter by user ID
        query_filter: Filter by search query (case-insensitive partial match;
            the text is matched literally, so "%" and "_" are not wildcards)
        date_from: Filter by timestamp (inclusive lower bound)
        date_to: Filter by timestamp (inclusive upper bound)
        category: Filter by category
        is_saved: Filter by saved status
        include_results: Whether to eagerly load the related search results
    Returns:
        List of SearchHistory objects
    """
    statement = select(SearchHistory)

    # Apply filters
    if user_id is not None:
        statement = statement.where(SearchHistory.user_id == user_id)
    if query_filter:
        # Escape LIKE metacharacters (backslash first) so user-supplied text
        # is matched literally instead of "%"/"_" acting as wildcards.
        escaped = (
            query_filter.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        statement = statement.where(
            SearchHistory.query.ilike(f"%{escaped}%", escape="\\")
        )
    if date_from:
        statement = statement.where(SearchHistory.timestamp >= date_from)
    if date_to:
        statement = statement.where(SearchHistory.timestamp <= date_to)
    if category:
        statement = statement.where(SearchHistory.category == category)
    if is_saved is not None:
        statement = statement.where(SearchHistory.is_saved == is_saved)

    # Load related data if requested
    if include_results:
        statement = statement.options(selectinload(SearchHistory.search_results))

    # Apply ordering and pagination
    statement = statement.order_by(desc(SearchHistory.timestamp)).offset(skip).limit(limit)
    result = await db.execute(statement)
    return result.scalars().all()
async def get_search_by_id(
    db: AsyncSession,
    search_id: int,
    include_results: bool = False
) -> Optional[SearchHistory]:
    """
    Look up one search history entry by its primary key.

    Args:
        db: Database session
        search_id: Search history ID
        include_results: Also eagerly load the entry's search results
    Returns:
        The matching SearchHistory, or None when no row has that ID
    """
    stmt = select(SearchHistory).where(SearchHistory.id == search_id)
    stmt = (
        stmt.options(selectinload(SearchHistory.search_results))
        if include_results
        else stmt
    )
    outcome = await db.execute(stmt)
    return outcome.scalars().first()
async def delete_search_history(db: AsyncSession, search_id: int) -> bool:
    """
    Remove a search history entry by ID.

    Args:
        db: Database session
        search_id: ID of the search to delete
    Returns:
        True when the entry existed and was deleted, False when it was not found
    """
    target = await get_search_by_id(db, search_id)
    if target:
        await db.delete(target)
        await db.commit()
        return True
    return False
async def save_search(
    db: AsyncSession,
    search_id: int,
    is_saved: bool = True,
    notes: Optional[str] = None,
    tags: Optional[str] = None
) -> Optional[SearchHistory]:
    """
    Mark a search history entry as saved (or unsaved) and annotate it.

    Args:
        db: Database session
        search_id: ID of the search
        is_saved: New saved status
        notes: Notes to store; ignored when None or empty
        tags: Comma-separated tags to store; ignored when None or empty
    Returns:
        The updated SearchHistory, or None if no entry has that ID
    """
    entry = await get_search_by_id(db, search_id)
    if entry:
        entry.is_saved = is_saved
        # Falsy values leave the existing notes/tags untouched.
        for attr_name, new_value in (("notes", notes), ("tags", tags)):
            if new_value:
                setattr(entry, attr_name, new_value)
        await db.commit()
        await db.refresh(entry)
        return entry
    return None
async def create_saved_search(
    db: AsyncSession,
    name: str,
    query: str,
    user_id: int,
    frequency: int = 24,
    notification_enabled: bool = True,
    threshold: int = 1,
    category: Optional[str] = None
) -> SavedSearch:
    """
    Register a saved search intended for periodic monitoring.

    Args:
        db: Database session
        name: Display name of the saved search
        query: Search query
        user_id: ID of the owning user
        frequency: Interval between runs in hours (0 means manual only)
        notification_enabled: Whether new results should trigger notifications
        threshold: Minimum number of new results before notifying
        category: Category of the search
    Returns:
        The newly committed SavedSearch, refreshed from the database
    """
    settings = {
        "name": name,
        "query": query,
        "user_id": user_id,
        "frequency": frequency,
        "notification_enabled": notification_enabled,
        "threshold": threshold,
        "category": category,
    }
    monitor = SavedSearch(**settings)

    db.add(monitor)
    await db.commit()
    await db.refresh(monitor)
    return monitor
async def get_saved_searches(
    db: AsyncSession,
    user_id: Optional[int] = None,
    is_active: Optional[bool] = None,
    skip: int = 0,
    limit: int = 100
) -> List[SavedSearch]:
    """
    List saved searches ordered by name, with optional filters.

    Args:
        db: Database session
        user_id: Only return searches owned by this user
        is_active: Only return searches with this active status
        skip: Number of items to skip
        limit: Maximum number of items to return
    Returns:
        List of SavedSearch objects
    """
    conditions = []
    if user_id is not None:
        conditions.append(SavedSearch.user_id == user_id)
    if is_active is not None:
        conditions.append(SavedSearch.is_active == is_active)

    stmt = select(SavedSearch)
    for condition in conditions:
        stmt = stmt.where(condition)

    stmt = stmt.order_by(SavedSearch.name).offset(skip).limit(limit)
    rows = await db.execute(stmt)
    return rows.scalars().all()
async def update_trend_data(
    db: AsyncSession,
    query: str,
    category: Optional[str] = None
) -> None:
    """
    Update trend data based on a search query.

    The query is split on whitespace, and every distinct token longer than
    three characters is treated as a topic. Known topics get their
    ``last_seen``, ``mention_count`` and ``growth_rate`` updated. Unknown
    topics get a new TrendTopic row. All changes are committed once at the end.

    Args:
        db: Database session
        query: Search query
        category: Category of the search; only applied to topics that do not
            already have a category
    """
    # str.split() already drops surrounding whitespace, so no strip() is
    # needed. dict.fromkeys() de-duplicates while keeping first-seen order,
    # so a word repeated within one query counts as a single mention.
    topics = list(dict.fromkeys(term for term in query.split() if len(term) > 3))

    for topic in topics:
        # Check if topic already exists
        statement = select(TrendTopic).where(TrendTopic.topic == topic)
        result = await db.execute(statement)
        trend_topic = result.scalars().first()

        if trend_topic:
            # Update existing topic
            trend_topic.last_seen = datetime.utcnow()
            trend_topic.mention_count += 1
            # NOTE: despite its name, "growth_rate" is the average number of
            # mentions per day since the topic was first seen, not a
            # percentage change. Topics first seen very recently can
            # therefore get very large values.
            hours_tracked = (trend_topic.last_seen - trend_topic.first_seen).total_seconds() / 3600
            if hours_tracked > 0:
                hourly_rate = trend_topic.mention_count / hours_tracked
                trend_topic.growth_rate = hourly_rate * 24
            # Fill in the category only if none has been recorded yet
            if category and not trend_topic.category:
                trend_topic.category = category
        else:
            # Create a new trend topic
            trend_topic = TrendTopic(
                topic=topic,
                category=category,
                mention_count=1,
                growth_rate=1.0  # Initial growth rate
            )
            db.add(trend_topic)

    await db.commit()
async def get_trending_topics(
    db: AsyncSession,
    days: int = 7,
    limit: int = 20,
    category: Optional[str] = None,
    min_mentions: int = 3
) -> List[TrendTopic]:
    """
    Return active topics seen recently, ranked by growth rate (highest first).

    Args:
        db: Database session
        days: Only include topics whose last_seen falls within this many days
        limit: Maximum number of topics to return
        category: Only include topics in this category
        min_mentions: Minimum mention_count a topic needs to be included
    Returns:
        List of TrendTopic objects sorted by descending growth rate
    """
    since = datetime.utcnow() - timedelta(days=days)
    criteria = [
        TrendTopic.last_seen >= since,
        TrendTopic.mention_count >= min_mentions,
        TrendTopic.is_active == True,  # noqa: E712 - builds a SQL comparison
    ]
    stmt = select(TrendTopic).where(and_(*criteria))

    if category:
        stmt = stmt.where(TrendTopic.category == category)

    stmt = stmt.order_by(desc(TrendTopic.growth_rate)).limit(limit)
    rows = await db.execute(stmt)
    return rows.scalars().all()
async def get_search_frequency(
    db: AsyncSession,
    days: int = 30,
    interval: str = 'day'
) -> List[Dict[str, Any]]:
    """
    Get search frequency over time for visualization.

    Args:
        db: Database session
        days: Number of days to analyze
        interval: Bucket size: 'hour', 'day' or 'week'; any other value
            falls back to 'month'
    Returns:
        List of dictionaries ``{"interval": <bucket start>, "count": <int>}``
        ordered by bucket start
    """
    cutoff_date = datetime.utcnow() - timedelta(days=days)

    # Only a fixed set of units is ever passed to date_trunc; unrecognised
    # values fall back to monthly buckets.
    unit = interval if interval in ('hour', 'day', 'week') else 'month'
    trunc_expr = func.date_trunc(unit, SearchHistory.timestamp)

    # Query for search count by interval
    statement = (
        select(
            trunc_expr.label('interval'),
            func.count(SearchHistory.id).label('count'),
        )
        .where(SearchHistory.timestamp >= cutoff_date)
        .group_by('interval')
        .order_by('interval')
    )
    result = await db.execute(statement)

    # Unpack positionally. On a SQLAlchemy Row, ``row.count`` resolves to the
    # inherited sequence method ``count()``, not the column labelled "count".
    return [{"interval": bucket, "count": total} for bucket, total in result.all()]
async def get_popular_searches(
    db: AsyncSession,
    days: int = 30,
    limit: int = 10
) -> List[Dict[str, Any]]:
    """
    Get the most popular search queries.

    Args:
        db: Database session
        days: Number of days to analyze
        limit: Maximum number of queries to return
    Returns:
        List of dictionaries ``{"query": <str>, "count": <int>}`` ordered by
        descending count
    """
    cutoff_date = datetime.utcnow() - timedelta(days=days)
    statement = (
        select(
            SearchHistory.query,
            func.count(SearchHistory.id).label('count'),
        )
        .where(SearchHistory.timestamp >= cutoff_date)
        .group_by(SearchHistory.query)
        .order_by(desc('count'))
        .limit(limit)
    )
    result = await db.execute(statement)

    # Unpack positionally. On a SQLAlchemy Row, ``row.count`` resolves to the
    # inherited sequence method ``count()``, not the column labelled "count".
    return [{"query": query_text, "count": total} for query_text, total in result.all()]
async def get_search_categories(
    db: AsyncSession,
    days: int = 30
) -> List[Dict[str, Any]]:
    """
    Get the distribution of search categories.

    Args:
        db: Database session
        days: Number of days to analyze
    Returns:
        List of dictionaries ``{"category": <str>, "count": <int>}`` ordered
        by descending count. Searches with a NULL category are excluded.
    """
    cutoff_date = datetime.utcnow() - timedelta(days=days)
    statement = (
        select(
            SearchHistory.category,
            func.count(SearchHistory.id).label('count'),
        )
        .where(
            and_(
                SearchHistory.timestamp >= cutoff_date,
                SearchHistory.category.is_not(None),
            )
        )
        .group_by(SearchHistory.category)
        .order_by(desc('count'))
    )
    result = await db.execute(statement)

    # Unpack positionally. On a SQLAlchemy Row, ``row.count`` resolves to the
    # inherited sequence method ``count()``, not the column labelled "count".
    # The "Uncategorized" fallback only applies to empty-string categories,
    # because NULL categories are filtered out above.
    return [
        {"category": category or "Uncategorized", "count": total}
        for category, total in result.all()
    ]
async def get_search_trend_analysis(
    db: AsyncSession,
    days: int = 90,
    trend_days: int = 7,
    limit: int = 10
) -> Dict[str, Any]:
    """
    Build a combined report of search activity and trends.

    Args:
        db: Database session
        days: Window (in days) for the frequency, popularity and category stats
        trend_days: Window (in days) for trending topics, and the length of
            each of the two back-to-back windows compared for velocity
        limit: Maximum number of entries in each ranked list
    Returns:
        Dictionary with the keys "frequency", "popular_searches",
        "trending_topics", "categories", "recent_popular", "velocity" and
        "total_searches". Note that total_searches["total"] only covers the two
        velocity windows (2 * trend_days), not the full ``days`` range.
    """
    frequency = await get_search_frequency(db, days, 'day')
    popular = await get_popular_searches(db, days, limit)
    trending = await get_trending_topics(db, trend_days, limit)
    categories = await get_search_categories(db, days)
    # Popular queries from the last day only, to contrast with ``popular``.
    recent_popular = await get_popular_searches(db, 1, limit)

    # Velocity compares the most recent ``trend_days`` window with the
    # equally long window immediately before it.
    span = timedelta(days=trend_days)
    recent_start = datetime.utcnow() - span
    previous_start = recent_start - span

    recent_query = select(func.count(SearchHistory.id)).where(
        SearchHistory.timestamp >= recent_start
    )
    previous_query = select(func.count(SearchHistory.id)).where(
        and_(
            SearchHistory.timestamp >= previous_start,
            SearchHistory.timestamp < recent_start,
        )
    )
    recent_rows = await db.execute(recent_query)
    previous_rows = await db.execute(previous_query)
    n_recent = recent_rows.scalar() or 0
    n_previous = previous_rows.scalar() or 0

    if n_previous:
        velocity = (n_recent - n_previous) / n_previous * 100  # percentage change
    elif n_recent:
        velocity = 100.0
    else:
        velocity = 0.0

    trending_summary = [
        {"topic": t.topic, "mentions": t.mention_count, "growth_rate": t.growth_rate}
        for t in trending
    ]

    return {
        "frequency": frequency,
        "popular_searches": popular,
        "trending_topics": trending_summary,
        "categories": categories,
        "recent_popular": recent_popular,
        "velocity": velocity,
        "total_searches": {
            "total": n_recent + n_previous,
            "recent": n_recent,
            "previous": n_previous,
        },
    }