""" File: ocr.py Description: (Traditional) Optical Character Recognition (OCR) using tesseract. Author: Didier Guillevic Date: 2024-11-23 """ import os os.system("bash setup.sh") # Ensure setup script runs before importing pytesseract import pytesseract from pdf2image import convert_from_path from pdf2image.exceptions import PDFPageCountError, PDFSyntaxError import os import uuid import shutil import logging import pypdf import subprocess import ocrmypdf from typing import List, Optional, Tuple, Union from contextlib import contextmanager tesseract_psm_modes = { 0: "Orientation and script detection (OSD) only.", 1: "Automatic page segmentation with OSD.", 2: "Automatic page segmentation, but no OSD, or OCR.", 3: "Fully automatic page segmentation, but no OSD. (**default**)", 4: "Assume a single column of text of variable sizes.", 5: "Assume a single uniform block of vertically aligned text.", 6: "Assume a single uniform block of text.", 7: "Treat the image as a single text line.", 8: "Treat the image as a single word.", 9: "Treat the image as a single word in a circle.", 10: "Treat the image as a single character.", 11: "Sparse text. Find as much text as possible in no particular order.", 12: "Sparse text with OSD.", 13: "Raw line. Treat the image as a single text line, bypassing hacks that are Tesseract-specific." } tesseract_psm_descriptions = { "0: Orientation and script detection (OSD) only.": 0, "1: Automatic page segmentation with OSD.": 1, "2: Automatic page segmentation, but no OSD, or OCR.": 2, "3: Fully automatic page segmentation, but no OSD. (**default**)": 3, "4: Assume a single column of text of variable sizes.": 4, "5: Assume a single uniform block of vertically aligned text.": 5, "6: Assume a single uniform block of text.": 6, "7: Treat the image as a single text line.": 7, "8: Treat the image as a single word.": 8, "9: Treat the image as a single word in a circle.": 9, "10: Treat the image as a single character.": 10, "11: Sparse text. Find as much text as possible in no particular order.": 11, "12: Sparse text with OSD.": 12, "13: Raw line. Treat the image as a single text line, bypassing hacks that are Tesseract-specific.": 13 } class PDFScannerTempManager: """ Manages temporary directory creation and cleanup for PDF scanning operations. """ def __init__(self, base_temp_dir: str = 'tmp'): """ Initialize temporary directory manager. Args: base_temp_dir (str): Base directory for temporary files """ self.base_temp_dir = base_temp_dir self.active_temp_dirs: list[str] = [] # Ensure base temporary directory exists os.makedirs(base_temp_dir, exist_ok=True) # Set up logging logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) @contextmanager def temp_directory(self) -> str: """ Create a temporary directory with UUID and manage its lifecycle. Yields: str: Path to the temporary directory """ # Generate unique directory name dir_uuid = str(uuid.uuid4()) temp_dir = os.path.join(self.base_temp_dir, dir_uuid) try: # Create directory os.makedirs(temp_dir, exist_ok=False) self.active_temp_dirs.append(temp_dir) # Yield directory path yield temp_dir finally: # Remove directory and its contents self._cleanup_directory(temp_dir) def _cleanup_directory(self, directory: str) -> None: """ Safely remove a temporary directory. Args: directory (str): Path to directory to remove """ try: if os.path.exists(directory): shutil.rmtree(directory) # Remove from active directories if directory in self.active_temp_dirs: self.active_temp_dirs.remove(directory) except Exception as e: self.logger.error(f"Error cleaning up directory {directory}: {e}") def cleanup_all(self) -> None: """ Clean up all temporary directories created during the session. """ for directory in list(self.active_temp_dirs): self._cleanup_directory(directory) class PDFScanner: """ A class to perform OCR on PDF files using Tesseract with robust temp management. """ def __init__(self, tesseract_cmd: str = 'tesseract', dpi: int = 300, temp_manager: Optional[PDFScannerTempManager] = None): """ Initialize the PDFScanner. Args: tesseract_cmd (str): Path to tesseract executable dpi (int): DPI for PDF conversion temp_manager (PDFScannerTempManager, optional): Temp directory manager """ self.dpi = dpi self.temp_manager = temp_manager or PDFScannerTempManager() pytesseract.pytesseract.tesseract_cmd = tesseract_cmd self.logger = logging.getLogger(__name__) def _validate_pdf(self, pdf_path: str) -> Tuple[bool, str, bool]: """ Validate PDF file and check for encryption. Returns: Tuple[bool, str, bool]: (is_valid, error_message, is_encrypted) """ try: with open(pdf_path, 'rb') as file: # Check if file starts with PDF signature if not file.read(4) == b'%PDF': return False, "Not a valid PDF file (missing PDF signature)", False # Reset file pointer file.seek(0) try: pdf_reader = pypdf.PdfReader(file, strict=False) is_encrypted = pdf_reader.is_encrypted if is_encrypted: return False, "PDF is encrypted and requires password", True num_pages = len(pdf_reader.pages) return True, f"Valid PDF with {num_pages} pages", False except pypdf.errors.PdfReadError as e: return False, f"Invalid PDF structure: {str(e)}", False except Exception as e: return False, f"Error validating PDF: {str(e)}", False def _repair_pdf(self, pdf_path: str, temp_dir: str) -> str: """ Attempt to repair a corrupted PDF file. Args: pdf_path (str): Path to original PDF temp_dir (str): Temporary directory for repair Returns: str: Path to repaired PDF """ repaired_pdf = os.path.join(temp_dir, 'repaired.pdf') try: # pypdf repair attempt with open(pdf_path, 'rb') as file: reader = pypdf.PdfReader(file, strict=False) writer = pypdf.PdfWriter() for page in reader.pages: writer.add_page(page) with open(repaired_pdf, 'wb') as output_file: writer.write(output_file) if os.path.exists(repaired_pdf): return repaired_pdf except Exception as e: self.logger.warning(f"pypdf repair failed: {str(e)}") # Ghostscript repair attempt try: gs_command = [ 'gs', '-o', repaired_pdf, '-sDEVICE=pdfwrite', '-dPDFSETTINGS=/prepress', pdf_path ] process = subprocess.run( gs_command, capture_output=True, text=True ) if process.returncode == 0 and os.path.exists(repaired_pdf): return repaired_pdf else: raise Exception(f"Ghostscript repair failed: {process.stderr}") except Exception as e: self.logger.error(f"PDF repair failed: {str(e)}") raise def _process_images( self, images: list, temp_dir: str, language: str ) -> list[str]: """Helper method to process converted images.""" extracted_text = [] for i, image in enumerate(images): image_path = os.path.join(temp_dir, f'page_{i+1}.png') try: # Save with higher quality image.save(image_path, 'PNG', quality=100) # Perform OCR with additional configuration text = pytesseract.image_to_string( image, lang=language, config='--psm 1 --oem 1' ) extracted_text.append(text) except Exception as e: self.logger.error(f"Error processing page {i+1}: {str(e)}") extracted_text.append(f"[ERROR ON PAGE {i+1}]") return extracted_text def pdf_to_text( self, pdf_path: str, language: str = 'eng', first_page: Optional[int] = None, last_page: Optional[int] = None, attempt_repair: bool = True ) -> list[str]: """ Convert a PDF file to text using OCR with robust error handling. Args: pdf_path (str): Path to the PDF file language (str): Language for OCR (default: 'eng') first_page (int, optional): First page to process (1-based) last_page (int, optional): Last page to process attempt_repair (bool): Whether to attempt repairing corrupted PDFs Returns: list[str]: List of extracted text for each page """ if not os.path.exists(pdf_path): raise FileNotFoundError(f"PDF file not found: {pdf_path}") # Use context manager for automatic cleanup with self.temp_manager.temp_directory() as temp_dir: # Validate PDF is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path) if not is_valid: self.logger.warning(f"PDF validation issue: {error_message}") if is_encrypted: raise Exception("Cannot process encrypted PDF files") if attempt_repair: try: pdf_path = self._repair_pdf(pdf_path, temp_dir) self.logger.info("Using repaired PDF file") except Exception as e: self.logger.error(f"Repair failed: {str(e)}") # Conversion methods with increasing complexity conversion_methods = [ {'use_pdftocairo': True, 'strict': False}, {'use_pdftocairo': False, 'strict': False}, {'use_pdftocairo': True, 'strict': False, 'dpi': self.dpi * 2}, {'use_pdftocairo': False, 'strict': False, 'dpi': self.dpi * 3} ] last_error = None for method in conversion_methods: try: self.logger.info(f"Trying conversion method: {method}") images = convert_from_path( pdf_path, dpi=method.get('dpi', self.dpi), first_page=first_page, last_page=last_page, thread_count=4, grayscale=True, **{k: v for k, v in method.items() if k != 'dpi'} ) if images: return self._process_images(images, temp_dir, language) except Exception as e: last_error = e self.logger.warning(f"Method failed: {str(e)}") continue if last_error: raise Exception(f"All conversion methods failed. Last error: {str(last_error)}") def pdf_to_searchable_pdf(self, pdf_path: str, output_path: str, language: str = 'eng', first_page: Optional[int] = None, last_page: Optional[int] = None, attempt_repair: bool = True) -> str: """ Convert a scanned PDF file to a searchable PDF using Tesseract. Args: pdf_path (str): Path to the input PDF file output_path (str): Path to save the searchable PDF language (str): Language for OCR (default: 'eng') first_page (int, optional): First page to process (1-based) last_page (int, optional): Last page to process attempt_repair (bool): Whether to attempt repairing corrupted PDFs Returns: str: Path to the output searchable PDF """ if not os.path.exists(pdf_path): raise FileNotFoundError(f"PDF file not found: {pdf_path}") # Use context manager for automatic cleanup with self.temp_manager.temp_directory() as temp_dir: # Validate PDF is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path) if not is_valid: self.logger.warning(f"PDF validation issue: {error_message}") if is_encrypted: raise Exception("Cannot process encrypted PDF files") if attempt_repair: try: pdf_path = self._repair_pdf(pdf_path, temp_dir) self.logger.info("Using repaired PDF file") except Exception as e: self.logger.error(f"Repair failed: {str(e)}") # Process partial PDFs if requested if first_page is not None or last_page is not None: partial_pdf_path = os.path.join(temp_dir, 'partial.pdf') with open(pdf_path, 'rb') as input_file: reader = pypdf.PdfReader(input_file) writer = pypdf.PdfWriter() # Use 0-based indexing for pypdf start_page = (first_page or 1) - 1 end_page = min(last_page or len(reader.pages), len(reader.pages)) for i in range(start_page, end_page): writer.add_page(reader.pages[i]) with open(partial_pdf_path, 'wb') as output_file: writer.write(output_file) pdf_path = partial_pdf_path # Extract images from the PDF try: images = convert_from_path( pdf_path, dpi=self.dpi, thread_count=4, grayscale=False ) except Exception as e: self.logger.error(f"Failed to convert PDF to images: {str(e)}") raise # Process each page individually page_pdfs = [] for i, image in enumerate(images): page_num = i + 1 image_path = os.path.join(temp_dir, f'page_{page_num}.png') pdf_output = os.path.join(temp_dir, f'page_{page_num}') try: # Save the image image.save(image_path, 'PNG', quality=100) # Use Tesseract directly to create a searchable PDF tesseract_cmd = [ pytesseract.pytesseract.tesseract_cmd, image_path, pdf_output, '-l', language, '--psm', '1', 'pdf' ] process = subprocess.run( tesseract_cmd, capture_output=True, text=True ) if process.returncode != 0: self.logger.error(f"Tesseract error on page {page_num}: {process.stderr}") raise Exception(f"Tesseract failed on page {page_num}: {process.stderr}") # Add the output PDF to our list page_pdf_path = f'{pdf_output}.pdf' if os.path.exists(page_pdf_path): page_pdfs.append(page_pdf_path) else: raise FileNotFoundError(f"Expected output PDF not found: {page_pdf_path}") except Exception as e: self.logger.error(f"Error processing page {page_num}: {str(e)}") raise # Merge all page PDFs into a single file if page_pdfs: # Create a PDF writer writer = pypdf.PdfWriter() for pdf in page_pdfs: reader = pypdf.PdfReader(pdf) for page in reader.pages: writer.add_page(page) # Write to the output path os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) with open(output_path, "wb") as output_file: writer.write(output_file) self.logger.info(f"Created searchable PDF at {output_path}") return output_path else: raise Exception("No pages were successfully processed") def pdf_to_searchable_pdf_ocrmypdf(self, pdf_path: str, output_path: str, language: str = 'eng', first_page: Optional[int] = None, last_page: Optional[int] = None, deskew: bool = True, optimize: bool = True, clean: bool = False, attempt_repair: bool = True) -> str: """ Convert a scanned PDF file to a searchable PDF using ocrmypdf. Args: pdf_path (str): Path to the input PDF file output_path (str): Path to save the searchable PDF language (str): Language for OCR (default: 'eng') first_page (int, optional): First page to process (1-based) last_page (int, optional): Last page to process deskew (bool): Whether to straighten pages optimize (bool): Whether to optimize the PDF clean (bool): Whether to clean the image before OCR attempt_repair (bool): Whether to attempt repairing corrupted PDFs Returns: str: Path to the output searchable PDF """ if not os.path.exists(pdf_path): raise FileNotFoundError(f"PDF file not found: {pdf_path}") # Use context manager for automatic cleanup with self.temp_manager.temp_directory() as temp_dir: # Validate PDF is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path) if not is_valid: self.logger.warning(f"PDF validation issue: {error_message}") if is_encrypted: raise Exception("Cannot process encrypted PDF files") if attempt_repair: try: pdf_path = self._repair_pdf(pdf_path, temp_dir) self.logger.info("Using repaired PDF file") except Exception as e: self.logger.error(f"Repair failed: {str(e)}") # Process partial PDFs if requested working_pdf_path = pdf_path if first_page is not None or last_page is not None: partial_pdf_path = os.path.join(temp_dir, 'partial.pdf') with open(pdf_path, 'rb') as input_file: reader = pypdf.PdfReader(input_file) writer = pypdf.PdfWriter() # Use 0-based indexing for pypdf start_page = (first_page or 1) - 1 end_page = min(last_page or len(reader.pages), len(reader.pages)) for i in range(start_page, end_page): writer.add_page(reader.pages[i]) with open(partial_pdf_path, 'wb') as output_file: writer.write(output_file) working_pdf_path = partial_pdf_path try: # Ensure the output directory exists output_dir = os.path.dirname(os.path.abspath(output_path)) os.makedirs(output_dir, exist_ok=True) # ocrmypdf has a rich set of options optimize_level = 1 if optimize else 0 # Run ocrmypdf result = ocrmypdf.ocr( working_pdf_path, output_path, language=language, optimize=optimize_level, skip_text=True, # Don't redo OCR on pages with text deskew=deskew, # Straighten pages clean=clean, # Clean pages before OCR progress_bar=False, use_threads=True, output_type="pdf", # Avoids Ghostscript jobs=os.cpu_count() or 4 ) if result == 0: # Success self.logger.info(f"Created searchable PDF at {output_path}") return output_path else: raise Exception(f"ocrmypdf returned non-zero exit code: {result}") except Exception as e: self.logger.error(f"Error creating searchable PDF with ocrmypdf: {str(e)}") raise def image_to_text(self, image_path: str, language: str = 'eng', psm: int = 3 ) -> str: """ Extract text from an image file using OCR. Args: image_path (str): Path to the image file language (str): Language for OCR (default: 'eng') psm (int): Page segmentation mode (default: 3) Returns: str: Extracted text from the image """ if not os.path.exists(image_path): raise FileNotFoundError(f"Image file not found: {image_path}") try: # Use Pillow to open the image from PIL import Image image = Image.open(image_path) # Perform OCR with specified parameters text = pytesseract.image_to_string( image, lang=language, config=f'--psm {psm} --oem 1' ) return text except Exception as e: self.logger.error(f"Error extracting text from image: {str(e)}") raise def image_to_searchable_pdf(self, image_path: str, output_path: str, language: str = 'eng', psm: int = 3 ) -> str: """ Convert an image file to a searchable PDF with OCR text. Args: image_path (str): Path to the image file output_path (str): Path to save the searchable PDF language (str): Language for OCR (default: 'eng') psm (int): Page segmentation mode (default: 3) Returns: str: Path to the output searchable PDF """ if not os.path.exists(image_path): raise FileNotFoundError(f"Image file not found: {image_path}") # Use context manager for automatic cleanup with self.temp_manager.temp_directory() as temp_dir: try: # Use Tesseract directly to create a searchable PDF pdf_output = os.path.join(temp_dir, 'output') tesseract_cmd = [ pytesseract.pytesseract.tesseract_cmd, image_path, pdf_output, '-l', language, '--psm', str(psm), 'pdf' ] process = subprocess.run( tesseract_cmd, capture_output=True, text=True ) if process.returncode != 0: self.logger.error(f"Tesseract error: {process.stderr}") raise Exception(f"Tesseract failed: {process.stderr}") # Check if the PDF was created temp_pdf_path = f'{pdf_output}.pdf' if not os.path.exists(temp_pdf_path): raise FileNotFoundError(f"Expected output PDF not found: {temp_pdf_path}") # Ensure output directory exists os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) # Copy the file to the desired output location shutil.copy(temp_pdf_path, output_path) self.logger.info(f"Created searchable PDF at {output_path}") return output_path except Exception as e: self.logger.error(f"Error creating searchable PDF from image: {str(e)}") raise def images_to_searchable_pdf(self, image_paths: List[str], output_path: str, language: str = 'eng', psm: int = 3 ) -> str: """ Convert multiple image files to a single searchable PDF with OCR text. Args: image_paths (List[str]): List of paths to image files output_path (str): Path to save the searchable PDF language (str): Language for OCR (default: 'eng') psm (int): Page segmentation mode (default: 3) Returns: str: Path to the output searchable PDF """ if not image_paths: raise ValueError("No image paths provided") # Use context manager for automatic cleanup with self.temp_manager.temp_directory() as temp_dir: try: # Process each image separately page_pdfs = [] for i, img_path in enumerate(image_paths): if not os.path.exists(img_path): raise FileNotFoundError(f"Image file not found: {img_path}") # Create PDF for this image pdf_output = os.path.join(temp_dir, f'page_{i+1}') tesseract_cmd = [ pytesseract.pytesseract.tesseract_cmd, img_path, pdf_output, '-l', language, '--psm', str(psm), 'pdf' ] process = subprocess.run( tesseract_cmd, capture_output=True, text=True ) if process.returncode != 0: self.logger.error(f"Tesseract error on image {i+1}: {process.stderr}") raise Exception(f"Tesseract failed on image {i+1}: {process.stderr}") # Add the output PDF to our list page_pdf_path = f'{pdf_output}.pdf' if os.path.exists(page_pdf_path): page_pdfs.append(page_pdf_path) else: raise FileNotFoundError(f"Expected output PDF not found: {page_pdf_path}") # Merge all page PDFs into a single file if page_pdfs: # Create a PDF writer writer = pypdf.PdfWriter() for pdf in page_pdfs: reader = pypdf.PdfReader(pdf) for page in reader.pages: writer.add_page(page) # Write to the output path os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True) with open(output_path, "wb") as output_file: writer.write(output_file) self.logger.info(f"Created searchable PDF at {output_path}") return output_path else: raise Exception("No pages were successfully processed") except Exception as e: self.logger.error(f"Error creating searchable PDF from images: {str(e)}") raise # # PDFScanner (singleton) # pdf_scanner = PDFScanner() def main(): """ Example usage of the PDFScanner class. """ pdf_file = "./pdfs/Non-text-searchable.pdf" # Create a temp manager with custom base temp directory temp_manager = PDFScannerTempManager(base_temp_dir='tmp') try: # Initialize scanner with temp manager scanner = PDFScanner(temp_manager=temp_manager) # Process PDF to extract text print("Extracting text from PDF...") results = scanner.pdf_to_text( pdf_file, attempt_repair=True ) # Print extracted text results for i, text in enumerate(results, 1): print(f"\n=== Page {i} ===") print(text) # Create searchable PDF using Tesseract's direct PDF output print("\nCreating searchable PDF using Tesseract...") output_path = "searchable_output_tesseract.pdf" scanner.pdf_to_searchable_pdf( pdf_file, output_path, attempt_repair=True ) print(f"Searchable PDF created at: {output_path}") # Create searchable PDF using ocrmypdf print("\nCreating searchable PDF using ocrmypdf...") output_path_ocrmypdf = "searchable_output_ocrmypdf.pdf" scanner.pdf_to_searchable_pdf_ocrmypdf( pdf_file, output_path_ocrmypdf, deskew=True, optimize=True, clean=False, attempt_repair=True ) print(f"Searchable PDF (ocrmypdf method) created at: {output_path_ocrmypdf}") # Extract text from a single image image_file = "./images/sample.png" print("Extracting text from image...") text = scanner.image_to_text(image_file) print("Extracted text:") print(text) # Create searchable PDF from a single image print("\nCreating searchable PDF from image...") output_path = "searchable_image.pdf" scanner.image_to_searchable_pdf(image_file, output_path) print(f"Searchable PDF created at: {output_path}") # Create searchable PDF from multiple images image_files = [ "./images/page1.png", "./images/page2.jpg", "./images/page3.tiff" ] print("\nCreating searchable PDF from multiple images...") output_path_multi = "searchable_multiple_images.pdf" scanner.images_to_searchable_pdf(image_files, output_path_multi) print(f"Multi-page searchable PDF created at: {output_path_multi}") except Exception as e: print(f"Error: {str(e)}") finally: # Explicitly clean up all temp directories temp_manager.cleanup_all() if __name__ == "__main__": main()