from fastapi import FastAPI, File, UploadFile, Form from fastapi.staticfiles import StaticFiles from fastapi.responses import RedirectResponse from fastapi import FastAPI, File, UploadFile, Form from fastapi.responses import JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from transformers import pipeline, M2M100ForConditionalGeneration, M2M100Tokenizer, MarianMTModel, MarianTokenizer import shutil # import os import logging from PyPDF2 import PdfReader import docx from PIL import Image import openpyxl # 📌 Pour lire les fichiers Excel (.xlsx) from pptx import Presentation import fitz # PyMuPDF import io from docx import Document import matplotlib.pyplot as plt import seaborn as sns import torch import re import pandas as pd from transformers import AutoTokenizer, AutoModelForCausalLM from fastapi.responses import FileResponse import os from fastapi.middleware.cors import CORSMiddleware import matplotlib matplotlib.use('Agg') import re import torch import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from transformers import AutoTokenizer, AutoModelForCausalLM from fastapi import FastAPI, File, UploadFile, Form from fastapi.responses import FileResponse import os from fastapi.middleware.cors import CORSMiddleware from fastapi import FastAPI, File, UploadFile, Form from fastapi.responses import JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from transformers import pipeline, M2M100ForConditionalGeneration, M2M100Tokenizer import shutil import os import logging from fastapi.middleware.cors import CORSMiddleware from PyPDF2 import PdfReader import docx from PIL import Image # Pour ouvrir les images avant analyse from transformers import MarianMTModel, MarianTokenizer import os import fitz from transformers import M2M100ForConditionalGeneration, M2M100Tokenizer import logging import openpyxl from fastapi.responses import FileResponse, RedirectResponse, JSONResponse from fastapi.staticfiles import StaticFiles from io import BytesIO from pdfminer.high_level import extract_text from docx import Document import pandas as pd from pptx import Presentation import logging from transformers import pipeline from PIL import Image import io import docx2txt # Configuration du logging logging.basicConfig(level=logging.INFO) app = FastAPI() # Configuration CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) UPLOAD_DIR = "uploads" os.makedirs(UPLOAD_DIR, exist_ok=True) from transformers import AutoModelForSeq2SeqLM, AutoTokenizer model_name = "facebook/m2m100_418M" tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForSeq2SeqLM.from_pretrained(model_name) # Fonction pour extraire le texte def extract_text_from_pdf(file): doc = fitz.open(stream=file.file.read(), filetype="pdf") return "\n".join([page.get_text() for page in doc]).strip() def extract_text_from_docx(file): doc = Document(io.BytesIO(file.file.read())) return "\n".join([para.text for para in doc.paragraphs]).strip() def extract_text_from_pptx(file): prs = Presentation(io.BytesIO(file.file.read())) return "\n".join([shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text")]).strip() def extract_text_from_excel(file): wb = openpyxl.load_workbook(io.BytesIO(file.file.read()), data_only=True) text = [str(cell) for sheet in wb.worksheets for row in sheet.iter_rows(values_only=True) for cell in row if cell] return "\n".join(text).strip() @app.post("/translate/") async def translate_document(file: UploadFile = File(...), target_lang: str = Form(...)): """API pour traduire un document.""" try: logging.info(f"📥 Fichier reçu : {file.filename}") logging.info(f"🌍 Langue cible reçue : {target_lang}") if model is None or tokenizer is None: return JSONResponse(status_code=500, content={"error": "Modèle de traduction non chargé"}) # Extraction du texte if file.filename.endswith(".pdf"): text = extract_text_from_pdf(file) elif file.filename.endswith(".docx"): text = extract_text_from_docx(file) elif file.filename.endswith(".pptx"): text = extract_text_from_pptx(file) elif file.filename.endswith(".xlsx"): text = extract_text_from_excel(file) else: return JSONResponse(status_code=400, content={"error": "Format non supporté"}) logging.info(f"📜 Texte extrait : {text[:50]}...") if not text: return JSONResponse(status_code=400, content={"error": "Aucun texte trouvé dans le document"}) # Vérifier si la langue cible est supportée target_lang_id = tokenizer.get_lang_id(target_lang) if target_lang_id is None: return JSONResponse( status_code=400, content={"error": f"Langue cible '{target_lang}' non supportée. Langues disponibles : {list(tokenizer.lang_code_to_id.keys())}"} ) # Traduction tokenizer.src_lang = "fr" encoded_text = tokenizer(text, return_tensors="pt", padding=True, truncation=True) logging.info(f"🔍 ID de la langue cible : {target_lang_id}") generated_tokens = model.generate(**encoded_text, forced_bos_token_id=target_lang_id) translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0] logging.info(f"✅ Traduction réussie : {translated_text[:50]}...") return {"translated_text": translated_text} except Exception as e: logging.error(f"❌ Erreur lors de la traduction : {e}") return JSONResponse(status_code=500, content={"error": "Échec de la traduction"}) # Charger le modèle pour la génération de code codegen_model_name = "Salesforce/codegen-350M-mono" device = "cuda" if torch.cuda.is_available() else "cpu" codegen_tokenizer = AutoTokenizer.from_pretrained(codegen_model_name) codegen_model = AutoModelForCausalLM.from_pretrained(codegen_model_name).to(device) VALID_PLOTS = {"histplot", "scatterplot", "barplot", "lineplot", "boxplot"} @app.post("/generate_viz/") async def generate_viz(file: UploadFile = File(...), query: str = Form(...)): try: if query not in VALID_PLOTS: return {"error": f"Type de graphique invalide. Choisissez parmi : {', '.join(VALID_PLOTS)}"} df = pd.read_excel(file.file) numeric_cols = df.select_dtypes(include=["number"]).columns if len(numeric_cols) < 2: return {"error": "Le fichier doit contenir au moins deux colonnes numériques."} x_col, y_col = numeric_cols[:2] # Contraintes spécifiques pour éviter l'erreur avec histplot if query == "histplot": prompt_y = "" else: prompt_y = f', y="{y_col}"' # Générer l'invite pour le modèle prompt = f""" ### Génère uniquement du code Python fonctionnel pour tracer un {query} avec Matplotlib et Seaborn ### # Contraintes : # - Utilise 'df' sans recréer de nouvelles données # - Axe X : '{x_col}' # - Enregistre le graphique sous 'plot.png' # - Ne génère que du code Python valide, sans texte explicatif # Contraintes spécifiques pour sns.histplot : # - N'inclut pas "y=" car histplot ne supporte qu'un axe import matplotlib.pyplot as plt import seaborn as sns plt.figure(figsize=(8,6)) sns.{query}(data=df, x="{x_col}"{prompt_y}) plt.savefig("plot.png") plt.close() """ # Génération du code inputs = codegen_tokenizer(prompt, return_tensors="pt").to(device) outputs = codegen_model.generate(**inputs, max_new_tokens=120, pad_token_id=codegen_tokenizer.eos_token_id) generated_code = codegen_tokenizer.decode(outputs[0], skip_special_tokens=True).strip() # Nettoyage du code generated_code = re.sub(r"(import matplotlib.pyplot as plt\nimport seaborn as sns\n)+", "import matplotlib.pyplot as plt\nimport seaborn as sns\n", generated_code) if generated_code.strip().endswith("sns."): generated_code = generated_code.rsplit("\n", 1)[0] # Supprime la dernière ligne incomplète print("🔹 Code généré par l'IA :\n", generated_code) # Vérification syntaxique avant exécution try: compile(generated_code, "", "exec") except SyntaxError as e: return {"error": f"Erreur de syntaxe détectée : {e}\nCode généré :\n{generated_code}"} # Vérification des données print(df.head()) # Affiche les premières lignes du dataframe print(df.dtypes) # Vérifie les types de colonnes print(f"Colonne '{x_col}' - Valeurs uniques:", df[x_col].unique()) if df.empty or x_col not in df.columns or df[x_col].isnull().all(): return {"error": f"La colonne '{x_col}' est absente ou ne contient pas de données valides."} # Exécution du code généré exec_env = {"df": df, "plt": plt, "sns": sns, "pd": pd} exec(generated_code, exec_env) # Vérification de l'image générée img_path = "plot.png" if not os.path.exists(img_path): return {"error": "Le fichier plot.png n'a pas été généré."} if os.path.getsize(img_path) == 0: return {"error": "Le fichier plot.png est vide."} plt.close() return FileResponse(img_path, media_type="image/png") except Exception as e: return {"error": f"Erreur lors de la génération du graphique : {str(e)}"} # Charger le modèle de résumé summarizer = None try: summarizer = pipeline("summarization", model="facebook/bart-large-cnn") logging.info("✅ Modèle de résumé chargé avec succès !") except Exception as e: logging.error(f"❌ Erreur chargement modèle résumé : {e}") try: image_captioning = pipeline("image-to-text", model="nlpconnect/vit-gpt2-image-captioning") logging.info("✅ Modèle d'image chargé avec succès !") except Exception as e: image_captioning = None logging.error(f"❌ Erreur chargement modèle image : {e}") # Fonction pour extraire le texte d'un fichier Word def extract_text_from_docx(docx_file): doc = Document(BytesIO(docx_file)) text = "\n".join([para.text for para in doc.paragraphs]) return text # Fonction pour extraire le texte d'un fichier Excel def extract_text_from_excel(xlsx_file): # Utiliser pandas pour lire le fichier Excel df = pd.read_excel(BytesIO(xlsx_file)) text = df.to_string(index=False) return text # Fonction pour extraire le texte d'un fichier PowerPoint def extract_text_from_pptx(pptx_file): presentation = Presentation(BytesIO(pptx_file)) text = "" for slide in presentation.slides: for shape in slide.shapes: if hasattr(shape, "text"): text += shape.text + "\n" return text # Endpoint pour la fonctionnalité de résumé @app.post("/summarize/") async def summarize(file: UploadFile = File(...)): # Si le modèle n'est pas encore chargé, retourner un message indiquant que le modèle est en train de se charger if summarizer is None: return {"message": "Le modèle est en cours de chargement, veuillez patienter..."} # Extraire le contenu du fichier téléchargé contents = await file.read() # Identifier le type de fichier et extraire le texte if file.filename.endswith(".pdf"): text = extract_text(BytesIO(contents)) elif file.filename.endswith(".docx"): text = extract_text_from_docx(contents) elif file.filename.endswith(".xls") or file.filename.endswith(".xlsx"): text = extract_text_from_excel(contents) elif file.filename.endswith(".pptx") or file.filename.endswith(".ppt"): text = extract_text_from_pptx(contents) else: return {"summary": "Résumé non disponible pour ce format de fichier."} # Si un modèle de résumé est chargé, effectuer le résumé try: if summarizer: summary = summarizer(text[:1024]) # Limiter la taille d'entrée pour le modèle summary_text = summary[0]['summary_text'] else: summary_text = "❌ Modèle de résumé non disponible." except Exception as e: summary_text = f"❌ Erreur lors de la génération du résumé : {e}" # Retourner le résumé généré return {"summary": summary_text} @app.post("/image-caption/") async def caption_image(file: UploadFile = File(...)): if image_captioning is None: return JSONResponse(content={"error": "Le modèle de captioning n'est pas disponible."}, status_code=500) try: contents = await file.read() image = Image.open(io.BytesIO(contents)).convert("RGB") result = image_captioning(image) caption = result[0]['generated_text'] return {"caption": caption} except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) try: qa_pipeline = pipeline("question-answering", model="deepset/roberta-base-squad2") logging.info("✅ Modèle QA Texte chargé avec succès !") except Exception as e: qa_pipeline = None logging.error(f"❌ Erreur chargement modèle QA Texte : {e}") try: image_qa_pipeline = pipeline("visual-question-answering", model="Salesforce/blip-vqa-base") logging.info("✅ Modèle QA Image chargé avec succès !") except Exception as e: image_qa_pipeline = None logging.error(f"❌ Erreur chargement modèle QA Image : {e}") @app.post("/doc-qa/") async def doc_question_answer(file: UploadFile = File(...), question: str = Form(...)): if qa_pipeline is None: return JSONResponse(content={"error": "Modèle indisponible."}, status_code=500) try: contents = await file.read() filename = file.filename.lower() if filename.endswith(".docx"): with open("temp.docx", "wb") as f: f.write(contents) context = docx2txt.process("temp.docx") elif filename.endswith((".xlsx", ".xls")): df = pd.read_excel(BytesIO(contents)) context = df.to_string(index=False) elif filename.endswith(".pptx"): presentation = Presentation(BytesIO(contents)) context = "" for slide in presentation.slides: for shape in slide.shapes: if hasattr(shape, "text"): context += shape.text + "\n" elif filename.endswith(".pdf"): context = extract_text(BytesIO(contents)) else: return JSONResponse(content={"error": "Format non supporté."}, status_code=400) result = qa_pipeline(question=question, context=context) return {"answer": result["answer"]} except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) @app.post("/image-qa/") async def image_qa(file: UploadFile = File(...), question: str = Form(...)): if image_qa_pipeline is None: return JSONResponse(content={"error": "Le modèle n'est pas disponible."}, status_code=500) try: contents = await file.read() image = Image.open(io.BytesIO(contents)).convert("RGB") result = image_qa_pipeline(image=image, question=question) answer = result[0]['answer'] return {"answer": answer} except Exception as e: return JSONResponse(content={"error": str(e)}, status_code=500) # Servir les fichiers statiques (HTML, CSS, JS) app.mount("/static", StaticFiles(directory="static", html=True), name="static") @app.get("/") async def root(): return RedirectResponse(url="/static/principal.html")