"""Gradio app that extracts text from a PDF and queries Gemini about it.

Extraction is attempted with several back ends in turn (Unstructured.io when
installed, then PyPDF2, then Tika).  The extracted sections are combined into
a size-capped prompt that is used both for a template-style summary and for
follow-up questions.
"""

import os
import tempfile
import time
import re
import logging
from datetime import datetime

import gradio as gr
import google.generativeai as genai
from PyPDF2 import PdfReader
from tika import parser

# Configure logging
LOG_FILE = "pdf_processor_log.txt"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        # Pin the encoding so view_log() can decode the file on any platform.
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
    ],
)
logger = logging.getLogger("pdf_processor")

# Try Unstructured.io
try:
    from unstructured.partition.pdf import partition_pdf
    UNSTRUCTURED_AVAILABLE = True
except ImportError:
    UNSTRUCTURED_AVAILABLE = False
    logger.warning("unstructured.partition.pdf not available; skipping that method")

# Load Gemini API key from env (set in your Space Secrets)
API_KEY = os.getenv("GOOGLE_API_KEY")
if API_KEY:
    genai.configure(api_key=API_KEY)
else:
    logger.warning("GOOGLE_API_KEY not set in environment.")

GEMINI_MODEL = "gemini-1.5-pro"
# Upper bound (in characters) on the document text sent to the model.
MAX_PROMPT_CHARS = 30000
# Lines at least this long are never treated as headings.
MAX_HEADING_CHARS = 80

# Numbered-heading patterns differ slightly between extractors; kept as-is.
_UNSTRUCTURED_NUMBERED_RE = re.compile(r"^[0-9]+\.?\s+")
_TIKA_NUMBERED_RE = re.compile(r"^[0-9]+\.?\s+[A-Z]")
_PYPDF_HEADING_SPLIT_RE = re.compile(
    r"\n\s*([A-Z][A-Z\s]+:?|[0-9]+\.\s+[A-Z].*?)\s*\n"
)
_PAGE_MARKER_RE = re.compile(r"--- Page \d+ ---")

# State shared between the "process" and "ask" handlers.
EXTRACTED_TEXT = ""
PDF_SECTIONS = []
EXTRACTION_METHOD = ""


# --- Extraction Functions ---

def _group_into_sections(fragments, numbered_re):
    """Group text fragments into ``{"title", "content"}`` sections.

    A fragment shorter than ``MAX_HEADING_CHARS`` that is all upper-case, ends
    with a colon, or matches ``numbered_re`` starts a new section.  Every other
    non-empty fragment is appended to the current section's content.  Sections
    that never receive content are dropped.
    """
    sections = []
    current = {"title": "Introduction", "content": ""}
    for fragment in fragments:
        text = fragment.strip()
        if not text:
            continue
        is_heading = len(text) < MAX_HEADING_CHARS and (
            text.isupper() or text.endswith(":") or numbered_re.match(text)
        )
        if is_heading:
            if current["content"]:
                sections.append(current)
            current = {"title": text, "content": ""}
        else:
            current["content"] += text + "\n\n"
    if current["content"]:
        sections.append(current)
    return sections


def extract_text_with_unstructured(pdf_path):
    """Extract sections from ``pdf_path`` using Unstructured.io.

    Returns a list of section dicts.  Any exception is logged with its
    traceback and re-raised so the caller can fall back to another method.
    """
    try:
        logger.info("Extracting via Unstructured.io...")
        elements = partition_pdf(filename=pdf_path, extract_images_in_pdf=False)
        texts = (getattr(element, "text", None) or "" for element in elements)
        return _group_into_sections(texts, _UNSTRUCTURED_NUMBERED_RE)
    except Exception as e:
        # Bubble up so process_pdf can catch & log
        logger.error(f"Unstructured extraction error: {e}", exc_info=True)
        raise


def extract_text_with_pypdf(pdf_path):
    """Extract sections from ``pdf_path`` using PyPDF2.

    Page texts are concatenated with ``--- Page N ---`` markers and split on
    heading-like lines.  Returns ``[]`` when no page yields any text (e.g. a
    scanned PDF) so that the caller can try the next extraction method.
    """
    logger.info("Extracting via PyPDF2...")
    reader = PdfReader(pdf_path)
    page_chunks = []
    for number, page in enumerate(reader.pages, start=1):
        text = page.extract_text()
        if text and text.strip():
            page_chunks.append(f"\n\n--- Page {number} ---\n\n{text}")
    if not page_chunks:
        return []

    full_text = "".join(page_chunks)
    # With one capture group, split() yields [preamble, title, body, title, body, ...].
    parts = _PYPDF_HEADING_SPLIT_RE.split(full_text)
    if len(parts) <= 1:
        return [{"title": "Document", "content": full_text}]

    sections = []
    preamble = parts[0].strip()
    # Keep text that precedes the first heading, unless it is only page markers.
    if _PAGE_MARKER_RE.sub("", preamble).strip():
        sections.append({"title": "Introduction", "content": preamble})
    sections.extend(
        {"title": title.strip(), "content": body.strip()}
        for title, body in zip(parts[1::2], parts[2::2])
    )
    return sections


