# Hugging Face Spaces page banner ("Spaces: Running") - extraction residue, not part of the module.
import re | |
import time | |
import os | |
import logging | |
from typing import List, Dict, Optional, Set, Tuple | |
import google_auth_oauthlib.flow | |
import googleapiclient.discovery | |
import googleapiclient.errors | |
from google_auth_oauthlib.flow import Flow | |
from google.oauth2.credentials import Credentials | |
from googleapiclient.discovery import build | |
from fastapi import FastAPI, Request, Form, File, UploadFile, HTTPException, Depends | |
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse | |
from fastapi.templating import Jinja2Templates | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.security import OAuth2PasswordBearer | |
from google.oauth2.credentials import Credentials | |
from pydantic import BaseModel | |
from app.services.gambling_filter import GamblingFilter | |
# Manual moderation decisions keyed by (video_id, comment_id). Entries are
# written by keep_comment() and looked up before the gambling filter runs.
manual_overrides: Dict[Tuple[str, str], str] = {}
def keep_comment(comment_id: str, video_id: str):
    """Record that a comment was manually kept.

    Stores the value ``"safe"`` in ``manual_overrides`` under the key
    ``(video_id, comment_id)``. Note that the key order is the reverse of
    this function's parameter order.

    :param comment_id: YouTube comment ID
    :param video_id: ID of the video the comment belongs to
    """
    override_key = (video_id, comment_id)
    manual_overrides[override_key] = "safe"
def get_credentials_from_session(session) -> Optional["Credentials"]:
    """Build a Credentials object from OAuth data stored in the session.

    :param session: Mapping-like object exposing ``get``; its
        ``"credentials"`` entry, when present, holds the stored token fields
    :return: A ``Credentials`` instance, or ``None`` when the session has no
        (or an empty) ``"credentials"`` entry
    :raises KeyError: If the stored data lacks ``token``, ``token_uri``,
        ``client_id``, ``client_secret`` or ``scopes``
    """
    creds_data = session.get("credentials")
    if not creds_data:
        return None

    return Credentials(
        token=creds_data["token"],
        # A refresh token is not guaranteed to be stored (presumably the
        # provider only issues one on first consent - confirm); None is the
        # constructor's own default, so a missing key need not be fatal.
        refresh_token=creds_data.get("refresh_token"),
        token_uri=creds_data["token_uri"],
        client_id=creds_data["client_id"],
        client_secret=creds_data["client_secret"],
        scopes=creds_data["scopes"],
    )
