Spaces:
Running
Running
from fastapi import APIRouter, Request, HTTPException, Depends | |
from fastapi import FastAPI, Request, Form, File, UploadFile, HTTPException, Depends | |
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse | |
from google.oauth2.credentials import Credentials | |
from googleapiclient.discovery import build | |
from typing import List, Dict, Optional, Set, Tuple | |
from app.services.gambling_filter import GamblingFilter | |
from app.models import User, UserDatabase | |
from app.config import templates, user_moderator, filter_instance | |
from app.services.youtube_moderator import keep_comment | |
import googleapiclient.discovery | |
import logging | |
# Process-wide logging configuration, applied when this module is imported.
# Records are emitted at INFO and above, formatted as
# "<timestamp> - <logger name> - [<LEVEL>] <message>".
logging.basicConfig(
    format="%(asctime)s - %(name)s - [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router instance created for this module.
router = APIRouter()
def get_credentials_from_session(session) -> Optional[Credentials]:
    """Build a Credentials object from the credential data stored in a session.

    Args:
        session: Session object exposing ``.get``; its ``"credentials"`` entry
            is read (presumably a dict written at login -- confirm against the
            code that stores it).

    Returns:
        A ``Credentials`` instance built from the stored ``token``,
        ``refresh_token``, ``token_uri``, ``client_id``, ``client_secret`` and
        ``scopes`` entries, or ``None`` when the session holds no (or empty)
        credential data.

    Raises:
        KeyError: If credential data is present but lacks one of the entries
            listed above (assuming the stored value is a dict).
    """
    creds_data = session.get("credentials")
    if not creds_data:
        # Nothing stored for this session: callers treat None as "not logged in".
        return None
    return Credentials(
        token=creds_data["token"],
        refresh_token=creds_data["refresh_token"],
        token_uri=creds_data["token_uri"],
        client_id=creds_data["client_id"],
        client_secret=creds_data["client_secret"],
        scopes=creds_data["scopes"],
    )
def get_current_user_from_cookie(request: Request):
    """Resolve the current user from the request's ``token`` cookie.

    Args:
        request: Incoming request whose cookies are inspected.

    Returns:
        The user object returned by ``UserDatabase.get_user`` for the token.

    Raises:
        HTTPException: 401 "Not authenticated" when the cookie is missing or
            empty; 401 "Invalid authentication credentials" when the lookup
            yields no user.
    """
    auth_token = request.cookies.get("token")
    if auth_token:
        matched_user = UserDatabase.get_user(auth_token)
        if matched_user:
            return matched_user
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    raise HTTPException(status_code=401, detail="Not authenticated")
async def list_channels(request: Request):
    """Return the YouTube channels owned by the session's user.

    Credentials are rebuilt from ``request.session``; the channels are fetched
    with ``channels().list(part="id,snippet", mine=True)``.

    Returns:
        ``{"channels": [{"channelId": ..., "title": ...}, ...]}``.

    Raises:
        HTTPException: 401 when the session has no credentials or they are
            not valid.
    """
    creds = get_credentials_from_session(request.session)
    if not (creds and creds.valid):
        raise HTTPException(status_code=401, detail="Unauthorized. Please /login first.")

    youtube = build("youtube", "v3", credentials=creds)
    channel_query = youtube.channels().list(part="id,snippet", mine=True)
    response = channel_query.execute()

    return {
        "channels": [
            {"channelId": entry["id"], "title": entry["snippet"]["title"]}
            for entry in response.get("items", [])
        ]
    }
async def list_videos(request: Request, channel_id: str):
    """List up to 10 videos from the uploads playlist of the given channel.

    Args:
        request: Incoming request; credentials are rebuilt from its session.
        channel_id: ID of the channel whose uploads are listed.

    Returns:
        ``{"videos": [{"videoId": ..., "title": ...}, ...]}``.

    Raises:
        HTTPException: 401 when the session has no valid credentials;
            404 when the API returns no channel for ``channel_id``.
    """
    creds = get_credentials_from_session(request.session)
    if not creds or not creds.valid:
        raise HTTPException(status_code=401, detail="Unauthorized. Please /login first.")

    youtube = build("youtube", "v3", credentials=creds)

    # 1) Look up the channel to find its "uploads" playlist.
    channel_response = youtube.channels().list(
        part="contentDetails",
        id=channel_id,
    ).execute()

    # An unknown channel id yields no items; report that as 404 instead of
    # letting the [0] lookup fail with an unhandled IndexError/KeyError.
    channel_items = channel_response.get("items") or []
    if not channel_items:
        raise HTTPException(status_code=404, detail="Channel not found.")
    uploads_playlist_id = channel_items[0]["contentDetails"]["relatedPlaylists"]["uploads"]

    # 2) Fetch the first page of items from the uploads playlist.
    playlist_items_response = youtube.playlistItems().list(
        part="snippet",
        playlistId=uploads_playlist_id,
        maxResults=10,
    ).execute()

    videos = [
        {
            "videoId": item["snippet"]["resourceId"]["videoId"],
            "title": item["snippet"]["title"],
        }
        for item in playlist_items_response.get("items", [])
    ]
    return {"videos": videos}
async def api_keep_comment(
    request: Request,
    comment_id: str,
    video_id: str
):
    """Approve a comment on YouTube and record a manual override for it.

    The comment's moderation status is set to "published" using the current
    user's stored YouTube credentials, then ``keep_comment`` is called with the
    comment and video IDs.

    Args:
        request: Incoming request; the user is resolved from its cookie.
        comment_id: ID of the comment to keep.
        video_id: ID of the video the comment belongs to.

    Returns:
        ``{"success": True, "message": ...}`` on success, otherwise
        ``{"success": False, "error": <message>}``.
    """
    try:
        logger.debug(
            "Received keep request for comment_id: %s, video_id: %s",
            comment_id,
            video_id,
        )

        # Resolve the user and rebuild their YouTube credentials.
        current_user = get_current_user_from_cookie(request)
        logger.debug("Current user: %s", current_user.username)
        user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)

        # Build the API client; it is also stored on the shared moderator, as
        # the other handlers in this file do.
        youtube_service = googleapiclient.discovery.build(
            "youtube", "v3",
            credentials=user_creds,
        )
        user_moderator.youtube_service = youtube_service

        logger.debug("Setting moderation status to 'published' on YouTube...")
        # "published" marks the comment as approved on YouTube.
        result = youtube_service.comments().setModerationStatus(
            id=comment_id,
            moderationStatus="published",
        ).execute()
        logger.debug("YouTube API response: %s", result)

        # Record the manual override for this comment.
        keep_comment(comment_id, video_id)
        logger.debug("Manual override saved for comment.")

        return {"success": True, "message": "Comment kept successfully"}
    except Exception as e:
        # Note: this also catches the HTTPException raised for a missing or
        # invalid cookie, so auth failures are reported in the JSON payload.
        logger.exception("Error keeping comment: %s", e)
        return {"success": False, "error": str(e)}
