# Spaces: Running
from fastapi import FastAPI, File, UploadFile, Form, HTTPException | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import JSONResponse, FileResponse | |
import uvicorn | |
import os | |
from typing import List, Optional | |
import shutil | |
from pathlib import Path | |
import uuid | |
# Import AI functionality modules | |
from modules.document_processor import process_document | |
from modules.image_processor import process_image | |
from modules.question_answering import process_question | |
from modules.data_visualization import generate_visualization | |
from modules.translator import translate_document | |
# Create FastAPI app
app = FastAPI(title="AI Document Analysis API")

# Create upload directory if it doesn't exist.
# The path is relative, so it resolves against the process working directory.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# Mount static files: everything under ./frontend is served at /static
app.mount("/static", StaticFiles(directory="frontend"), name="static")
# Serve index.html at root
# NOTE(review): no route decorator is visible on this function in the reviewed
# source; confirm it is registered on `app` for the root path.
async def read_root():
    """Return the frontend landing page (`frontend/index.html`) as a file response."""
    return FileResponse("frontend/index.html")
# NOTE(review): no route decorator is visible on this function in the reviewed
# source; confirm it is registered on `app`.
async def analyze_document(
    file: UploadFile = File(...),
    analysis_type: str = Form(...)
):
    """Save an uploaded document and run the requested analysis on it.

    Args:
        file: Uploaded document; its extension must be one of
            .pdf, .docx, .pptx, .xlsx, .xls (case-insensitive).
        analysis_type: Kind of analysis to run. Only "summarize" is accepted.

    Returns:
        A dict with the original ``filename``, the ``analysis_type`` and the
        ``result`` returned by ``process_document``.

    Raises:
        HTTPException: 400 for an unsupported extension or analysis type;
            500 if saving or processing the file fails.

    The saved copy is removed only when an error occurs; on success it is
    left in ``UPLOAD_DIR``.
    """
    # Validate file type
    allowed_extensions = [".pdf", ".docx", ".pptx", ".xlsx", ".xls"]
    # The filename may be missing on an upload; treat that as "no extension"
    # so the client gets a 400 instead of a TypeError from splitext.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"File type not supported. Allowed types: {', '.join(allowed_extensions)}",
        )

    # Validate the analysis type up front: raising this inside the try block
    # below would be caught by the generic handler and reported as a 500.
    if analysis_type != "summarize":
        raise HTTPException(status_code=400, detail="Invalid analysis type")

    # Create unique filename so concurrent uploads never collide
    unique_filename = f"{uuid.uuid4()}{file_ext}"
    file_path = UPLOAD_DIR / unique_filename

    try:
        # Save uploaded file (inside the try so a partial write is cleaned up)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = process_document(str(file_path), "summarize")
    except Exception as e:
        # Clean up file on error
        file_path.unlink(missing_ok=True)
        # Deliberate HTTP errors keep their own status code.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Return results
    return {
        "filename": file.filename,
        "analysis_type": analysis_type,
        "result": result
    }
# NOTE(review): no route decorator is visible on this function in the reviewed
# source; confirm it is registered on `app`.
async def analyze_image(
    file: UploadFile = File(...),
    analysis_type: str = Form(...)
):
    """Save an uploaded image and run the requested analysis on it.

    Args:
        file: Uploaded image; its extension must be one of
            .jpg, .jpeg, .png (case-insensitive).
        analysis_type: Kind of analysis to run. Only "caption" is accepted.

    Returns:
        A dict with the original ``filename``, the ``analysis_type`` and the
        ``result`` returned by ``process_image``.

    Raises:
        HTTPException: 400 for an unsupported extension or analysis type;
            500 if saving or processing the file fails.

    The saved copy is removed only when an error occurs; on success it is
    left in ``UPLOAD_DIR``.
    """
    # Validate file type
    allowed_extensions = [".jpg", ".jpeg", ".png"]
    # The filename may be missing on an upload; treat that as "no extension"
    # so the client gets a 400 instead of a TypeError from splitext.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"File type not supported. Allowed types: {', '.join(allowed_extensions)}",
        )

    # Validate the analysis type up front: raising this inside the try block
    # below would be caught by the generic handler and reported as a 500.
    if analysis_type != "caption":
        raise HTTPException(status_code=400, detail="Invalid analysis type")

    # Create unique filename so concurrent uploads never collide
    unique_filename = f"{uuid.uuid4()}{file_ext}"
    file_path = UPLOAD_DIR / unique_filename

    try:
        # Save uploaded file (inside the try so a partial write is cleaned up)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        result = process_image(str(file_path), "caption")
    except Exception as e:
        # Clean up file on error
        file_path.unlink(missing_ok=True)
        # Deliberate HTTP errors keep their own status code.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Return results
    return {
        "filename": file.filename,
        "analysis_type": analysis_type,
        "result": result
    }
