|
""" |
|
File: ocr.py |
|
Description: Optical Character Recognition (OCR) using software 2.0 models |
|
Author: Didier Guillevic |
|
Date: 2025-04-06 |
|
""" |
|
|
|
import os |
|
os.system("bash setup.sh") |
|
import magic |
|
import vlm |
|
|
|
import uuid |
|
import shutil |
|
import threading |
|
import time |
|
import pathlib |
|
|
|
import pdf2image |
|
from pdf2image.exceptions import PDFPageCountError, PDFSyntaxError |
|
import pypdf |
|
import base64 |
|
from contextlib import contextmanager |
|
from typing import List, Optional, Tuple, Union |
|
|
|
import logging |
|
|
|
class PDFScannerTempManager:
    """
    Owns the scratch directories used during PDF scanning operations.

    Scratch directories are created beneath a common base directory, listed
    in ``active_temp_dirs`` while they are live, and deleted on release.
    """

    def __init__(self, base_temp_dir: str = 'tmp'):
        """
        Prepare the manager and make sure the base directory exists.

        Args:
            base_temp_dir (str): Directory under which scratch directories
                are created; it is created if it does not exist yet.
        """
        self.base_temp_dir = base_temp_dir
        self.active_temp_dirs: list[str] = []

        # The base directory may already be there from a previous run.
        os.makedirs(base_temp_dir, exist_ok=True)

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    @contextmanager
    def temp_directory(self) -> str:
        """
        Provide a uniquely named scratch directory for the ``with`` body.

        The directory is named after a random UUID, registered as active
        while in use, and removed when the ``with`` block exits, whether
        normally or through an exception.

        Yields:
            str: Path to the scratch directory
        """
        scratch_dir = os.path.join(self.base_temp_dir, str(uuid.uuid4()))

        try:
            # exist_ok=False: a name collision is reported, never reused.
            os.makedirs(scratch_dir, exist_ok=False)
            self.active_temp_dirs.append(scratch_dir)

            yield scratch_dir
        finally:
            self._cleanup_directory(scratch_dir)

    def _cleanup_directory(self, directory: str) -> None:
        """
        Delete a scratch directory and stop tracking it.

        Failures are logged rather than raised, so cleanup never masks the
        error that may have ended the caller's ``with`` block.

        Args:
            directory (str): Path of the directory to remove
        """
        try:
            if os.path.exists(directory):
                shutil.rmtree(directory)

            # Only reached when removal succeeded (or nothing was on disk).
            if directory in self.active_temp_dirs:
                self.active_temp_dirs.remove(directory)
        except Exception as exc:
            self.logger.error(f"Error cleaning up directory {directory}: {exc}")

    def cleanup_all(self) -> None:
        """
        Remove every scratch directory that is still registered as active.
        """
        # Walk a snapshot: _cleanup_directory mutates the tracked list.
        for tracked_dir in tuple(self.active_temp_dirs):
            self._cleanup_directory(tracked_dir)