async def list_videos(request: Request, current_user: User = Depends(get_current_user_from_cookie)):
    """Render ``videos.html`` with the current user's channel videos.

    NOTE(review): a function with this same name is defined earlier in this
    file; this definition rebinds the module-level name ``list_videos``.

    Args:
        request: Incoming request, passed through to the template.
        current_user: User resolved by ``get_current_user_from_cookie``.

    Returns:
        The ``videos.html`` template response with ``request``,
        ``current_user`` and ``videos`` in its context.
    """
    # Rebuild Credentials from the dict stored on the user, then point the
    # shared moderator at a client built with them.
    creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
    service = googleapiclient.discovery.build("youtube", "v3", credentials=creds)
    user_moderator.youtube_service = service

    context = {
        "request": request,
        "current_user": current_user,
        "videos": user_moderator.get_channel_videos(),
    }
    return templates.TemplateResponse("videos.html", context)
async def refresh_video_comments(
    request: Request,
    video_id: str,
    threshold: float = 0.55
):
    """Re-run moderation on a video's comments and render the result.

    Args:
        request: Incoming request; the user is resolved from its cookie.
        video_id: ID of the video to refresh comments for.
        threshold: Gambling confidence threshold passed to the moderator.

    Returns:
        A redirect to ``/login`` when the user cannot be authenticated;
        otherwise the ``video_comments.html`` template with the moderation
        results, or ``error.html`` if moderation or the video lookup fails.
    """
    # get_current_user_from_cookie signals a missing/invalid cookie by raising
    # HTTPException rather than returning a falsy value, so the redirect has
    # to be driven by the exception.
    try:
        current_user = get_current_user_from_cookie(request)
    except HTTPException:
        return RedirectResponse(url="/login", status_code=303)

    try:
        # Build one API client from the user's stored credentials and share it
        # between the moderator and the video lookup below.
        user_creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
        youtube_service = googleapiclient.discovery.build(
            "youtube", "v3",
            credentials=user_creds,
        )
        user_moderator.youtube_service = youtube_service

        # Moderate comments for the video.
        result = user_moderator.moderate_video_comments(video_id, threshold)

        # Fetch the video's snippet so the template can show its title.
        video_response = youtube_service.videos().list(
            part="snippet",
            id=video_id,
        ).execute()
        video_items = video_response.get("items") or []
        video_info = video_items[0]["snippet"] if video_items else {}

        return templates.TemplateResponse("video_comments.html", {
            "request": request,
            "video": {
                "id": video_id,
                "title": video_info.get('title', 'Unknown Video'),
            },
            "safe_comments": result.get('safe_comments', []),
            "flagged_comments": result.get('gambling_comments', []),
            "total_comments": result.get('total_comments', 0),
        })
    except Exception as e:
        logger.exception("Error refreshing comments: %s", e)
        return templates.TemplateResponse("error.html", {
            "request": request,
            "error": f"Failed to refresh comments: {str(e)}",
        })