# NOTE(review): no route decorator is visible on this function in the reviewed
# source; confirm it is registered on `app`.
async def question_answering(
    file: UploadFile = File(...),
    question: str = Form(...)
):
    """Save an uploaded document or image and answer a question about it.

    Args:
        file: Uploaded file; its extension must be one of .pdf, .docx, .pptx,
            .xlsx, .xls, .jpg, .jpeg, .png (case-insensitive).
        question: The question text, passed through to ``process_question``.

    Returns:
        A dict with the original ``filename``, the ``question`` and the
        ``answer`` returned by ``process_question``.

    Raises:
        HTTPException: 400 for an unsupported extension; 500 if saving or
            processing the file fails.

    The saved copy is removed only when an error occurs; on success it is
    left in ``UPLOAD_DIR``.
    """
    # Validate file type
    allowed_extensions = [".pdf", ".docx", ".pptx", ".xlsx", ".xls", ".jpg", ".jpeg", ".png"]
    # The filename may be missing on an upload; treat that as "no extension"
    # so the client gets a 400 instead of a TypeError from splitext.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"File type not supported. Allowed types: {', '.join(allowed_extensions)}",
        )

    # Create unique filename so concurrent uploads never collide
    unique_filename = f"{uuid.uuid4()}{file_ext}"
    file_path = UPLOAD_DIR / unique_filename

    try:
        # Save uploaded file (inside the try so a partial write is cleaned up)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Process question
        result = process_question(str(file_path), question)
    except Exception as e:
        # Clean up file on error
        file_path.unlink(missing_ok=True)
        # Deliberate HTTP errors keep their own status code.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Return results
    return {
        "filename": file.filename,
        "question": question,
        "answer": result
    }
# NOTE(review): no route decorator is visible on this function in the reviewed
# source; confirm it is registered on `app`.
async def visualize_data(
    file: UploadFile = File(...),
    request: str = Form(...)
):
    """Save an uploaded spreadsheet and generate a visualization from it.

    Args:
        file: Uploaded spreadsheet; its extension must be .xlsx or .xls
            (case-insensitive).
        request: Form text passed through to ``generate_visualization``.

    Returns:
        A dict with the original ``filename``, the ``request`` and the
        ``visualization`` returned by ``generate_visualization``.

    Raises:
        HTTPException: 400 for an unsupported extension; 500 if saving or
            processing the file fails.

    The saved copy is removed only when an error occurs; on success it is
    left in ``UPLOAD_DIR``.
    """
    # Validate file type
    allowed_extensions = [".xlsx", ".xls"]
    # The filename may be missing on an upload; treat that as "no extension"
    # so the client gets a 400 instead of a TypeError from splitext.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"File type not supported. Allowed types: {', '.join(allowed_extensions)}",
        )

    # Create unique filename so concurrent uploads never collide
    unique_filename = f"{uuid.uuid4()}{file_ext}"
    file_path = UPLOAD_DIR / unique_filename

    try:
        # Save uploaded file (inside the try so a partial write is cleaned up)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Generate visualization
        result = generate_visualization(str(file_path), request)
    except Exception as e:
        # Clean up file on error
        file_path.unlink(missing_ok=True)
        # Deliberate HTTP errors keep their own status code.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Return results
    return {
        "filename": file.filename,
        "request": request,
        "visualization": result
    }
# NOTE(review): no route decorator is visible on this function in the reviewed
# source; confirm it is registered on `app`.
async def translate_document_endpoint(
    file: UploadFile = File(...),
    target_language: str = Form(...)
):
    """Save an uploaded document and translate it.

    Args:
        file: Uploaded document; its extension must be one of
            .pdf, .docx, .pptx, .xlsx, .xls (case-insensitive).
        target_language: Form value passed through to ``translate_document``.

    Returns:
        A dict with the original ``filename``, the ``target_language`` and the
        ``translation`` returned by ``translate_document``.

    Raises:
        HTTPException: 400 for an unsupported extension; 500 if saving or
            processing the file fails.

    The saved copy is removed only when an error occurs; on success it is
    left in ``UPLOAD_DIR``.
    """
    # Validate file type
    allowed_extensions = [".pdf", ".docx", ".pptx", ".xlsx", ".xls"]
    # The filename may be missing on an upload; treat that as "no extension"
    # so the client gets a 400 instead of a TypeError from splitext.
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    if file_ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"File type not supported. Allowed types: {', '.join(allowed_extensions)}",
        )

    # Create unique filename so concurrent uploads never collide
    unique_filename = f"{uuid.uuid4()}{file_ext}"
    file_path = UPLOAD_DIR / unique_filename

    try:
        # Save uploaded file (inside the try so a partial write is cleaned up)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Translate document
        result = translate_document(str(file_path), target_language)
    except Exception as e:
        # Clean up file on error
        file_path.unlink(missing_ok=True)
        # Deliberate HTTP errors keep their own status code.
        if isinstance(e, HTTPException):
            raise
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Return results
    return {
        "filename": file.filename,
        "target_language": target_language,
        "translation": result
    }
# Add health check endpoint
# NOTE(review): no route decorator is visible on this function in the reviewed
# source; confirm it is registered on `app`.
def health_check():
    """Return a constant payload indicating the service is up."""
    return {"status": "healthy"}
if __name__ == "__main__":
    # Run the app directly: listen on all interfaces, port 7860, with
    # auto-reload enabled. "main:app" is an import string, so this module
    # must be importable as `main`.
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)