import base64
import io
import logging
import os
import re
from io import BytesIO

import docx2txt
import fitz  # PyMuPDF, used for PDF text extraction
import openpyxl
import pandas as pd
import torch
from docx import Document
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pdfminer.high_level import extract_text
from PIL import Image  # Used to open images before analysis
from pptx import Presentation
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    pipeline,
)

import matplotlib
matplotlib.use('Agg')  # Headless backend: render figures without a display server
import matplotlib.pyplot as plt
import seaborn as sns
# Logging configuration
logging.basicConfig(level=logging.INFO)
app = FastAPI()
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
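# Note: allow_origins=["*"] accepts requests from any origin. That is convenient
# for a demo, but a production deployment would normally restrict it to the
# known frontend origins.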
# Upload directory, created at startup (currently not referenced by any endpoint)
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Translation model: M2M100, a many-to-many multilingual translation model
model_name = "facebook/m2m100_418M"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
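# For this checkpoint, AutoTokenizer resolves to M2M100Tokenizer, which exposes
# get_lang_id() and lang_code_to_id (short ISO codes such as "en", "fr", "ar");
# the /translate/ endpoint below relies on both.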
# Text-extraction helpers (each takes an UploadFile and returns plain text)
def extract_text_from_pdf(file):
doc = fitz.open(stream=file.file.read(), filetype="pdf")
return "\n".join([page.get_text() for page in doc]).strip()
def extract_text_from_docx(file):
doc = Document(io.BytesIO(file.file.read()))
return "\n".join([para.text for para in doc.paragraphs]).strip()
def extract_text_from_pptx(file):
prs = Presentation(io.BytesIO(file.file.read()))
return "\n".join([shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text")]).strip()
def extract_text_from_excel(file):
wb = openpyxl.load_workbook(io.BytesIO(file.file.read()), data_only=True)
text = [str(cell) for sheet in wb.worksheets for row in sheet.iter_rows(values_only=True) for cell in row if cell]
return "\n".join(text).strip()
@app.post("/translate/")
async def translate_document(file: UploadFile = File(...), target_lang: str = Form(...)):
"""API pour traduire un document."""
try:
logging.info(f"📥 Fichier reçu : {file.filename}")
logging.info(f"🌍 Langue cible reçue : {target_lang}")
if model is None or tokenizer is None:
return JSONResponse(status_code=500, content={"error": "Modèle de traduction non chargé"})
# Extraction du texte
if file.filename.endswith(".pdf"):
text = extract_text_from_pdf(file)
elif file.filename.endswith(".docx"):
text = extract_text_from_docx(file)
elif file.filename.endswith(".pptx"):
text = extract_text_from_pptx(file)
elif file.filename.endswith(".xlsx"):
text = extract_text_from_excel(file)
else:
return JSONResponse(status_code=400, content={"error": "Format non supporté"})
logging.info(f"📜 Texte extrait : {text[:50]}...")
if not text:
return JSONResponse(status_code=400, content={"error": "Aucun texte trouvé dans le document"})
target_lang_id = tokenizer.get_lang_id(target_lang)
if target_lang_id is None:
return JSONResponse(
status_code=400,
content={"error": f"Langue cible '{target_lang}' non supportée. Langues disponibles : {list(tokenizer.lang_code_to_id.keys())}"}
)
tokenizer.src_lang = "fr"
encoded_text = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
logging.info(f"🔍 ID de la langue cible : {target_lang_id}")
generated_tokens = model.generate(**encoded_text, forced_bos_token_id=target_lang_id)
translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]
logging.info(f"✅ Traduction réussie : {translated_text[:50]}...")
return {"translated_text": translated_text}
except Exception as e:
logging.error(f"❌ Erreur lors de la traduction : {e}")
return JSONResponse(status_code=500, content={"error": "Échec de la traduction"})
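# Example call, as a sketch (hypothetical filename; assumes the server is
# reachable on localhost:8000):
#   curl -X POST http://localhost:8000/translate/ \
#        -F "file=@report.pdf" -F "target_lang=en"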
# Code-generation model used by /generate_viz/ to produce plotting code
codegen_model_name = "Salesforce/codegen-350M-mono"
device = "cuda" if torch.cuda.is_available() else "cpu"
codegen_tokenizer = AutoTokenizer.from_pretrained(codegen_model_name)
codegen_model = AutoModelForCausalLM.from_pretrained(codegen_model_name).to(device)
VALID_PLOTS = {"histplot", "scatterplot", "barplot", "lineplot", "boxplot"}
@app.post("/generate_viz/")
async def generate_viz(file: UploadFile = File(...), query: str = Form(...)):
print("🔵 Début /generate_viz")
try:
if query not in VALID_PLOTS:
return JSONResponse(content={"error": f"Type de graphique invalide. Choisissez parmi : {', '.join(VALID_PLOTS)}"}, status_code=400)
file_content = await file.read()
df = pd.read_excel(BytesIO(file_content))
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) < 1:
return JSONResponse(content={"error": "Le fichier doit contenir au moins une colonne numérique."}, status_code=400)
x_col = numeric_cols[0]
y_col = numeric_cols[1] if query != "histplot" and len(numeric_cols) > 1 else None
prompt = f"""
### Génère uniquement du code Python fonctionnel pour tracer un {query} avec Matplotlib et Seaborn
```python
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(8,6))
sns.{query}(data=df, x="{x_col}"{f', y="{y_col}"' if y_col else ''})
plt.savefig("plot.png")
plt.close()
"""
print("🟣 Prompt envoyé au modèle :")
print(prompt)
inputs = codegen_tokenizer(prompt, return_tensors="pt").to(device)
outputs = codegen_model.generate(
**inputs,
max_new_tokens=150,
pad_token_id=codegen_tokenizer.eos_token_id
)
generated_code = codegen_tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
# Nettoyage : retirer tout ce qui n'est pas du vrai code
generated_code = re.sub(r"^.*?```python", "", generated_code, flags=re.DOTALL)
generated_code = re.sub(r"```.*?$", "", generated_code, flags=re.DOTALL).strip()
print("🔵 Code généré propre :")
print(generated_code)
if not generated_code.strip():
return JSONResponse(content={"error": "Erreur : Code généré vide."}, status_code=500)
try:
compile(generated_code, "<string>", "exec")
except SyntaxError as e:
return JSONResponse(content={"error": f"Erreur de syntaxe détectée : {e}\nCode généré :\n{generated_code}"}, status_code=422)
exec_env = {"df": df, "plt": plt, "sns": sns, "pd": pd}
print("🔹🔹🔹 Code réellement exécuté :")
exec(generated_code, exec_env)
img_path = "plot.png"
if not os.path.exists(img_path):
return JSONResponse(content={"error": "Le fichier plot.png n'a pas été généré."}, status_code=500)
if os.path.getsize(img_path) == 0:
return JSONResponse(content={"error": "Le fichier plot.png est vide."}, status_code=500)
with open(img_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
print("🟢 Génération réussie ✅")
return JSONResponse(content={"image_base64": encoded_string})
except Exception as e:
print(f"🔴 Erreur serveur : {e}")
return JSONResponse(content={"error": f"Erreur lors de la génération du graphique : {str(e)}"}, status_code=500)
summarizer = None
try:
    summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
    logging.info("✅ Summarization model loaded successfully!")
except Exception as e:
    logging.error(f"❌ Error loading summarization model: {e}")
try:
    image_captioning = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning")
    logging.info("✅ Image-captioning model loaded successfully!")
except Exception as e:
    image_captioning = None
    logging.error(f"❌ Error loading image-captioning model: {e}")
# Bytes-based extraction helpers. These take raw file bytes, unlike the
# UploadFile-based helpers above, and are suffixed with _bytes so they do
# not shadow those helpers.
def extract_text_from_docx_bytes(docx_bytes):
    doc = Document(BytesIO(docx_bytes))
    return "\n".join([para.text for para in doc.paragraphs])

def extract_text_from_excel_bytes(xlsx_bytes):
    df = pd.read_excel(BytesIO(xlsx_bytes))
    return df.to_string(index=False)

def extract_text_from_pptx_bytes(pptx_bytes):
    presentation = Presentation(BytesIO(pptx_bytes))
    text = ""
    for slide in presentation.slides:
        for shape in slide.shapes:
            if hasattr(shape, "text"):
                text += shape.text + "\n"
    return text
@app.post("/summarize/")
async def summarize(file: UploadFile = File(...)):
if summarizer is None:
return {"message": "Le modèle est en cours de chargement, veuillez patienter..."}
contents = await file.read()
if file.filename.endswith(".pdf"):
text = extract_text(BytesIO(contents))
elif file.filename.endswith(".docx"):
text = extract_text_from_docx(contents)
elif file.filename.endswith(".xls") or file.filename.endswith(".xlsx"):
text = extract_text_from_excel(contents)
elif file.filename.endswith(".pptx") or file.filename.endswith(".ppt"):
text = extract_text_from_pptx(contents)
else:
return {"summary": "Résumé non disponible pour ce format de fichier."}
try:
if summarizer:
summary = summarizer(text[:1024])
summary_text = summary[0]['summary_text']
else:
summary_text = "❌ Modèle de résumé non disponible."
except Exception as e:
summary_text = f"❌ Erreur lors de la génération du résumé : {e}"
return {"summary": summary_text}
@app.post("/image-caption/")
async def caption_image(file: UploadFile = File(...)):
if image_captioning is None:
return JSONResponse(content={"error": "Le modèle de captioning n'est pas disponible."}, status_code=500)
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert("RGB")
result = image_captioning(image)
caption = result[0]['generated_text']
return {"caption": caption}
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
try:
    qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2")
    logging.info("✅ Text QA model loaded successfully!")
except Exception as e:
    qa_pipeline = None
    logging.error(f"❌ Error loading text QA model: {e}")
try:
    image_qa_pipeline = pipeline("visual-question-answering", model="Salesforce/blip-vqa-base")
    logging.info("✅ Image QA model loaded successfully!")
except Exception as e:
    image_qa_pipeline = None
    logging.error(f"❌ Error loading image QA model: {e}")
@app.post("/doc-qa/")
async def doc_question_answer(file: UploadFile = File(...), question: str = Form(...)):
if qa_pipeline is None:
return JSONResponse(content={"error": "Modèle indisponible."}, status_code=500)
try:
contents = await file.read()
filename = file.filename.lower()
if filename.endswith(".docx"):
with open("temp.docx", "wb") as f:
f.write(contents)
context = docx2txt.process("temp.docx")
elif filename.endswith((".xlsx", ".xls")):
df = pd.read_excel(BytesIO(contents))
context = df.to_string(index=False)
elif filename.endswith(".pptx"):
presentation = Presentation(BytesIO(contents))
context = ""
for slide in presentation.slides:
for shape in slide.shapes:
if hasattr(shape, "text"):
context += shape.text + "\n"
elif filename.endswith(".pdf"):
context = extract_text(BytesIO(contents))
else:
return JSONResponse(content={"error": "Format non supporté."}, status_code=400)
result = qa_pipeline(question=question, context=context)
return {"answer": result["answer"]}
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
@app.post("/image-qa/")
async def image_qa(file: UploadFile = File(...), question: str = Form(...)):
if image_qa_pipeline is None:
return JSONResponse(content={"error": "Le modèle n'est pas disponible."}, status_code=500)
try:
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert("RGB")
result = image_qa_pipeline(image=image, question=question)
answer = result[0]['answer']
return {"answer": answer}
except Exception as e:
return JSONResponse(content={"error": str(e)}, status_code=500)
app.mount("/static", StaticFiles(directory="static", html=True), name="static")
@app.get("/")
async def root():
return RedirectResponse(url="/static/principal.html")
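# Local entry point: a minimal sketch, assuming uvicorn is installed and port
# 8000 is free (a hosted deployment such as a Hugging Face Space typically
# starts the app with its own uvicorn command instead).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)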