async def moderate_video(request: Request, video_id: str = Form(...), threshold: float = Form(0.55)):
    """Moderate a video's comments and render ``index.html`` with the outcome.

    Args:
        request: Incoming request, passed through to the template.
        video_id: Form field with the ID of the video to moderate.
        threshold: Form field with the threshold passed to the moderator.

    Returns:
        The ``index.html`` template response containing the moderation result
        (or an error dict when the shared moderator has no YouTube service),
        the video ID, and the filter's rule sets as sorted lists.
    """
    if user_moderator.youtube_service:
        result = user_moderator.moderate_video_comments(video_id, threshold)
    else:
        result = {"error": "YouTube service not authenticated. Please authenticate first."}

    # Rule collections are read from the filter instance's private attributes.
    rule_sources = {
        "platform": filter_instance._platform_names,
        "gambling_term": filter_instance._gambling_terms,
        "safe_indicator": filter_instance._safe_indicators,
        "gambling_context": filter_instance._gambling_contexts,
        "ambiguous_term": filter_instance._ambiguous_terms,
    }
    rules = {label: sorted(entries) for label, entries in rule_sources.items()}

    return templates.TemplateResponse("index.html", {
        "request": request,
        "result": result,
        "video_id": video_id,
        "rules": rules,
    })
async def api_delete_comment(
    request: Request,
    comment_id: str,
    video_id: str
):
    """Delete a comment through the moderator using the current user's credentials.

    Args:
        request: Incoming request; the user is resolved from its cookie.
        comment_id: ID of the comment to delete.
        video_id: Accepted but not used by this handler.

    Returns:
        ``{"success": <value returned by user_moderator.delete_comment>}``.
    """
    user = get_current_user_from_cookie(request)
    creds = Credentials.from_authorized_user_info(user.youtube_credentials)
    service = googleapiclient.discovery.build("youtube", "v3", credentials=creds)
    user_moderator.youtube_service = service
    return {"success": user_moderator.delete_comment(comment_id)}
async def moderate_video_comments(
    request: Request,
    video_id: str,
    current_user: User = Depends(get_current_user_from_cookie)
):
    """Moderate a video's comments and render ``video_comments.html``.

    Args:
        request: Incoming request, passed through to the template.
        video_id: ID of the video whose comments are moderated.
        current_user: User resolved by ``get_current_user_from_cookie``.

    Returns:
        The ``video_comments.html`` template response with the safe and
        flagged comment lists. The video title in the context is a fixed
        placeholder string, not the real title.
    """
    # Rebuild Credentials from the dict stored on the user and hand a client
    # built with them to the shared moderator.
    creds = Credentials.from_authorized_user_info(current_user.youtube_credentials)
    service = googleapiclient.discovery.build("youtube", "v3", credentials=creds)
    user_moderator.youtube_service = service

    outcome = user_moderator.moderate_video_comments(video_id)
    video_context = {"id": video_id, "title": "Sample Video Title"}

    return templates.TemplateResponse("video_comments.html", {
        "request": request,
        "current_user": current_user,
        "video": video_context,
        "safe_comments": outcome.get('safe_comments', []),
        "flagged_comments": outcome.get('gambling_comments', []),
    })