|
|
|
|
|
class PDFScanner:
    """
    A class to perform OCR on PDF files with robust temp management.

    A PDF is validated, optionally repaired, rasterized with pdf2image, and
    each resulting page image is handed to ``process_image_file``.
    """

    def __init__(
        self,
        dpi: int = 300,
        temp_manager: "Optional[PDFScannerTempManager]" = None,
    ):
        """
        Initialize the PDFScanner.

        Args:
            dpi (int): Base DPI for PDF conversion
            temp_manager (PDFScannerTempManager, optional): Temp directory
                manager; a new one is created when omitted
        """
        self.dpi = dpi
        self.temp_manager = temp_manager or PDFScannerTempManager()
        self.logger = logging.getLogger(__name__)

    def _validate_pdf(self, pdf_path: str) -> Tuple[bool, str, bool]:
        """
        Validate PDF file and check for encryption.

        Never raises: every failure is reported through the returned tuple.

        Args:
            pdf_path (str): Path to the PDF file

        Returns:
            Tuple[bool, str, bool]: (is_valid, error_message, is_encrypted)
        """
        try:
            with open(pdf_path, 'rb') as file:
                # Cheap signature check before handing the file to pypdf.
                if file.read(4) != b'%PDF':
                    return False, "Not a valid PDF file (missing PDF signature)", False

                # Rewind so pypdf parses the file from its first byte.
                file.seek(0)

                try:
                    pdf_reader = pypdf.PdfReader(file, strict=False)

                    if pdf_reader.is_encrypted:
                        return False, "PDF is encrypted and requires password", True

                    num_pages = len(pdf_reader.pages)
                    return True, f"Valid PDF with {num_pages} pages", False

                except pypdf.errors.PdfReadError as e:
                    return False, f"Invalid PDF structure: {str(e)}", False

        except Exception as e:
            return False, f"Error validating PDF: {str(e)}", False

    def _repair_pdf(self, pdf_path: str, temp_dir: str) -> str:
        """
        Attempt to repair a corrupted PDF file.

        First re-writes the pages with pypdf; if that fails, falls back to
        re-distilling the file with the ``gs`` (Ghostscript) executable.

        Args:
            pdf_path (str): Path to original PDF
            temp_dir (str): Temporary directory for repair

        Returns:
            str: Path to repaired PDF

        Raises:
            Exception: whatever the Ghostscript attempt raised; a
                RuntimeError when ``gs`` exits with an error or leaves no
                output file.
        """
        # Imported here because the module's import block does not provide
        # it; without it the Ghostscript fallback fails with a NameError.
        import subprocess

        repaired_pdf = os.path.join(temp_dir, 'repaired.pdf')

        # Attempt 1: copy every page into a fresh document with pypdf.
        try:
            with open(pdf_path, 'rb') as file:
                reader = pypdf.PdfReader(file, strict=False)
                writer = pypdf.PdfWriter()

                for page in reader.pages:
                    writer.add_page(page)

                # Written while the source is still open: the copied pages
                # still refer to data read from it.
                with open(repaired_pdf, 'wb') as output_file:
                    writer.write(output_file)

            if os.path.exists(repaired_pdf):
                return repaired_pdf

        except Exception as e:
            self.logger.warning(f"pypdf repair failed: {str(e)}")

        # Attempt 2: let Ghostscript rewrite the file.
        try:
            gs_command = [
                'gs',
                '-o', repaired_pdf,
                '-sDEVICE=pdfwrite',
                '-dPDFSETTINGS=/prepress',
                pdf_path,
            ]

            process = subprocess.run(
                gs_command,
                capture_output=True,
                text=True,
            )

            if process.returncode == 0 and os.path.exists(repaired_pdf):
                return repaired_pdf

            raise RuntimeError(f"Ghostscript repair failed: {process.stderr}")

        except Exception as e:
            self.logger.error(f"PDF repair failed: {str(e)}")
            raise

    def _process_images(
        self,
        images: list,
        temp_dir: str,
        language: str
    ) -> list[str]:
        """
        Save each page image to disk and run OCR on it.

        A page that fails is logged and represented by an
        ``[ERROR ON PAGE n]`` placeholder, so the result keeps one entry
        per page.

        Args:
            images (list): Page images, in page order; each must provide a
                ``save(path, format)`` method
            temp_dir (str): Directory where the page PNG files are written
            language (str): Accepted for interface compatibility; not used
                by the current implementation

        Returns:
            list[str]: OCR text (or error placeholder) for each page
        """
        extracted_text = []

        for page_number, image in enumerate(images, start=1):
            image_path = os.path.join(temp_dir, f'page_{page_number}.png')
            try:
                # PNG is lossless, so no JPEG-style quality argument is passed.
                image.save(image_path, 'PNG')

                text = process_image_file(image_path)
                extracted_text.append(text)

            except Exception as e:
                self.logger.error(f"Error processing page {page_number}: {str(e)}")
                extracted_text.append(f"[ERROR ON PAGE {page_number}]")

        return extracted_text

    def pdf_to_text(
        self,
        pdf_path: str,
        language: str = 'eng',
        first_page: Optional[int] = None,
        last_page: Optional[int] = None,
        attempt_repair: bool = True
    ) -> list[str]:
        """
        Convert a PDF file to text using OCR with robust error handling.

        Args:
            pdf_path (str): Path to the PDF file
            language (str): Language for OCR (default: 'eng')
            first_page (int, optional): First page to process (1-based)
            last_page (int, optional): Last page to process
            attempt_repair (bool): Whether to attempt repairing corrupted PDFs

        Returns:
            list[str]: List of extracted text for each page; empty when the
                conversion produced no page images and reported no error

        Raises:
            FileNotFoundError: if ``pdf_path`` does not exist
            RuntimeError: if the PDF is encrypted, or if a conversion
                method failed and none produced page images
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        # All intermediate files live in a directory that is removed when
        # this block exits, including on error.
        with self.temp_manager.temp_directory() as temp_dir:
            is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path)
            if not is_valid:
                self.logger.warning(f"PDF validation issue: {error_message}")

                if is_encrypted:
                    raise RuntimeError("Cannot process encrypted PDF files")

                if attempt_repair:
                    try:
                        pdf_path = self._repair_pdf(pdf_path, temp_dir)
                        self.logger.info("Using repaired PDF file")
                    except Exception as e:
                        # Best effort: fall through and try the original file.
                        self.logger.error(f"Repair failed: {str(e)}")

            # Tried in order until one of them yields page images.
            conversion_methods = [
                {'use_pdftocairo': True, 'strict': False},
                {'use_pdftocairo': False, 'strict': False},
                {'use_pdftocairo': True, 'strict': False, 'dpi': self.dpi * 2},
                {'use_pdftocairo': False, 'strict': False, 'dpi': self.dpi * 3},
            ]

            last_error = None
            for method in conversion_methods:
                # 'dpi' is passed explicitly; the rest goes through as-is.
                extra_options = {k: v for k, v in method.items() if k != 'dpi'}
                try:
                    self.logger.info(f"Trying conversion method: {method}")
                    images = pdf2image.convert_from_path(
                        pdf_path,
                        dpi=method.get('dpi', self.dpi),
                        first_page=first_page,
                        last_page=last_page,
                        thread_count=4,
                        grayscale=True,
                        **extra_options
                    )

                    if images:
                        return self._process_images(images, temp_dir, language)

                except Exception as e:
                    last_error = e
                    self.logger.warning(f"Method failed: {str(e)}")

            if last_error:
                raise RuntimeError(
                    f"All conversion methods failed. Last error: {str(last_error)}"
                ) from last_error

            # No method failed, yet none produced a page: honour the
            # declared return type instead of returning None.
            self.logger.warning("PDF conversion produced no page images")
            return []
|
|
|
|
|
|
|
|
|
# Module-level scanner used by process_pdf_file(); built at import time with
# the default DPI and a default PDFScannerTempManager.
pdf_scanner = PDFScanner()
|
|
|
|
|
|
|
|
|
|
|
def process_file(input_file: str):
    """Run OCR on the given file, choosing the handler by file type.

    Args:
        input_file: the file to process

    Returns:
        the result of the image or PDF handler, or an explanatory message
        when the file is neither an image nor a PDF
    """
    detected_type = get_file_type(input_file)

    # Only these two categories have an OCR handler.
    handlers = {
        "Image": process_image_file,
        "PDF": process_pdf_file,
    }
    handler = handlers.get(detected_type)
    if handler is None:
        return "Unsupported file type. Please upload a PDF, or an image file."

    return handler(input_file)
|
|
|
|
|
def process_image_file(input_file: str):
    """Process image file with OCR.

    The image is base64-encoded, embedded in a data URL and sent, together
    with the OCR instructions, to ``vlm.get_response``.

    Args:
        input_file: path of the image to process

    Returns:
        whatever ``vlm.get_response`` returns for the request

    Raises:
        ValueError: if the image could not be read and encoded
    """
    # Local import: nothing else in this function's scope provides it.
    import mimetypes

    encoded_image = encode_image(input_file)
    if encoded_image is None:
        # encode_image() signals a read failure with None; interpolating it
        # would send the literal text "None" as image data.
        raise ValueError(f"Could not read image file: {input_file}")

    # Declare the image's real type (e.g. the PNG pages produced from PDFs)
    # and keep the previous JPEG label only when the type cannot be guessed.
    guessed_type, _ = mimetypes.guess_type(str(input_file))
    if guessed_type and guessed_type.startswith("image/"):
        mime_type = guessed_type
    else:
        mime_type = "image/jpeg"

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": (
                        "Could you perform optical character recognition (OCR) on the image? "
                        "Simply return the text without any additional comments. "
                        "The exception would be if the image represents an ID card. "
                        "In such a case, please return the information in a structured format. "
                    )
                },
                {
                    "type": "image_url",
                    "image_url": f"data:{mime_type};base64,{encoded_image}"
                }
            ]
        }
    ]
    return vlm.get_response(messages)
|
|
|
|
|
def process_pdf_file(input_file: str):
    """Process PDF file with OCR

    Args:
        input_file: the PDF file to process with OCR; either a path (string
            or path-like) or an object exposing the path as ``.name``

    Returns:
        the text OCR result, with the pages separated by blank lines

    Note:
        Each page of the PDF is processed as an image.
    """
    # A plain path has no ``.name`` attribute, so only fall back to it for
    # objects that are not themselves a path.
    if isinstance(input_file, (str, os.PathLike)):
        pdf_path = os.fspath(input_file)
    else:
        pdf_path = input_file.name

    texts = pdf_scanner.pdf_to_text(pdf_path=pdf_path)
    output_text = '\n\n'.join(texts)
    return output_text
|
|
|
|
|
|
|
|
|
|
|
def get_file_type(file_path):
    """Classify a file as 'PDF', 'Image', 'PowerPoint' or 'Other'.

    A category matches when either the file extension or the MIME type
    detected from the file says so; categories are checked in the order
    PDF, Image, PowerPoint.

    Args:
        file_path: path of the file to classify

    Returns:
        one of 'PDF', 'Image', 'PowerPoint', 'Other'
    """
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    detector = magic.Magic(mime=True)
    detected_mime = detector.from_file(file_path)

    pptx_mime = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'
    image_suffixes = ('.jpg', '.jpeg', '.png', '.gif')

    if suffix == '.pdf' or detected_mime == 'application/pdf':
        return 'PDF'
    if suffix in image_suffixes or detected_mime.startswith('image/'):
        return 'Image'
    if suffix == '.pptx' or detected_mime == pptx_mime:
        return 'PowerPoint'
    return 'Other'
|
|
|
|
|
|
|
|
|
def encode_image(image_path):
    """Return the file's contents as a base64 string, or None on failure.

    Read errors are not raised: a message is printed and None is returned.
    """
    encoded = None
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes).decode('utf-8')
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.")
    except Exception as exc:
        print(f"Error: {exc}")
    return encoded
|
|