# app.py — last commit c72b167 ("Update app.py", verified) by Omarrran.
import os
import tempfile
import time
import re
import logging
from datetime import datetime
import gradio as gr
import google.generativeai as genai
from PyPDF2 import PdfReader
from tika import parser
# Configure logging: every record is sent to a stream handler and appended to
# LOG_FILE, which view_log() reads back for the "System Log" tab.
LOG_FILE = "pdf_processor_log.txt"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_FILE)
    ]
)
# Named logger used by the extraction, Gemini and handler functions below.
logger = logging.getLogger("pdf_processor")
# Try Unstructured.io: the import is optional, so a failure only clears
# UNSTRUCTURED_AVAILABLE (process_pdf then skips that extractor) instead of
# aborting startup.
try:
    from unstructured.partition.pdf import partition_pdf
    UNSTRUCTURED_AVAILABLE = True
except ImportError:
    UNSTRUCTURED_AVAILABLE = False
    logger.warning("unstructured.partition.pdf not available; skipping that method")
# Load Gemini API key from env (set in your Space Secrets)
API_KEY = os.getenv("GOOGLE_API_KEY")
if API_KEY:
    genai.configure(api_key=API_KEY)
else:
    # Startup continues without a key; process_pdf and ask_question check
    # API_KEY themselves and return an error message to the UI.
    logger.warning("GOOGLE_API_KEY not set in environment.")
# Module-level state assigned by process_pdf after a successful extraction.
EXTRACTED_TEXT = ""  # combined section text; read by ask_question
PDF_SECTIONS = []  # list of {"title": ..., "content": ...} dicts
EXTRACTION_METHOD = ""  # name of the extractor that produced the sections
# --- Extraction Functions ---
def extract_text_with_unstructured(pdf_path):
    """Partition a PDF with Unstructured.io and group its text into sections.

    An element's stripped text is treated as a heading when it is shorter
    than 80 characters and is all upper-case, ends with ":", or starts with
    digits, an optional "." and whitespace. Everything else is appended to
    the body of the section currently being built; text that precedes the
    first heading is filed under "Introduction".

    Args:
        pdf_path: Path of the PDF file handed to ``partition_pdf``.

    Returns:
        A list of ``{"title": ..., "content": ...}`` dicts. Sections whose
        body is empty are not included.

    Raises:
        Exception: Any error is logged with its traceback and re-raised so
            the caller can fall back to another extractor.
    """
    numbered_heading = re.compile(r"^[0-9]+\.?\s+")
    try:
        logger.info("Extracting via Unstructured.io...")
        collected = []
        title, body = "Introduction", ""
        for element in partition_pdf(filename=pdf_path, extract_images_in_pdf=False):
            if not hasattr(element, "text"):
                continue
            text = element.text.strip()
            if not text:
                continue

            is_heading = len(text) < 80 and (
                text.isupper() or text.endswith(":") or numbered_heading.match(text)
            )
            if not is_heading:
                body += text + "\n\n"
                continue

            # A heading closes the running section (if it has any body)
            # and opens a new one.
            if body:
                collected.append({"title": title, "content": body})
            title, body = text, ""

        if body:
            collected.append({"title": title, "content": body})
        return collected
    except Exception as exc:
        # Bubble up so process_pdf can catch & log
        logger.error(f"Unstructured extraction error: {exc}", exc_info=True)
        raise
def extract_text_with_pypdf(pdf_path):
    """Extract a PDF's text with PyPDF2 and split it into titled sections.

    Each page that yields text is prefixed with a ``--- Page N ---`` marker.
    The joined text is then split on lines that look like headings: an
    upper-case run optionally ending in ":", or a "1. Title"-style line.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": ..., "content": ...}`` dicts:

        * ``[]`` when no page yields any text, so the caller can fall back
          to another extractor instead of summarising an empty document;
        * a single "Document" section when no heading is detected;
        * otherwise one section per heading, preceded by an "Introduction"
          section holding any text found before the first heading.
    """
    logger.info("Extracting via PyPDF2...")
    reader = PdfReader(pdf_path)

    page_chunks = []
    for page_no, page in enumerate(reader.pages, start=1):
        page_text = page.extract_text()
        # Skip pages with no text or whitespace only (e.g. scanned images).
        if page_text and page_text.strip():
            page_chunks.append(f"\n\n--- Page {page_no} ---\n\n{page_text}")

    if not page_chunks:
        return []
    full_text = "".join(page_chunks)

    # With one capturing group re.split returns
    # [preamble, title1, body1, title2, body2, ...].
    parts = re.split(r"\n\s*([A-Z][A-Z\s]+:?|[0-9]+\.\s+[A-Z].*?)\s*\n", full_text)
    if len(parts) == 1:
        return [{"title": "Document", "content": full_text}]

    sections = []
    preamble = parts[0].strip()
    if preamble:
        # Keep the text that precedes the first heading instead of
        # dropping it.
        sections.append({"title": "Introduction", "content": preamble})
    sections.extend(
        {"title": title.strip(), "content": body.strip()}
        for title, body in zip(parts[1::2], parts[2::2])
    )
    return sections
