from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from sentence_transformers import SentenceTransformer
from transformers import RobertaForSequenceClassification, AutoTokenizer
from PyPDF2 import PdfReader
from sklearn.metrics.pairwise import cosine_similarity
import torch
import os
import numpy as np
import shutil
import uuid
import tempfile
import logging
import asyncio
from typing import Dict, Any

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Essay Analysis API",
    description="API for AI Content Detection and Plagiarism Checking",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

CACHE_DIR = "/tmp/cache"
PLAGIARISM_THRESHOLD = 0.85
MAX_TEXT_LENGTH = 512
MODEL_NAME = "Essay-Grader/roberta-ai-detector-20250401_232702"
SENTENCE_MODEL = "sentence-transformers/all-roberta-large-v1"

model_status = {
    "model_loaded": False,
    "last_error": None,
    "last_reload_attempt": None,
    "retry_count": 0
}

embedder = None
ai_tokenizer = None
ai_model = None


def initialize_models():
    """Initialize ML models with error handling and retry logic."""
    global embedder, ai_tokenizer, ai_model

    try:
        logger.info("Loading sentence transformer model...")
        embedder = SentenceTransformer(
            SENTENCE_MODEL,
            cache_folder=CACHE_DIR
        )

        logger.info(f"Loading AI detection model: {MODEL_NAME}")
        ai_tokenizer = AutoTokenizer.from_pretrained(
            MODEL_NAME,
            cache_dir=CACHE_DIR,
            use_fast=True
        )
        ai_model = RobertaForSequenceClassification.from_pretrained(
            MODEL_NAME,
            cache_dir=CACHE_DIR,
            device_map="auto" if torch.cuda.is_available() else None,
            trust_remote_code=True
        )

        # Warm-up inference so loading problems surface at startup rather
        # than on the first request.
        test_input = ai_tokenizer(
            "Model initialization text " * 20,
            return_tensors="pt",
            max_length=MAX_TEXT_LENGTH,
            truncation=True,
            padding=True
        )
        with torch.no_grad():
            if hasattr(ai_model, "device"):
                test_input = {k: v.to(ai_model.device) for k, v in test_input.items()}
            ai_model(**test_input)

        logger.info("All models loaded successfully")
        model_status.update({
            "model_loaded": True,
            "last_error": None
        })
        return True

    except Exception as e:
        error_msg = f"Model initialization failed: {str(e)}"
        logger.error(error_msg)
        model_status.update({
            "last_error": error_msg,
            "model_loaded": False
        })
        return False


@app.on_event("startup")
async def startup_event():
    """Application startup with retry logic."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    max_retries = 3

    while model_status["retry_count"] < max_retries:
        if initialize_models():
            model_status["retry_count"] = 0
            return
        model_status["retry_count"] += 1
        logger.warning(f"Retry attempt {model_status['retry_count']}/{max_retries}")
        # time.sleep() would block the event loop; sleep asynchronously instead.
        await asyncio.sleep(5)

    logger.critical("Failed to initialize models after multiple attempts")
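
# Note: @app.on_event("startup") is deprecated in recent FastAPI releases in
# favor of a lifespan handler. A minimal sketch of the equivalent (shown for
# reference only, not wired in here):
#
#     from contextlib import asynccontextmanager
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         os.makedirs(CACHE_DIR, exist_ok=True)
#         initialize_models()
#         yield
#
#     app = FastAPI(lifespan=lifespan, ...)
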
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract and concatenate text from a PDF."""
    try:
        reader = PdfReader(pdf_path)
        return " ".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        logger.error(f"PDF extraction error: {str(e)}")
        # Chain the original exception so the traceback is preserved.
        raise RuntimeError("Failed to extract text from PDF") from e


def chunk_text(text: str, chunk_size: int = 5) -> list:
    """Split text into chunks of up to chunk_size sentences each."""
    sentences = [s.strip() for s in text.split('.') if s.strip()]
    chunks = []
    for i in range(0, len(sentences), chunk_size):
        chunk = '. '.join(sentences[i:i + chunk_size]) + '.'
        chunks.append(chunk)
    return chunks
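
# Example (illustrative only): a 12-sentence text with the default
# chunk_size=5 yields chunks of 5, 5, and 2 sentences. Splitting on '.'
# is crude (it breaks on abbreviations such as "e.g."), so a real sentence
# tokenizer like nltk's sent_tokenize would be more robust:
#
#     chunks = chunk_text("A. B. C. D. E. F. G.", chunk_size=3)
#     # -> ["A. B. C.", "D. E. F.", "G."]
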
def analyze_ai_content(text: str) -> Dict[str, float]:
    """Analyze text for AI-generated content."""
    try:
        inputs = ai_tokenizer(
            text,
            truncation=True,
            padding=True,
            return_tensors="pt",
            max_length=MAX_TEXT_LENGTH
        )

        # Move inputs to whichever device the model was loaded on.
        device = next(ai_model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}

        with torch.no_grad():
            outputs = ai_model(**inputs)
            probs = torch.softmax(outputs.logits, dim=1).squeeze()

        # Assumes label index 0 = human-written, 1 = AI-generated.
        return {
            "human_written": round(probs[0].item() * 100, 2),
            "ai_generated": round(probs[1].item() * 100, 2)
        }
    except Exception as e:
        logger.error(f"AI analysis failed: {str(e)}")
        raise RuntimeError("Failed to analyze text content") from e
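
# The 0 = human / 1 = AI ordering above is an assumption about this model's
# classification head; it can be checked at runtime via the config, e.g.:
#
#     print(ai_model.config.id2label)
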
def calculate_plagiarism_score(chunks: list) -> float:
    """Calculate plagiarism percentage using pairwise similarity analysis."""
    if len(chunks) < 2:
        return 0.0

    embeddings = embedder.encode(chunks)
    similarity_matrix = cosine_similarity(embeddings)

    # The matrix is symmetric, so keep only the strict upper triangle to
    # count each pair once; counting both halves would let the score
    # exceed 100%.
    upper = np.triu(similarity_matrix, k=1)
    similar_pairs = np.sum(upper > PLAGIARISM_THRESHOLD)
    total_possible = len(chunks) * (len(chunks) - 1) // 2

    return round((similar_pairs / total_possible) * 100, 2) if total_possible else 0.0
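
# Worked example (illustrative): with 3 chunks there are 3 * 2 // 2 = 3
# distinct pairs; if exactly one pair exceeds PLAGIARISM_THRESHOLD, the
# score is 1/3 * 100 = 33.33. Note this measures repetition within the
# uploaded document itself; it does not compare against an external corpus.
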
@app.post("/analyze")
async def analyze_document(file: UploadFile = File(...)) -> Dict[str, Any]:
    """Main analysis endpoint."""
    if not model_status["model_loaded"]:
        raise HTTPException(
            status_code=503,
            detail="Service unavailable - models not loaded"
        )

    if not file.filename.lower().endswith(".pdf"):
        raise HTTPException(400, "Only PDF files are supported")

    try:
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Persist the upload to a temp file so PyPDF2 can read it.
            file_path = os.path.join(tmp_dir, f"{uuid.uuid4()}.pdf")
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            text = extract_text_from_pdf(file_path)
            if not text.strip():
                raise HTTPException(400, "No text found in document")

            ai_result = analyze_ai_content(text)
            chunks = chunk_text(text)
            plagiarism_score = calculate_plagiarism_score(chunks)

            return {
                "analysis": {
                    "ai_detection": ai_result,
                    "plagiarism_score": plagiarism_score
                },
                "status": "success"
            }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Analysis pipeline failed: {str(e)}")
        raise HTTPException(500, f"Analysis failed: {str(e)}")


@app.post("/reload-models")
async def reload_models(background_tasks: BackgroundTasks):
    """Model reload endpoint."""
    background_tasks.add_task(initialize_models)
    return {"status": "reload-initiated", "message": "Model reload in progress"}


@app.get("/health")
async def health_check() -> Dict[str, Any]:
    """System health endpoint."""
    return {
        "status": "operational" if model_status["model_loaded"] else "degraded",
        "model_loaded": model_status["model_loaded"],
        "last_error": model_status["last_error"],
        "retry_count": model_status["retry_count"]
    }


@app.get("/")
async def root():
    """Root endpoint."""
    return {
        "service": "Essay Analysis API",
        "version": "1.0.0",
        "endpoints": ["/analyze", "/health", "/reload-models"]
    }
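
# Local run sketch (assumes uvicorn is installed; "main" below is an assumed
# module name for this file):
#
#     uvicorn main:app --host 0.0.0.0 --port 8000
#
# Example request:
#
#     curl -F "file=@essay.pdf" http://localhost:8000/analyze

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)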