smartdocai / app.py
FatimaGr's picture
add
ec80e4b verified
raw
history blame
15.9 kB
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.staticfiles import StaticFiles
from fastapi.responses import RedirectResponse
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from transformers import pipeline, M2M100ForConditionalGeneration, M2M100Tokenizer, MarianMTModel, MarianTokenizer
import shutil
#
import os
import logging
from PyPDF2 import PdfReader
import docx
from PIL import Image
import openpyxl # 📌 Pour lire les fichiers Excel (.xlsx)
from pptx import Presentation
import fitz # PyMuPDF
import io
from docx import Document
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import re
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM
from fastapi.responses import FileResponse
import os
from fastapi.middleware.cors import CORSMiddleware
import matplotlib
matplotlib.use('Agg')
import re
import torch
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoTokenizer, AutoModelForCausalLM
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import FileResponse
import os
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from transformers import pipeline, M2M100ForConditionalGeneration, M2M100Tokenizer
import shutil
import os
import logging
from fastapi.middleware.cors import CORSMiddleware
from PyPDF2 import PdfReader
import docx
from PIL import Image # Pour ouvrir les images avant analyse
from transformers import MarianMTModel, MarianTokenizer
import os
import fitz
from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer
import logging
import openpyxl
from fastapi.responses import FileResponse, RedirectResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from io import BytesIO
from pdfminer.high_level import extract_text
from docx import Document
import pandas as pd
from pptx import Presentation
import logging
from transformers import pipeline
from PIL import Image
import io
import docx2txt
from fastapi.responses import StreamingResponse
from io import BytesIO
import base64
# Configuration du logging
logging.basicConfig(level=logging.INFO)
app = FastAPI()
# Configuration CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
model_name = "facebook/m2m100_418M"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
# Fonction pour extraire le texte
def extract_text_from_pdf(file):
doc = fitz.open(stream=file.file.read(), filetype="pdf")
return "\n".join([page.get_text() for page in doc]).strip()
def extract_text_from_docx(file):
doc = Document(io.BytesIO(file.file.read()))
return "\n".join([para.text for para in doc.paragraphs]).strip()
def extract_text_from_pptx(file):
prs = Presentation(io.BytesIO(file.file.read()))
return "\n".join([shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text")]).strip()
def extract_text_from_excel(file):
wb = openpyxl.load_workbook(io.BytesIO(file.file.read()), data_only=True)
text = [str(cell) for sheet in wb.worksheets for row in sheet.iter_rows(values_only=True) for cell in row if cell]
return "\n".join(text).strip()
@app.post("/translate/")
async def translate_document(file: UploadFile = File(...), target_lang: str = Form(...)):
"""API pour traduire un document."""
try:
logging.info(f"📥 Fichier reçu : {file.filename}")
logging.info(f"🌍 Langue cible reçue : {target_lang}")
if model is None or tokenizer is None:
return JSONResponse(status_code=500, content={"error": "Modèle de traduction non chargé"})
# Extraction du texte
if file.filename.endswith(".pdf"):
text = extract_text_from_pdf(file)
elif file.filename.endswith(".docx"):
text = extract_text_from_docx(file)
elif file.filename.endswith(".pptx"):
text = extract_text_from_pptx(file)
elif file.filename.endswith(".xlsx"):
text = extract_text_from_excel(file)
else:
return JSONResponse(status_code=400, content={"error": "Format non supporté"})
logging.info(f"📜 Texte extrait : {text[:50]}...")
if not text:
return JSONResponse(status_code=400, content={"error": "Aucun texte trouvé dans le document"})
# Vérifier si la langue cible est supportée
target_lang_id = tokenizer.get_lang_id(target_lang)
if target_lang_id is None:
return JSONResponse(
status_code=400,
content={"error": f"Langue cible '{target_lang}' non supportée. Langues disponibles : {list(tokenizer.lang_code_to_id.keys())}"}
)
# Traduction
tokenizer.src_lang = "fr"
encoded_text = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
logging.info(f"🔍 ID de la langue cible : {target_lang_id}")
generated_tokens = model.generate(**encoded_text, forced_bos_token_id=target_lang_id)
translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
logging.info(f"✅ Traduction réussie : {translated_text[:50]}...")
return {"translated_text": translated_text}
except Exception as e:
logging.error(f"❌ Erreur lors de la traduction : {e}")
return JSONResponse(status_code=500, content={"error": "Échec de la traduction"})
# Charger le modèle pour la génération de code
codegen_model_name = "Salesforce/codegen-350M-mono"
device = "cuda" if torch.cuda.is_available() else "cpu"
codegen_tokenizer = AutoTokenizer.from_pretrained(codegen_model_name)
codegen_model = AutoModelForCausalLM.from_pretrained(codegen_model_name).to(device)
VALID_PLOTS = {"histplot", "scatterplot", "barplot", "lineplot", "boxplot"}
print("hello")
@app.post("/generate_viz/")
async def generate_viz(file: UploadFile = File(...), query: str = Form(...)):
print("🔵 Début /generate_viz")
try:
if query not in VALID_PLOTS:
return JSONResponse(content={"error": f"Type de graphique invalide. Choisissez parmi : {', '.join(VALID_PLOTS)}"}, status_code=400)
file_content = await file.read()
df = pd.read_excel(BytesIO(file_content))
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) < 1:
return JSONResponse(content={"error": "Le fichier doit contenir au moins une colonne numérique."}, status_code=400)
x_col = numeric_cols[0]
y_col = numeric_cols[1] if query != "histplot" and len(numeric_cols) > 1 else None
# ➔ CONSTRUCTION du prompt avec encadrement ```python
prompt = f"""
### Génère uniquement du code Python fonctionnel pour tracer un {query} avec Matplotlib et Seaborn
```python
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(8,6))
sns.{query}(data=df, x="{x_col}"{f', y="{y_col}"' if y_col else ''})
plt.savefig("plot.png")
plt.close()
"""
print("🟣 Prompt envoyé au modèle :")
print(prompt)
inputs = codegen_tokenizer(prompt, return_tensors="pt").to(device)
outputs = codegen_model.generate(
**inputs,
max_new_tokens=150,
pad_token_id=codegen_tokenizer.eos_token_id
)
generated_code = codegen_tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
# Nettoyage : retirer tout ce qui n'est pas du vrai code
generated_code = re.sub(r"^.*?```python", "", generated_code, flags=re.DOTALL)
generated_code = re.sub(r"```.*?$", "", generated_code, flags=re.DOTALL).strip()
print("🔵 Code généré propre :")
print(generated_code)
if not generated_code.strip():
return JSONResponse(content={"error": "Erreur : Code généré vide."}, status_code=500)
try:
compile(generated_code, "<string>", "exec")
except SyntaxError as e:
return JSONResponse(content={"error": f"Erreur de syntaxe détectée : {e}\nCode généré :\n{generated_code}"}, status_code=422)
exec_env = {"df": df, "plt": plt, "sns": sns, "pd": pd}
print("🔹🔹🔹 Code réellement exécuté :")
exec(generated_code, exec_env)
img_path = "plot.png"
if not os.path.exists(img_path):
return JSONResponse(content={"error": "Le fichier plot.png n'a pas été généré."}, status_code=500)
if os.path.getsize(img_path) == 0:
return JSONResponse(content={"error": "Le fichier plot.png est vide."}, status_code=500)
with open(img_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
print("🟢 Génération réussie ✅")
return JSONResponse(content={"image_base64": encoded_string})
except Exception as e:
print(f"🔴 Erreur serveur : {e}")
return JSONResponse(content={"error": f"Erreur lors de la génération du graphique : {str(e)}"}, status_code=500)
# Charger le modèle de résumé
summarizer = None
try:
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
logging.info("✅ Modèle de résumé chargé avec succès !")
except Exception as e:
logging.error(f"❌ Erreur chargement modèle résumé : {e}")
try:
image_captioning = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning")
logging.info("✅ Modèle d'image chargé avec succès !")
except Exception as e:
image_captioning = None
logging.error(f"❌ Erreur chargement modèle image : {e}")
# Fonction pour extraire le texte d'un fichier Word
def extract_text_from_docx(docx_file):
doc = Document(BytesIO(docx_file))
text = "\n".join([para.text for para in doc.paragraphs])
return text
# Fonction pour extraire le texte d'un fichier Excel
def extract_text_from_excel(xlsx_file):
# Utiliser pandas pour lire le fichier Excel
df = pd.read_excel(BytesIO(xlsx_file))
text = df.to_string(index=False)
return text
# Fonction pour extraire le texte d'un fichier PowerPoint
def extract_text_from_pptx(pptx_file):
presentation = Presentation(BytesIO(pptx_file))
text = ""
for slide in presentation.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
text += shape.text + "\n"
return text
# Endpoint pour la fonctionnalité de résumé
@app.post("/summarize/")
async def summarize(file: UploadFile = File(...)):
# Si le modèle n'est pas encore chargé, retourner un message indiquant que le modèle est en train de se charger
if summarizer is None:
return {"message": "Le modèle est en cours de chargement, veuillez patienter..."}
# Extraire le contenu du fichier téléchargé
contents = await file.read()
# Identifier le type de fichier et extraire le texte
if file.filename.endswith(".pdf"):
text = extract_text(BytesIO(contents))
elif file.filename.endswith(".docx"):
text = extract_text_from_docx(contents)
elif file.filename.endswith(".xls") or file.filename.endswith(".xlsx"):
text = extract_text_from_excel(contents)
elif file.filename.endswith(".pptx") or file.filename.endswith(".ppt"):
text = extract_text_from_pptx(contents)
else:
return {"summary": "Résumé non disponible pour ce format de fichier."}
# Si un modèle de résumé est chargé, effectuer le résumé
try:
if summarizer:
summary = summarizer(text[:1024]) # Limiter la taille d'entrée pour le modèle
summary_text = summary[0]['summary_text']
else:
summary_text = "❌ Modèle de résumé non disponible."
except Exception as e:
summary_text = f"❌ Erreur lors de la génération du résumé : {e}"
# Retourner le résumé généré
return {"summary": summary_text}
@app.post("/image-caption/")
async def caption_image(file: UploadFile = File(...)):
if image_captioning is None:
return JSONResponse(content={"error": "Le modèle de captioning n'est pas disponible."}, status_code=500)
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert("RGB")
result = image_captioning(image)
caption = result[0]['generated_text']
return {"caption": caption}
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
try:
qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2")
logging.info("✅ Modèle QA Texte chargé avec succès !")
except Exception as e:
qa_pipeline = None
logging.error(f"❌ Erreur chargement modèle QA Texte : {e}")
try:
image_qa_pipeline = pipeline("visual-question-answering", model="Salesforce/blip-vqa-base")
logging.info("✅ Modèle QA Image chargé avec succès !")
except Exception as e:
image_qa_pipeline = None
logging.error(f"❌ Erreur chargement modèle QA Image : {e}")
@app.post("/doc-qa/")
async def doc_question_answer(file: UploadFile = File(...), question: str = Form(...)):
if qa_pipeline is None:
return JSONResponse(content={"error": "Modèle indisponible."}, status_code=500)
try:
contents = await file.read()
filename = file.filename.lower()
if filename.endswith(".docx"):
with open("temp.docx", "wb") as f:
f.write(contents)
context = docx2txt.process("temp.docx")
elif filename.endswith((".xlsx", ".xls")):
df = pd.read_excel(BytesIO(contents))
context = df.to_string(index=False)
elif filename.endswith(".pptx"):
presentation = Presentation(BytesIO(contents))
context = ""
for slide in presentation.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
context += shape.text + "\n"
elif filename.endswith(".pdf"):
context = extract_text(BytesIO(contents))
else:
return JSONResponse(content={"error": "Format non supporté."}, status_code=400)
result = qa_pipeline(question=question, context=context)
return {"answer": result["answer"]}
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
@app.post("/image-qa/")
async def image_qa(file: UploadFile = File(...), question: str = Form(...)):
if image_qa_pipeline is None:
return JSONResponse(content={"error": "Le modèle n'est pas disponible."}, status_code=500)
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert("RGB")
result = image_qa_pipeline(image=image, question=question)
answer = result[0]['answer']
return {"answer": answer}
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
# Servir les fichiers statiques (HTML, CSS, JS)
app.mount("/static", StaticFiles(directory="static", html=True), name="static")
@app.get("/")
async def root():
return RedirectResponse(url="/static/principal.html")