# Spaces status banner captured with the page: "Sleeping" (not part of the program).
import os | |
import tempfile | |
import time | |
import re | |
import logging | |
from datetime import datetime | |
import gradio as gr | |
import google.generativeai as genai | |
from PyPDF2 import PdfReader | |
from tika import parser | |
# Logging: every record is sent both to the console stream and to LOG_FILE.
LOG_FILE = "pdf_processor_log.txt"
logging.basicConfig(
    handlers=[logging.StreamHandler(), logging.FileHandler(LOG_FILE)],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Named logger used by the rest of this module.
logger = logging.getLogger("pdf_processor")
# Optional dependency: the Unstructured.io PDF partitioner.
# UNSTRUCTURED_AVAILABLE records whether the import succeeded so that the
# extractor built on it can be skipped when the package is missing.
try:
    from unstructured.partition.pdf import partition_pdf
except ImportError:
    UNSTRUCTURED_AVAILABLE = False
    logger.warning("unstructured.partition.pdf not available; skipping that method")
else:
    UNSTRUCTURED_AVAILABLE = True
# Gemini API key is read from the environment (set it in your Space Secrets).
API_KEY = os.environ.get("GOOGLE_API_KEY")
if not API_KEY:
    logger.warning("GOOGLE_API_KEY not set in environment.")
else:
    genai.configure(api_key=API_KEY)

# Module-level state shared by the Gradio handlers; written by process_pdf.
EXTRACTED_TEXT = ""       # combined section text of the last processed PDF
PDF_SECTIONS = []         # section dicts of the last processed PDF
EXTRACTION_METHOD = ""    # name of the extractor that produced them
# --- Extraction Functions ---
def extract_text_with_unstructured(pdf_path):
    """Partition a PDF with Unstructured.io and group its elements into sections.

    An element whose stripped text is shorter than 80 characters starts a new
    section when it is all upper-case, ends with ":" or begins with digits
    (optionally followed by a dot) and whitespace. Every other non-empty
    element is appended to the current section, separated by blank lines.
    Text seen before the first heading is filed under "Introduction", and a
    section that collected no content is dropped.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts (possibly empty).

    Raises:
        Exception: Any error from partitioning or grouping is logged with a
            traceback and re-raised unchanged.
    """
    try:
        logger.info("Extracting via Unstructured.io...")
        elements = partition_pdf(filename=pdf_path, extract_images_in_pdf=False)

        collected = []
        section = {"title": "Introduction", "content": ""}
        for element in elements:
            if not hasattr(element, "text"):
                continue
            text = element.text.strip()
            if not text:
                continue

            looks_like_heading = len(text) < 80 and (
                text.isupper()
                or text.endswith(":")
                or re.match(r"^[0-9]+\.?\s+", text)
            )
            if not looks_like_heading:
                section["content"] += text + "\n\n"
                continue

            # A heading closes the running section (kept only if it has text).
            if section["content"]:
                collected.append(section)
            section = {"title": text, "content": ""}

        if section["content"]:
            collected.append(section)
        return collected
    except Exception as e:
        # Bubble up so the caller can catch and log the failure.
        logger.error(f"Unstructured extraction error: {e}", exc_info=True)
        raise
def extract_text_with_pypdf(pdf_path):
    """Extract a PDF's text with PyPDF2 and split it into titled sections.

    Each page that yields text is prefixed with a ``--- Page N ---`` marker.
    The joined text is then split on heading-like lines: an upper-case run
    (optionally ending in ":") or a numbered line such as "1. Title".

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts:
        * ``[]`` when no page produced any non-blank text, so callers that
          treat an empty result as failure can try another extractor;
        * a single "Document" section when no heading was detected;
        * otherwise one section per heading, preceded by an "Introduction"
          section when real text appears before the first heading.
    """
    logger.info("Extracting via PyPDF2...")
    reader = PdfReader(pdf_path)

    pages = []
    for number, page in enumerate(reader.pages, start=1):
        text = page.extract_text()
        if text and text.strip():
            pages.append(f"\n\n--- Page {number} ---\n\n{text}")
    if not pages:
        # Returning a section with empty content would look like success.
        return []
    full_text = "".join(pages)

    # One capturing group: the result alternates body, title, body, title, ...
    parts = re.split(r"\n\s*([A-Z][A-Z\s]+:?|[0-9]+\.\s+[A-Z].*?)\s*\n", full_text)
    if len(parts) == 1:
        return [{"title": "Document", "content": full_text}]

    sections = []
    preamble = parts[0]
    # Keep text that precedes the first heading unless it is only page markers.
    if re.sub(r"--- Page \d+ ---", "", preamble).strip():
        sections.append({"title": "Introduction", "content": preamble.strip()})
    sections.extend(
        {"title": title.strip(), "content": body.strip()}
        for title, body in zip(parts[1::2], parts[2::2])
    )
    return sections
def extract_text_with_tika(pdf_path):
    """Extract a PDF's text with Tika and group its lines into titled sections.

    A non-blank line shorter than 80 characters starts a new section when it
    is all upper-case, ends with ":" or begins with digits (optionally
    followed by a dot), whitespace and a capital letter. Every other
    non-blank line is appended to the current section. Text seen before the
    first heading is filed under "Introduction", and a section that collected
    no content is dropped.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A list of ``{"title": str, "content": str}`` dicts (possibly empty).
    """
    logger.info("Extracting via Tika...")
    parsed = parser.from_file(pdf_path)
    # "content" may be missing or None; treat both as empty text.
    raw_text = parsed.get("content") or ""

    collected = []
    section = {"title": "Introduction", "content": ""}
    for raw_line in raw_text.split("\n"):
        line = raw_line.strip()
        if not line:
            continue

        is_heading = len(line) < 80 and (
            line.isupper()
            or line.endswith(":")
            or re.match(r"^[0-9]+\.?\s+[A-Z]", line)
        )
        if is_heading:
            if section["content"]:
                collected.append(section)
            section = {"title": line, "content": ""}
        else:
            section["content"] += line + "\n\n"

    if section["content"]:
        collected.append(section)
    return collected
# --- Gemini calls ---
def generate_greg_brockman_summary(content):
    """Ask the "gemini-1.5-pro" model for a template-structured summary of *content*.

    Args:
        content: Document text inserted at the end of the prompt.

    Returns:
        ``(summary_text, None)`` on success, or ``(None, error_message)`` when
        generating or reading the response raises; the error is logged.
    """
    model = genai.GenerativeModel("gemini-1.5-pro")
    prompt = f"""
You are an expert document analyst specializing in proposal evaluation.
# GREG BROCKMAN TEMPLATE STRUCTURE
1. GOAL: ...
(rest of template) ...
CONTENT:
{content}
"""
    try:
        # Reading .text stays inside the try: it is covered by the same handler.
        summary = model.generate_content(prompt).text
    except Exception as exc:
        logger.error(f"Summary error: {exc}")
        return None, str(exc)
    return summary, None
def answer_question_about_pdf(content, question):
    """Ask the "gemini-1.5-pro" model to answer *question* from *content*.

    Args:
        content: Document text placed in the prompt.
        question: The user's question, placed after the document text.

    Returns:
        ``(answer_text, None)`` on success, or ``(None, error_message)`` when
        generating or reading the response raises; the error is logged.
    """
    model = genai.GenerativeModel("gemini-1.5-pro")
    prompt = f"""
You are a precise document analysis assistant.
DOCUMENT CONTENT:
{content}
QUESTION: {question}
"""
    try:
        # Reading .text stays inside the try: it is covered by the same handler.
        answer = model.generate_content(prompt).text
    except Exception as exc:
        logger.error(f"Q&A error: {exc}")
        return None, str(exc)
    return answer, None
# --- Handlers ---
def _resolve_upload_path(pdf_file):
    """Return a filesystem path holding the uploaded PDF, or None if unrecognized.

    Accepts the three upload shapes the handler supports: an object with
    ``name`` and ``data`` (its bytes are written to the temp directory), a
    path string (used as-is), and a file-like object with ``read()``.

    Raises:
        OSError: If the upload has to be written to disk and that fails.
    """
    raw_name = getattr(pdf_file, "name", None)
    # Keep only the last path component: os.path.join() ignores the temp
    # directory when handed an absolute name, and "../" parts could point
    # outside it.
    safe_name = os.path.basename(raw_name) if isinstance(raw_name, str) else ""
    target = os.path.join(tempfile.gettempdir(), safe_name or "uploaded.pdf")

    if hasattr(pdf_file, "name") and hasattr(pdf_file, "data"):
        payload = pdf_file.data
    elif isinstance(pdf_file, str):
        return pdf_file
    elif hasattr(pdf_file, "read"):
        if isinstance(raw_name, str) and os.path.isfile(raw_name):
            # Already on disk. Re-opening this same path for writing would
            # truncate the upload before read() could return its bytes.
            return raw_name
        payload = pdf_file.read()
    else:
        return None

    # The payload is fully in memory before the destination is opened.
    with open(target, "wb") as f:
        f.write(payload)
    return target


def process_pdf(pdf_file, progress=gr.Progress()):
    """Extract text from an uploaded PDF and summarize it with Gemini.

    Extractors are tried in order (Unstructured when available, then PyPDF2,
    then Tika); the first one that yields a section with non-blank content
    wins. The sections are combined into one text of roughly 30,000
    characters at most (sections that would exceed the budget are replaced by
    a "[Truncated]" marker), stored in the module-level state for later
    questions, and sent to the summary prompt.

    Args:
        pdf_file: The upload. NOTE(review): the exact type depends on how the
            Gradio File component is configured; a path string, an object
            with ``name``/``data`` and a file-like object are handled.
        progress: Gradio progress tracker; not used by this function.

    Returns:
        A 4-tuple ``(summary, structure, status, detail)``: the summary text
        or None, the numbered list of section titles or None, a status
        message, and a detail string (the extraction method on success,
        the failure reasons or the combined text on error).
    """
    global EXTRACTED_TEXT, PDF_SECTIONS, EXTRACTION_METHOD

    if not API_KEY:
        return None, None, "❌ Set GOOGLE_API_KEY in Secrets.", ""
    if pdf_file is None:
        return None, None, "❌ No file uploaded.", ""

    try:
        path = _resolve_upload_path(pdf_file)
    except OSError as e:
        logger.error(f"Could not store upload: {e}")
        return None, None, f"❌ Could not store upload: {e}", ""
    if path is None:
        return None, None, "❌ Unrecognized upload type", ""

    # Try methods in order.
    methods = []
    if UNSTRUCTURED_AVAILABLE:
        methods.append(("unstructured", extract_text_with_unstructured))
    methods += [
        ("pypdf", extract_text_with_pypdf),
        ("tika", extract_text_with_tika),
    ]

    sections = None
    failures = []
    for name, fn in methods:
        try:
            secs = fn(path)
        except Exception as e:
            failures.append(f"{name} failed: {e}")
            logger.warning(failures[-1])
            continue
        # A result made only of empty sections is no better than none.
        if secs and any((s.get("content") or "").strip() for s in secs):
            sections = secs
            EXTRACTION_METHOD = name
            break
        failures.append(f"{name} returned no text")
        logger.warning(failures[-1])

    if not sections:
        return None, None, "❌ Extraction failed", "; ".join(failures)

    # Combine & summarize.
    max_chars = 30000
    pieces, outline, used = [], [], 0
    for i, sec in enumerate(sections, 1):
        outline.append(f"{i}. {sec['title']}\n")
        chunk = f"## {sec['title']}\n{sec['content']}\n\n"
        if used + len(chunk) >= max_chars:
            # Over budget: keep the title so the outline stays complete.
            chunk = f"## {sec['title']}\n[Truncated]\n\n"
        pieces.append(chunk)
        used += len(chunk)
    combined = "".join(pieces)
    structure = "".join(outline)

    EXTRACTED_TEXT = combined
    PDF_SECTIONS = sections

    summary, err = generate_greg_brockman_summary(combined)
    if err:
        return None, structure, f"❌ {err}", combined
    return summary, structure, "✅ PDF processed", f"Used {EXTRACTION_METHOD}"
def ask_question(question):
    """Answer *question* against the text of the most recently processed PDF.

    Returns the model's answer, or a "❌ ..." message when the API key is
    missing, no PDF text is stored, the question is blank, or the model call
    reports an error.
    """
    if not API_KEY:
        return "❌ Set GOOGLE_API_KEY in Secrets."
    if not EXTRACTED_TEXT:
        return "❌ Process a PDF first."
    if question.strip() == "":
        return "❌ Enter a question."

    answer, failure = answer_question_about_pdf(EXTRACTED_TEXT, question)
    if failure:
        return f"❌ {failure}"
    return answer
def view_log():
    """Return the contents of the log file, or an error message if it cannot be read.

    The file is opened in a ``with`` block so the handle is always closed,
    and undecodable bytes are replaced instead of making the whole log
    unreadable.
    """
    try:
        with open(LOG_FILE, errors="replace") as f:
            return f.read()
    except OSError as e:
        return f"Error reading log: {e}"
def save_summary(summary):
    """Write *summary* to a timestamped ``summary_YYYYMMDD_HHMMSS.txt`` file.

    The file is created in the current working directory, UTF-8 encoded.

    Args:
        summary: Summary text; empty, None or whitespace-only text is refused.

    Returns:
        A status message: "✅ Saved to <file>" on success, otherwise a
        "❌ ..." message explaining why nothing was saved.
    """
    if not summary or not summary.strip():
        return "❌ No summary to save."
    fn = f"summary_{datetime.now():%Y%m%d_%H%M%S}.txt"
    try:
        with open(fn, "w", encoding="utf-8") as f:
            f.write(summary)
    except OSError as e:
        # Report the failure in the UI like the other handlers do.
        return f"❌ Could not save summary: {e}"
    return f"✅ Saved to {fn}"
def save_qa(question, answer):
    """Write a question/answer pair to a timestamped ``qa_YYYYMMDD_HHMMSS.txt`` file.

    The file is created in the current working directory, UTF-8 encoded, in
    the form ``Q: <question>`` / blank line / ``A: <answer>``.

    Args:
        question: The question text.
        answer: The answer text.
        Either being empty, None or whitespace-only means nothing is saved.

    Returns:
        A status message: "✅ Saved to <file>" on success, otherwise a
        "❌ ..." message explaining why nothing was saved.
    """
    if not question or not answer:
        return "❌ Nothing to save."
    if not question.strip() or not answer.strip():
        return "❌ Nothing to save."
    fn = f"qa_{datetime.now():%Y%m%d_%H%M%S}.txt"
    try:
        with open(fn, "w", encoding="utf-8") as f:
            f.write(f"Q: {question}\n\nA: {answer}")
    except OSError as e:
        # Report the failure in the UI like the other handlers do.
        return f"❌ Could not save Q&A: {e}"
    return f"✅ Saved to {fn}"
# --- Gradio UI ---
# Four-tab interface: setup hint, PDF processing, Q&A, and the system log.
# NOTE(review): nesting is reconstructed from the `with` statements; the
# assumption that only the file input and the button sit inside the Row
# should be confirmed against the running app.
with gr.Blocks(title="PDF Analyzer with Gemini API") as app:
    gr.Markdown("# 📄 PDF Analyzer with Gemini API")
    gr.Markdown("Upload a PDF, get a Greg Brockman style summary, and ask questions.")

    with gr.Tab("Setup"):
        gr.Markdown("⚠️ Make sure `GOOGLE_API_KEY` is set in your Space's Secrets.")

    with gr.Tab("PDF Processing"):
        with gr.Row():
            pdf_file = gr.File(label="Upload PDF", file_types=[".pdf"])
            proc_btn = gr.Button("Process PDF", variant="primary")
        status = gr.Markdown("Awaiting upload…")
        summary_out = gr.Textbox(label="Summary", lines=15)
        structure_out = gr.Textbox(label="Structure", lines=8)
        log_info = gr.Textbox(label="Internal Log", lines=5)
        # process_pdf returns (summary, structure, status, detail) in this order.
        proc_btn.click(
            process_pdf,
            [pdf_file],
            [summary_out, structure_out, status, log_info],
        )
        save_sum_btn = gr.Button("Save Summary")
        save_sum_status = gr.Markdown()
        save_sum_btn.click(
            fn=save_summary,
            inputs=[summary_out],
            outputs=[save_sum_status],
        )

    with gr.Tab("Ask Questions"):
        question_in = gr.Textbox(label="Your Question", lines=2)
        ask_btn = gr.Button("Ask", variant="primary")
        answer_out = gr.Textbox(label="Answer", lines=10)
        ask_btn.click(fn=ask_question, inputs=[question_in], outputs=[answer_out])
        save_qa_btn = gr.Button("Save Q&A")
        save_qa_status = gr.Markdown()
        save_qa_btn.click(
            fn=save_qa,
            inputs=[question_in, answer_out],
            outputs=[save_qa_status],
        )

    with gr.Tab("System Log"):
        refresh_btn = gr.Button("Refresh Log")
        sys_log = gr.Textbox(label="System Log", lines=20)
        refresh_btn.click(fn=view_log, inputs=None, outputs=[sys_log])

# Listen on all interfaces when run as a script.
if __name__ == "__main__":
    app.launch(server_name="0.0.0.0")