def extract_text_with_tika(pdf_path):
    """Parse a PDF with the tika parser and group its lines into sections.

    A non-empty stripped line is treated as a heading when it is shorter
    than 80 characters and is all upper-case, ends with ":", or starts with
    digits, an optional ".", whitespace and a capital letter. Other lines
    are appended to the body of the section currently being built; lines
    before the first heading are filed under "Introduction".

    Args:
        pdf_path: Path of the PDF file handed to ``parser.from_file``.

    Returns:
        A list of ``{"title": ..., "content": ...}`` dicts. Sections whose
        body is empty are not included; a missing or empty "content" entry
        in the parse result yields an empty list.
    """
    logger.info("Extracting via Tika...")
    parsed = parser.from_file(pdf_path)
    raw_text = parsed.get("content") or ""

    numbered_heading = re.compile(r"^[0-9]+\.?\s+[A-Z]")
    collected = []
    title, body = "Introduction", ""
    for raw_line in raw_text.split("\n"):
        line = raw_line.strip()
        if not line:
            continue

        is_heading = len(line) < 80 and (
            line.isupper() or line.endswith(":") or numbered_heading.match(line)
        )
        if not is_heading:
            body += line + "\n\n"
            continue

        # A heading closes the running section (if it has any body) and
        # opens a new one.
        if body:
            collected.append({"title": title, "content": body})
        title, body = line, ""

    if body:
        collected.append({"title": title, "content": body})
    return collected
# --- Gemini calls ---
def generate_greg_brockman_summary(content):
    """Ask Gemini for a template-structured summary of the document text.

    Args:
        content: Document text interpolated at the end of the prompt.

    Returns:
        A ``(summary, error)`` pair: ``(text, None)`` when the request
        succeeds, ``(None, message)`` when ``generate_content`` or reading
        the response text raises. Failures are logged, not raised.
    """
    gemini = genai.GenerativeModel("gemini-1.5-pro")
    # The prompt lines are deliberately flush-left: any indentation here
    # would become part of the text sent to the model.
    request_text = f"""
You are an expert document analyst specializing in proposal evaluation.
# GREG BROCKMAN TEMPLATE STRUCTURE
1. GOAL: ...
... (rest of template) ...
CONTENT:
{content}
"""
    try:
        summary_text = gemini.generate_content(request_text).text
    except Exception as exc:
        logger.error(f"Summary error: {exc}")
        return None, str(exc)
    return summary_text, None
def answer_question_about_pdf(content, question):
    """Ask Gemini a question about the supplied document text.

    Args:
        content: Document text placed under "DOCUMENT CONTENT" in the prompt.
        question: The question, interpolated after the document text.

    Returns:
        An ``(answer, error)`` pair: ``(text, None)`` when the request
        succeeds, ``(None, message)`` when ``generate_content`` or reading
        the response text raises. Failures are logged, not raised.
    """
    gemini = genai.GenerativeModel("gemini-1.5-pro")
    # The prompt lines are deliberately flush-left: any indentation here
    # would become part of the text sent to the model.
    request_text = f"""
You are a precise document analysis assistant.
DOCUMENT CONTENT:
{content}
QUESTION: {question}
"""
    try:
        answer_text = gemini.generate_content(request_text).text
    except Exception as exc:
        logger.error(f"Q&A error: {exc}")
        return None, str(exc)
    return answer_text, None