class YouTubeCommentModerator:
    """Classify and moderate comments on the authenticated user's videos.

    ``youtube_service`` is initialised to ``None``. Until it holds an
    authenticated YouTube Data API client, the public methods log an error
    and return their failure value instead of calling the API.
    """

    def __init__(self,
                 client_secrets_path: str = "./app/client_secret.json",
                 gambling_filter: Optional["GamblingFilter"] = None):
        """
        Initialize the YouTube Comment Moderator with configurable settings.

        :param client_secrets_path: Path to OAuth 2.0 client secrets file
            (kept for interface compatibility; this constructor does not
            read it)
        :param gambling_filter: Optional pre-configured GamblingFilter
            instance; a default ``GamblingFilter()`` is created when omitted
        """
        # Setup logging (configures the root logger if it has no handlers yet)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)

        # YouTube service; must be assigned before any API-backed call
        self.youtube_service = None

        # Gambling Filter
        self.gambling_filter = gambling_filter or GamblingFilter()

    def moderate_video_comments(self, video_id: str, threshold: float = 0.55) -> Dict:
        """
        Classify every top-level comment of a video as gambling or safe.

        All result pages of ``commentThreads().list`` are walked; only the
        top-level comment of each thread is examined.

        :param video_id: ID of the video whose comments are moderated
        :param threshold: Passed through to
            ``GamblingFilter.is_gambling_comment``
        :return: Dict with ``total_comments``, ``gambling_comments``,
            ``safe_comments`` and ``moderation_metrics``; or
            ``{"error": <message>}`` when not authenticated or when any
            step raises
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return {"error": "Not authenticated"}

        moderation_results = {
            "total_comments": 0,
            "gambling_comments": [],
            "safe_comments": [],
            "moderation_metrics": []
        }
        try:
            request = self.youtube_service.commentThreads().list(
                part="snippet",
                videoId=video_id,
                maxResults=100,
                textFormat="plainText"
            )
            # list_next() returns None once there is no further page.
            while request is not None:
                response = request.execute()
                for item in response.get("items", []):
                    comment_info = self._classify_comment(video_id, item, threshold)
                    moderation_results["total_comments"] += 1
                    if comment_info["is_gambling"]:
                        moderation_results["gambling_comments"].append(comment_info)
                    else:
                        moderation_results["safe_comments"].append(comment_info)
                    # The metrics dict is shared with comment_info["metrics"],
                    # so the original text shows up in both places.
                    metrics = comment_info["metrics"]
                    metrics["original_text"] = comment_info["text"]
                    moderation_results["moderation_metrics"].append(metrics)
                request = self.youtube_service.commentThreads().list_next(request, response)
            return moderation_results
        except Exception as e:
            self.logger.error(f"Error moderating comments: {e}")
            return {"error": str(e)}

    def _classify_comment(self, video_id: str, item: Dict, threshold: float) -> Dict:
        """Build the per-comment result dict for one comment thread item."""
        top_level = item["snippet"]["topLevelComment"]
        comment_id = top_level["id"]
        comment_snippet = top_level["snippet"]
        comment_text = comment_snippet["textDisplay"]

        if manual_overrides.get((video_id, comment_id)) == "safe":
            # The user previously pressed "Keep" - skip the gambling filter
            is_gambling = False
            metrics = {"confidence_score": 0.0}
        else:
            is_gambling, metrics = self.gambling_filter.is_gambling_comment(
                comment_text, threshold
            )

        return {
            "id": comment_id,
            "text": comment_text,
            "author": comment_snippet["authorDisplayName"],
            "is_gambling": is_gambling,
            "metrics": metrics
        }

    def delete_comment(self, comment_id: str) -> bool:
        """
        Remove a comment by rejecting it.

        Despite the name, this does not call ``comments().delete``; it sets
        the comment's moderation status to ``"rejected"``.

        :param comment_id: YouTube comment ID
        :return: True when the API call succeeded; False when not
            authenticated or when the call raised
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return False
        try:
            self.youtube_service.comments().setModerationStatus(
                id=comment_id,
                moderationStatus="rejected"
            ).execute()
            self.logger.info(f"Comment {comment_id} deleted successfully.")
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete comment {comment_id}: {e}")
            return False

    def get_channel_videos(self, max_results: int = 50) -> List[Dict]:
        """
        Retrieve videos from authenticated user's channel.

        Issues a single ``search().list`` call; no further pages are fetched.

        :param max_results: Forwarded as ``maxResults``
            (NOTE(review): the API reportedly caps this at 50 - confirm)
        :return: List of ``{"id", "title", "thumbnail"}`` dicts; empty when
            not authenticated, when the channel ID cannot be determined, or
            when the request fails
        """
        if not self.youtube_service:
            self.logger.error("YouTube service not authenticated.")
            return []

        channel_id = self._get_channel_id()
        if channel_id is None:
            # Without a channel ID the search would not be restricted to the
            # user's own channel, so bail out rather than return unrelated
            # videos.
            self.logger.error("Could not determine channel ID; skipping video search.")
            return []

        try:
            response = self.youtube_service.search().list(
                part="snippet",
                channelId=channel_id,
                maxResults=max_results,
                type="video"
            ).execute()
            return [
                {
                    "id": item["id"]["videoId"],
                    "title": item["snippet"]["title"],
                    "thumbnail": item["snippet"]["thumbnails"]["default"]["url"]
                }
                for item in response.get("items", [])
            ]
        except Exception as e:
            self.logger.error(f"Error retrieving videos: {e}")
            return []

    def _get_channel_id(self) -> Optional[str]:
        """
        Retrieve the authenticated user's channel ID.

        :return: ID of the first channel returned for ``mine=True``, or None
            when the lookup raises (including an empty result)
        """
        try:
            request = self.youtube_service.channels().list(part="id", mine=True)
            response = request.execute()
            return response["items"][0]["id"]
        except Exception as e:
            self.logger.error(f"Error retrieving channel ID: {e}")
            return None