def extract_text_with_tika(pdf_path):
    """Extract sections from ``pdf_path`` using Tika's parsed text content."""
    logger.info("Extracting via Tika...")
    parsed = parser.from_file(pdf_path)
    lines = (parsed.get("content") or "").split("\n")
    return _group_into_sections(lines, _TIKA_NUMBERED_RE)


# --- Gemini calls ---

def _call_gemini(prompt, label):
    """Send ``prompt`` to Gemini and return ``(text, None)`` or ``(None, error)``.

    ``label`` only prefixes the log message.  The error string is never empty,
    so callers can rely on its truthiness to detect failure.
    """
    try:
        model = genai.GenerativeModel(GEMINI_MODEL)
        resp = model.generate_content(prompt)
        return resp.text, None
    except Exception as e:
        logger.error(f"{label} error: {e}")
        return None, str(e) or type(e).__name__


def generate_greg_brockman_summary(content):
    """Ask Gemini for a template-structured summary of ``content``.

    Returns ``(summary, None)`` on success or ``(None, error_message)``.
    """
    prompt = f"""
You are an expert document analyst specializing in proposal evaluation.

# GREG BROCKMAN TEMPLATE STRUCTURE
1. GOAL: ...
... (rest of template) ...

CONTENT:
{content}
"""
    return _call_gemini(prompt, "Summary")


def answer_question_about_pdf(content, question):
    """Ask Gemini to answer ``question`` using ``content`` as the document.

    Returns ``(answer, None)`` on success or ``(None, error_message)``.
    """
    prompt = f"""
You are a precise document analysis assistant.

DOCUMENT CONTENT:
{content}

QUESTION:
{question}
"""
    return _call_gemini(prompt, "Q&A")


# --- Handlers ---