# --- Handlers ---
def process_pdf(pdf_file, progress=gr.Progress()):
    """Extract text from an uploaded PDF and summarise it with Gemini.

    Extractors are tried in order (Unstructured.io when importable, then
    PyPDF2, then Tika); the first one that returns a non-empty section list
    wins. On success the module-level EXTRACTED_TEXT, PDF_SECTIONS and
    EXTRACTION_METHOD are updated.

    Args:
        pdf_file: The upload. Accepted shapes: an object with ``.name`` and
            ``.data``, a path string, or a file-like object with ``.read()``.
        progress: Gradio progress tracker; accepted for the UI binding but
            not used by this function.

    Returns:
        A 4-tuple ``(summary, structure, status, info)`` matching the UI
        outputs. ``summary``/``structure`` are ``None`` when processing
        stops early; ``status`` starts with "✅" or "❌"; ``info`` carries
        the extractor name, the last extractor error, or (when only the
        summary call fails) the combined text.
    """
    global EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD
    if not API_KEY:
        return None, None, "❌ Set GOOGLE_API_KEY in Secrets.", ""
    if pdf_file is None:
        return None, None, "❌ No file uploaded.", ""

    path, is_temp_copy = _resolve_upload_path(pdf_file)
    if path is None:
        return None, None, "❌ Unrecognized upload type", ""

    # Try methods in order
    methods = []
    if UNSTRUCTURED_AVAILABLE:
        methods.append(("unstructured", extract_text_with_unstructured))
    methods += [
        ("pypdf", extract_text_with_pypdf),
        ("tika", extract_text_with_tika),
    ]
    sections = None
    last_err = ""
    try:
        for name, fn in methods:
            try:
                secs = fn(path)
            except Exception as e:
                last_err = f"{name} failed: {e}"
                logger.warning(last_err)
                continue
            if secs:
                sections = secs
                EXTRACTION_METHOD = name
                break
    finally:
        # Only delete files this function created; never the caller's file.
        if is_temp_copy:
            try:
                os.remove(path)
            except OSError as e:
                logger.warning(f"Could not remove temp file {path}: {e}")

    if not sections:
        # Every extractor may have run cleanly yet found nothing; give the
        # user a reason instead of an empty diagnostic.
        return None, None, "❌ Extraction failed", last_err or "No extractable text found."

    # Combine & summarize. A section whose full text would take the combined
    # length to the limit is replaced by a "[Truncated]" stub.
    max_combined_chars = 30000
    combined_parts, structure_parts = [], []
    combined_len = 0
    for i, sec in enumerate(sections, 1):
        structure_parts.append(f"{i}. {sec['title']}\n")
        chunk = f"## {sec['title']}\n{sec['content']}\n\n"
        if combined_len + len(chunk) >= max_combined_chars:
            chunk = f"## {sec['title']}\n[Truncated]\n\n"
        combined_parts.append(chunk)
        combined_len += len(chunk)
    combined = "".join(combined_parts)
    structure = "".join(structure_parts)

    EXTRACTED_TEXT = combined
    PDF_SECTIONS = sections
    summary, err = generate_greg_brockman_summary(combined)
    if err:
        return None, structure, f"❌ {err}", combined
    return summary, structure, "✅ PDF processed", f"Used {EXTRACTION_METHOD}"


def _resolve_upload_path(pdf_file):
    """Return ``(path, is_temp_copy)`` for an upload, or ``(None, False)``."""
    upload_name = getattr(pdf_file, "name", None)

    # Case 1: in-memory upload carrying .name and .data
    if upload_name is not None and hasattr(pdf_file, "data"):
        return _write_temp_pdf(pdf_file.data, upload_name), True

    # Case 2: direct filepath (str)
    if isinstance(pdf_file, str):
        return pdf_file, False

    # Case 3: file-like with .read()
    if hasattr(pdf_file, "read"):
        # When the object is backed by a real file, use that file directly:
        # re-opening its own path for writing would truncate it before the
        # data is read.
        if isinstance(upload_name, str) and os.path.isfile(upload_name):
            return upload_name, False
        return _write_temp_pdf(pdf_file.read(), upload_name), True

    return None, False


def _write_temp_pdf(payload, upload_name):
    """Write ``payload`` (bytes) to a fresh temp file and return its path."""
    # Only the extension of the client-supplied name is reused, so the name
    # can neither escape the temp directory nor clobber an existing file.
    base_name = os.path.basename(str(upload_name or "uploaded.pdf"))
    suffix = os.path.splitext(base_name)[1] or ".pdf"
    fd, temp_path = tempfile.mkstemp(prefix="upload_", suffix=suffix)
    with os.fdopen(fd, "wb") as out:
        out.write(payload)
    return temp_path
def ask_question(question):
    """Answer a question about the most recently processed PDF.

    Args:
        question: The user's question text.

    Returns:
        The model's answer, or a "❌ ..." message when the API key is
        missing, no PDF text has been extracted yet, the question is blank,
        or the Gemini call reports an error.
    """
    # Preconditions are checked in order; the first unmet one is reported.
    if not API_KEY:
        problem = "❌ Set GOOGLE_API_KEY in Secrets."
    elif not EXTRACTED_TEXT:
        problem = "❌ Process a PDF first."
    elif not question.strip():
        problem = "❌ Enter a question."
    else:
        problem = None
    if problem is not None:
        return problem

    answer, failure = answer_question_about_pdf(EXTRACTED_TEXT, question)
    if failure:
        return f"❌ {failure}"
    return answer
