Spaces:
Running
Running
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import RedirectResponse | |
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.responses import JSONResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.middleware.cors import CORSMiddleware | |
from transformers import pipeline, M2M100ForConditionalGeneration, M2M100Tokenizer, MarianMTModel, MarianTokenizer | |
import shutil | |
# | |
import os | |
import logging | |
from PyPDF2 import PdfReader | |
import docx | |
from PIL import Image | |
import openpyxl | |
from pptx import Presentation | |
import fitz | |
import io | |
from docx import Document | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import torch | |
import re | |
import pandas as pd | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
from fastapi.responses import FileResponse | |
import os | |
from fastapi.middleware.cors import CORSMiddleware | |
import matplotlib | |
matplotlib.use('Agg') | |
import re | |
import torch | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.responses import FileResponse | |
import os | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.responses import JSONResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
from transformers import pipeline, M2M100ForConditionalGeneration, M2M100Tokenizer | |
import shutil | |
import os | |
import logging | |
from fastapi.middleware.cors import CORSMiddleware | |
from PyPDF2 import PdfReader | |
import docx | |
from PIL import Image # Pour ouvrir les images avant analyse | |
from transformers import MarianMTModel, MarianTokenizer | |
import os | |
import fitz | |
from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer | |
import logging | |
import openpyxl | |
from fastapi.responses import FileResponse, RedirectResponse, JSONResponse | |
from fastapi.staticfiles import StaticFiles | |
from io import BytesIO | |
from pdfminer.high_level import extract_text | |
from docx import Document | |
import pandas as pd | |
from pptx import Presentation | |
import logging | |
from transformers import pipeline | |
from PIL import Image | |
import io | |
import docx2txt | |
from fastapi.responses import StreamingResponse | |
from io import BytesIO | |
import base64 | |
# Configuration du logging | |
logging.basicConfig(level=logging.INFO) | |
app = FastAPI() | |
# Configuration CORS | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
UPLOAD_DIR = "uploads" | |
os.makedirs(UPLOAD_DIR, exist_ok=True) | |
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer | |
model_name = "facebook/m2m100_418M" | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModelForSeq2SeqLM.from_pretrained(model_name) | |
# Fonction pour extraire le texte | |
def extract_text_from_pdf(file): | |
doc = fitz.open(stream=file.file.read(), filetype="pdf") | |
return "\n".join([page.get_text() for page in doc]).strip() | |
def extract_text_from_docx(file): | |
doc = Document(io.BytesIO(file.file.read())) | |
return "\n".join([para.text for para in doc.paragraphs]).strip() | |
def extract_text_from_pptx(file): | |
prs = Presentation(io.BytesIO(file.file.read())) | |
return "\n".join([shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text")]).strip() | |
def extract_text_from_excel(file): | |
wb = openpyxl.load_workbook(io.BytesIO(file.file.read()), data_only=True) | |
text = [str(cell) for sheet in wb.worksheets for row in sheet.iter_rows(values_only=True) for cell in row if cell] | |
return "\n".join(text).strip() | |
async def translate_document(file: UploadFile = File(...), target_lang: str = Form(...)): | |
"""API pour traduire un document.""" | |
try: | |
logging.info(f"📥 Fichier reçu : {file.filename}") | |
logging.info(f"🌍 Langue cible reçue : {target_lang}") | |
if model is None or tokenizer is None: | |
return JSONResponse(status_code=500, content={"error": "Modèle de traduction non chargé"}) | |
# Extraction du texte | |
if file.filename.endswith(".pdf"): | |
text = extract_text_from_pdf(file) | |
elif file.filename.endswith(".docx"): | |
text = extract_text_from_docx(file) | |
elif file.filename.endswith(".pptx"): | |
text = extract_text_from_pptx(file) | |
elif file.filename.endswith(".xlsx"): | |
text = extract_text_from_excel(file) | |
else: | |
return JSONResponse(status_code=400, content={"error": "Format non supporté"}) | |
logging.info(f"📜 Texte extrait : {text[:50]}...") | |
if not text: | |
return JSONResponse(status_code=400, content={"error": "Aucun texte trouvé dans le document"}) | |
target_lang_id = tokenizer.get_lang_id(target_lang) | |
if target_lang_id is None: | |
return JSONResponse( | |
status_code=400, | |
content={"error": f"Langue cible '{target_lang}' non supportée. Langues disponibles : {list(tokenizer.lang_code_to_id.keys())}"} | |
) | |
tokenizer.src_lang = "fr" | |
encoded_text = tokenizer(text, return_tensors="pt", padding=True, truncation=True) | |
logging.info(f"🔍 ID de la langue cible : {target_lang_id}") | |
generated_tokens = model.generate(**encoded_text, forced_bos_token_id=target_lang_id) | |
translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0] | |
logging.info(f"✅ Traduction réussie : {translated_text[:50]}...") | |
return {"translated_text": translated_text} | |
except Exception as e: | |
logging.error(f"❌ Erreur lors de la traduction : {e}") | |
return JSONResponse(status_code=500, content={"error": "Échec de la traduction"}) | |
codegen_model_name = "Salesforce/codegen-350M-mono" | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
codegen_tokenizer = AutoTokenizer.from_pretrained(codegen_model_name) | |
codegen_model = AutoModelForCausalLM.from_pretrained(codegen_model_name).to(device) | |
VALID_PLOTS = {"histplot", "scatterplot", "barplot", "lineplot", "boxplot"} | |
print("hello") | |
async def generate_viz(file: UploadFile = File(...), query: str = Form(...)): | |
print("🔵 Début /generate_viz") | |
try: | |
if query not in VALID_PLOTS: | |
return JSONResponse(content={"error": f"Type de graphique invalide. Choisissez parmi : {', '.join(VALID_PLOTS)}"}, status_code=400) | |
file_content = await file.read() | |
df = pd.read_excel(BytesIO(file_content)) | |
numeric_cols = df.select_dtypes(include=["number"]).columns | |
if len(numeric_cols) < 1: | |
return JSONResponse(content={"error": "Le fichier doit contenir au moins une colonne numérique."}, status_code=400) | |
x_col = numeric_cols[0] | |
y_col = numeric_cols[1] if query != "histplot" and len(numeric_cols) > 1 else None | |
prompt = f""" | |
### Génère uniquement du code Python fonctionnel pour tracer un {query} avec Matplotlib et Seaborn | |
```python | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
plt.figure(figsize=(8,6)) | |
sns.{query}(data=df, x="{x_col}"{f', y="{y_col}"' if y_col else ''}) | |
plt.savefig("plot.png") | |
plt.close() | |
""" | |
print("🟣 Prompt envoyé au modèle :") | |
print(prompt) | |
inputs = codegen_tokenizer(prompt, return_tensors="pt").to(device) | |
outputs = codegen_model.generate( | |
**inputs, | |
max_new_tokens=150, | |
pad_token_id=codegen_tokenizer.eos_token_id | |
) | |
generated_code = codegen_tokenizer.decode(outputs[0], skip_special_tokens=True).strip() | |
# Nettoyage : retirer tout ce qui n'est pas du vrai code | |
generated_code = re.sub(r"^.*?```python", "", generated_code, flags=re.DOTALL) | |
generated_code = re.sub(r"```.*?$", "", generated_code, flags=re.DOTALL).strip() | |
print("🔵 Code généré propre :") | |
print(generated_code) | |
if not generated_code.strip(): | |
return JSONResponse(content={"error": "Erreur : Code généré vide."}, status_code=500) | |
try: | |
compile(generated_code, "<string>", "exec") | |
except SyntaxError as e: | |
return JSONResponse(content={"error": f"Erreur de syntaxe détectée : {e}\nCode généré :\n{generated_code}"}, status_code=422) | |
exec_env = {"df": df, "plt": plt, "sns": sns, "pd": pd} | |
print("🔹🔹🔹 Code réellement exécuté :") | |
exec(generated_code, exec_env) | |
img_path = "plot.png" | |
if not os.path.exists(img_path): | |
return JSONResponse(content={"error": "Le fichier plot.png n'a pas été généré."}, status_code=500) | |
if os.path.getsize(img_path) == 0: | |
return JSONResponse(content={"error": "Le fichier plot.png est vide."}, status_code=500) | |
with open(img_path, "rb") as image_file: | |
encoded_string = base64.b64encode(image_file.read()).decode('utf-8') | |
print("🟢 Génération réussie ✅") | |
return JSONResponse(content={"image_base64": encoded_string}) | |
except Exception as e: | |
print(f"🔴 Erreur serveur : {e}") | |
return JSONResponse(content={"error": f"Erreur lors de la génération du graphique : {str(e)}"}, status_code=500) | |
summarizer = None | |
try: | |
summarizer = pipeline("summarization", model="facebook/bart-large-cnn") | |
logging.info("✅ Modèle de résumé chargé avec succès !") | |
except Exception as e: | |
logging.error(f"❌ Erreur chargement modèle résumé : {e}") | |
try: | |
image_captioning = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning") | |
logging.info("✅ Modèle d'image chargé avec succès !") | |
except Exception as e: | |
image_captioning = None | |
logging.error(f"❌ Erreur chargement modèle image : {e}") | |
def extract_text_from_docx(docx_file): | |
doc = Document(BytesIO(docx_file)) | |
text = "\n".join([para.text for para in doc.paragraphs]) | |
return text | |
def extract_text_from_excel(xlsx_file): | |
df = pd.read_excel(BytesIO(xlsx_file)) | |
text = df.to_string(index=False) | |
return text | |
def extract_text_from_pptx(pptx_file): | |
presentation = Presentation(BytesIO(pptx_file)) | |
text = "" | |
for slide in presentation.slides: | |
for shape in slide.shapes: | |
if hasattr(shape, "text"): | |
text += shape.text + "\n" | |
return text | |
async def summarize(file: UploadFile = File(...)): | |
if summarizer is None: | |
return {"message": "Le modèle est en cours de chargement, veuillez patienter..."} | |
contents = await file.read() | |
if file.filename.endswith(".pdf"): | |
text = extract_text(BytesIO(contents)) | |
elif file.filename.endswith(".docx"): | |
text = extract_text_from_docx(contents) | |
elif file.filename.endswith(".xls") or file.filename.endswith(".xlsx"): | |
text = extract_text_from_excel(contents) | |
elif file.filename.endswith(".pptx") or file.filename.endswith(".ppt"): | |
text = extract_text_from_pptx(contents) | |
else: | |
return {"summary": "Résumé non disponible pour ce format de fichier."} | |
try: | |
if summarizer: | |
summary = summarizer(text[:1024]) | |
summary_text = summary[0]['summary_text'] | |
else: | |
summary_text = "❌ Modèle de résumé non disponible." | |
except Exception as e: | |
summary_text = f"❌ Erreur lors de la génération du résumé : {e}" | |
return {"summary": summary_text} | |
async def caption_image(file: UploadFile = File(...)): | |
if image_captioning is None: | |
return JSONResponse(content={"error": "Le modèle de captioning n'est pas disponible."}, status_code=500) | |
try: | |
contents = await file.read() | |
image = Image.open(io.BytesIO(contents)).convert("RGB") | |
result = image_captioning(image) | |
caption = result[0]['generated_text'] | |
return {"caption": caption} | |
except Exception as e: | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
try: | |
qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2") | |
logging.info("✅ Modèle QA Texte chargé avec succès !") | |
except Exception as e: | |
qa_pipeline = None | |
logging.error(f"❌ Erreur chargement modèle QA Texte : {e}") | |
try: | |
image_qa_pipeline = pipeline("visual-question-answering", model="Salesforce/blip-vqa-base") | |
logging.info("✅ Modèle QA Image chargé avec succès !") | |
except Exception as e: | |
image_qa_pipeline = None | |
logging.error(f"❌ Erreur chargement modèle QA Image : {e}") | |
async def doc_question_answer(file: UploadFile = File(...), question: str = Form(...)): | |
if qa_pipeline is None: | |
return JSONResponse(content={"error": "Modèle indisponible."}, status_code=500) | |
try: | |
contents = await file.read() | |
filename = file.filename.lower() | |
if filename.endswith(".docx"): | |
with open("temp.docx", "wb") as f: | |
f.write(contents) | |
context = docx2txt.process("temp.docx") | |
elif filename.endswith((".xlsx", ".xls")): | |
df = pd.read_excel(BytesIO(contents)) | |
context = df.to_string(index=False) | |
elif filename.endswith(".pptx"): | |
presentation = Presentation(BytesIO(contents)) | |
context = "" | |
for slide in presentation.slides: | |
for shape in slide.shapes: | |
if hasattr(shape, "text"): | |
context += shape.text + "\n" | |
elif filename.endswith(".pdf"): | |
context = extract_text(BytesIO(contents)) | |
else: | |
return JSONResponse(content={"error": "Format non supporté."}, status_code=400) | |
result = qa_pipeline(question=question, context=context) | |
return {"answer": result["answer"]} | |
except Exception as e: | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
async def image_qa(file: UploadFile = File(...), question: str = Form(...)): | |
if image_qa_pipeline is None: | |
return JSONResponse(content={"error": "Le modèle n'est pas disponible."}, status_code=500) | |
try: | |
contents = await file.read() | |
image = Image.open(io.BytesIO(contents)).convert("RGB") | |
result = image_qa_pipeline(image=image, question=question) | |
answer = result[0]['answer'] | |
return {"answer": answer} | |
except Exception as e: | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
app.mount("/static", StaticFiles(directory="static", html=True), name="static") | |
async def root(): | |
return RedirectResponse(url="/static/principal.html") |