def _write_temp_pdf(data):
    """Write ``data`` (bytes) to a fresh temporary ``.pdf`` file; return its path."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
        tmp.write(data)
        return tmp.name


def _resolve_upload_path(pdf_file):
    """Turn an uploaded object into ``(path, is_temp_copy)``.

    Returns ``(None, False)`` for unrecognized upload types.  A caller-supplied
    name is never used to build the output path: joining it onto the temp
    directory would point back at the upload itself when the name is absolute,
    and opening that path for writing would truncate the upload before it is
    read.
    """
    # In-memory upload exposing both .name and .data
    if hasattr(pdf_file, "name") and hasattr(pdf_file, "data"):
        return _write_temp_pdf(pdf_file.data), True
    # Direct filepath (str)
    if isinstance(pdf_file, str):
        return pdf_file, False
    # File-like object with .read()
    if hasattr(pdf_file, "read"):
        name = getattr(pdf_file, "name", None)
        if isinstance(name, str) and os.path.isfile(name):
            # Already on disk; use it in place instead of copying over itself.
            return name, False
        return _write_temp_pdf(pdf_file.read()), True
    return None, False


def _extract_sections(path):
    """Run the extractors in order; return ``(sections, method_name, last_error)``.

    ``sections`` is ``None`` when every method failed or produced nothing.
    """
    methods = []
    if UNSTRUCTURED_AVAILABLE:
        methods.append(("unstructured", extract_text_with_unstructured))
    methods += [
        ("pypdf", extract_text_with_pypdf),
        ("tika", extract_text_with_tika),
    ]

    last_err = ""
    for name, fn in methods:
        try:
            sections = fn(path)
        except Exception as e:
            last_err = f"{name} failed: {e}"
            logger.warning(last_err)
            continue
        if sections:
            return sections, name, last_err
    return None, "", last_err


def _combine_sections(sections, limit=MAX_PROMPT_CHARS):
    """Build ``(combined_text, outline)`` from ``sections``.

    A section that would push the combined text to ``limit`` characters or
    beyond is cut to whatever still fits and marked ``[Truncated]``; once no
    room is left, later sections contribute only their heading and the marker.
    """
    marker = "[Truncated]\n\n"
    outline = []
    pieces = []
    used = 0
    for index, sec in enumerate(sections, 1):
        outline.append(f"{index}. {sec['title']}\n")
        header = f"## {sec['title']}\n"
        chunk = f"{header}{sec['content']}\n\n"
        if used + len(chunk) >= limit:
            # Keep the part that fits rather than discarding the whole section.
            room = limit - used - len(header) - len(marker) - 1
            if room > 0:
                chunk = f"{header}{sec['content'][:room]}\n{marker}"
            else:
                chunk = header + marker
        pieces.append(chunk)
        used += len(chunk)
    return "".join(pieces), "".join(outline)


def process_pdf(pdf_file, progress=gr.Progress()):
    """Extract, combine and summarize an uploaded PDF.

    Args:
        pdf_file: The upload as delivered by ``gr.File`` (a path string, a
            file-like object, or an object carrying ``.name`` and ``.data``).
        progress: Gradio progress tracker; currently unused.

    Returns:
        A 4-tuple ``(summary, structure, status, info)`` matching the outputs
        wired to the "Process PDF" button.  On success the module-level
        ``EXTRACTED_TEXT``, ``PDF_SECTIONS`` and ``EXTRACTION_METHOD`` are
        updated for later questions.
    """
    global EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD

    if not API_KEY:
        return None, None, "❌ Set GOOGLE_API_KEY in Secrets.", ""
    if pdf_file is None:
        return None, None, "❌ No file uploaded.", ""

    path, is_temp_copy = _resolve_upload_path(pdf_file)
    if path is None:
        return None, None, "❌ Unrecognized upload type", ""

    try:
        sections, method, last_err = _extract_sections(path)
    finally:
        if is_temp_copy:
            # Best-effort cleanup of the copy we created.
            try:
                os.remove(path)
            except OSError:
                logger.warning("Could not remove temporary file %s", path)

    if not sections:
        return None, None, "❌ Extraction failed", last_err

    EXTRACTION_METHOD = method
    combined, structure = _combine_sections(sections)
    EXTRACTED_TEXT = combined
    PDF_SECTIONS = sections

    summary, err = generate_greg_brockman_summary(combined)
    if err:
        return None, structure, f"❌ {err}", combined
    return summary, structure, "✅ PDF processed", f"Used {EXTRACTION_METHOD}"


def ask_question(question):
    """Answer ``question`` about the most recently processed PDF."""
    if not API_KEY:
        return "❌ Set GOOGLE_API_KEY in Secrets."
    if not EXTRACTED_TEXT:
        return "❌ Process a PDF first."
    if not question or not question.strip():
        return "❌ Enter a question."
    ans, err = answer_question_about_pdf(EXTRACTED_TEXT, question)
    return ans if not err else f"❌ {err}"


def view_log():
    """Return the contents of the log file, or an error message if unreadable."""
    try:
        with open(LOG_FILE, encoding="utf-8", errors="replace") as f:
            return f.read()
    except Exception as e:
        return f"Error reading log: {e}"


def save_summary(summary):
    """Write ``summary`` to a timestamped text file in the working directory."""
    if not summary:
        return "❌ No summary to save."
    fn = f"summary_{datetime.now():%Y%m%d_%H%M%S}.txt"
    with open(fn, "w", encoding="utf-8") as f:
        f.write(summary)
    return f"✅ Saved to {fn}"


def save_qa(question, answer):
    """Write a question/answer pair to a timestamped text file."""
    if not question or not answer:
        return "❌ Nothing to save."
    fn = f"qa_{datetime.now():%Y%m%d_%H%M%S}.txt"
    with open(fn, "w", encoding="utf-8") as f:
        f.write(f"Q: {question}\n\nA: {answer}")
    return f"✅ Saved to {fn}"


# --- Gradio UI ---
with gr.Blocks(title="PDF Analyzer with Gemini API") as app:
    gr.Markdown("# 📄 PDF Analyzer with Gemini API")
    gr.Markdown("Upload a PDF, get a Greg Brockman style summary, and ask questions.")

    with gr.Tab("Setup"):
        gr.Markdown("⚠️ Make sure `GOOGLE_API_KEY` is set in your Space's Secrets.")

    with gr.Tab("PDF Processing"):
        with gr.Row():
            pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"])
            proc_btn = gr.Button("Process PDF", variant="primary")
        status = gr.Markdown("Awaiting upload…")
        summary_out = gr.Textbox(label="Summary", lines=15)
        structure_out = gr.Textbox(label="Structure", lines=8)
        log_info = gr.Textbox(label="Internal Log", lines=5)
        proc_btn.click(
            fn=process_pdf,
            inputs=[pdf_file],
            outputs=[summary_out, structure_out, status, log_info],
        )
        save_sum_btn = gr.Button("Save Summary")
        save_sum_status = gr.Markdown()
        save_sum_btn.click(save_summary, inputs=[summary_out], outputs=[save_sum_status])

    with gr.Tab("Ask Questions"):
        question_in = gr.Textbox(label="Your Question", lines=2)
        ask_btn = gr.Button("Ask", variant="primary")
        answer_out = gr.Textbox(label="Answer", lines=10)
        ask_btn.click(ask_question, inputs=[question_in], outputs=[answer_out])
        save_qa_btn = gr.Button("Save Q&A")
        save_qa_status = gr.Markdown()
        save_qa_btn.click(save_qa, inputs=[question_in, answer_out], outputs=[save_qa_status])

    with gr.Tab("System Log"):
        refresh_btn = gr.Button("Refresh Log")
        sys_log = gr.Textbox(label="System Log", lines=20)
        refresh_btn.click(view_log, inputs=None, outputs=[sys_log])

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0")