def view_log():
    """Return the full contents of the log file for display in the UI.

    Returns:
        The text of LOG_FILE, or an "Error reading log: ..." message when
        the file cannot be opened or read.
    """
    try:
        # The context manager closes the handle promptly; errors="replace"
        # keeps one undecodable byte from hiding the entire log.
        with open(LOG_FILE, errors="replace") as log_file:
            return log_file.read()
    except OSError as e:
        return f"Error reading log: {e}"
def save_summary(summary):
    """Write the summary to a timestamped text file in the working directory.

    Args:
        summary: Summary text; ``None`` or an empty string means there is
            nothing to save.

    Returns:
        A status message: "✅ Saved to <file>" on success, otherwise a
        "❌ ..." message saying why nothing was written.
    """
    if not summary:
        return "❌ No summary to save."
    fn = f"summary_{datetime.now():%Y%m%d_%H%M%S}.txt"
    try:
        with open(fn, "w", encoding="utf-8") as f:
            f.write(summary)
    except OSError as e:
        # Report write failures (e.g. a read-only directory) as a status
        # message, like the other UI handlers, instead of raising.
        return f"❌ Could not save summary: {e}"
    return f"✅ Saved to {fn}"
def save_qa(question, answer):
    """Write a question/answer pair to a timestamped text file.

    The file is created in the working directory and contains
    "Q: <question>", a blank line, then "A: <answer>".

    Args:
        question: The question text.
        answer: The answer text.

    Returns:
        A status message: "✅ Saved to <file>" on success, otherwise a
        "❌ ..." message saying why nothing was written (either value is
        empty or ``None``, or the file could not be written).
    """
    if not question or not answer:
        return "❌ Nothing to save."
    fn = f"qa_{datetime.now():%Y%m%d_%H%M%S}.txt"
    try:
        with open(fn, "w", encoding="utf-8") as f:
            f.write(f"Q: {question}\n\nA: {answer}")
    except OSError as e:
        # Report write failures (e.g. a read-only directory) as a status
        # message, like the other UI handlers, instead of raising.
        return f"❌ Could not save Q&A: {e}"
    return f"✅ Saved to {fn}"
# --- Gradio UI ---
# Build the four-tab UI. Components are created in display order, and each
# button is wired to one of the handler functions defined above.
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation was lost — confirm which components belong inside gr.Row().
with gr.Blocks(title="PDF Analyzer with Gemini API") as app:
    gr.Markdown("# 📄 PDF Analyzer with Gemini API")
    gr.Markdown("Upload a PDF, get a Greg Brockman style summary, and ask questions.")
    with gr.Tab("Setup"):
        gr.Markdown("⚠️ Make sure `GOOGLE_API_KEY` is set in your Space's Secrets.")
    with gr.Tab("PDF Processing"):
        with gr.Row():
            pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"])
            proc_btn = gr.Button("Process PDF", variant="primary")
        status = gr.Markdown("Awaiting upload…")
        summary_out = gr.Textbox(label="Summary", lines=15)
        structure_out = gr.Textbox(label="Structure", lines=8)
        log_info = gr.Textbox(label="Internal Log", lines=5)
        # process_pdf returns (summary, structure, status, info); the outputs
        # list must stay in that order.
        proc_btn.click(
            fn=process_pdf,
            inputs=[pdf_file],
            outputs=[summary_out, structure_out, status, log_info]
        )
        save_sum_btn = gr.Button("Save Summary")
        save_sum_status = gr.Markdown()
        save_sum_btn.click(save_summary, inputs=[summary_out], outputs=[save_sum_status])
    with gr.Tab("Ask Questions"):
        question_in = gr.Textbox(label="Your Question", lines=2)
        ask_btn = gr.Button("Ask", variant="primary")
        answer_out = gr.Textbox(label="Answer", lines=10)
        # ask_question uses the text stored by the last process_pdf call.
        ask_btn.click(ask_question, inputs=[question_in], outputs=[answer_out])
        save_qa_btn = gr.Button("Save Q&A")
        save_qa_status = gr.Markdown()
        save_qa_btn.click(save_qa, inputs=[question_in, answer_out], outputs=[save_qa_status])
    with gr.Tab("System Log"):
        refresh_btn = gr.Button("Refresh Log")
        sys_log = gr.Textbox(label="System Log", lines=20)
        # The log is loaded only on demand, when the button is clicked.
        refresh_btn.click(view_log, inputs=None, outputs=[sys_log])
# Start the web server only when this file is run as a script.
if __name__ == "__main__":
    # NOTE(review): "0.0.0.0" presumably makes the server listen on all
    # network interfaces (needed inside a container) — confirm against the
    # Gradio launch() documentation.
    app.launch(server_name="0.0.0.0")