Didier's picture
Update ocr.py
4d5e90f verified
"""
File: ocr.py
Description: (Traditional) Optical Character Recognition (OCR) using tesseract.
Author: Didier Guillevic
Date: 2024-11-23
"""
import os
os.system("bash setup.sh") # Ensure setup script runs before importing pytesseract
import pytesseract
from pdf2image import convert_from_path
from pdf2image.exceptions import PDFPageCountError, PDFSyntaxError
import os
import uuid
import shutil
import logging
import pypdf
import subprocess
import ocrmypdf
from typing import List, Optional, Tuple, Union
from contextlib import contextmanager
tesseract_psm_modes = {
0: "Orientation and script detection (OSD) only.",
1: "Automatic page segmentation with OSD.",
2: "Automatic page segmentation, but no OSD, or OCR.",
3: "Fully automatic page segmentation, but no OSD. (**default**)",
4: "Assume a single column of text of variable sizes.",
5: "Assume a single uniform block of vertically aligned text.",
6: "Assume a single uniform block of text.",
7: "Treat the image as a single text line.",
8: "Treat the image as a single word.",
9: "Treat the image as a single word in a circle.",
10: "Treat the image as a single character.",
11: "Sparse text. Find as much text as possible in no particular order.",
12: "Sparse text with OSD.",
13: "Raw line. Treat the image as a single text line, bypassing hacks that are Tesseract-specific."
}
tesseract_psm_descriptions = {
"0: Orientation and script detection (OSD) only.": 0,
"1: Automatic page segmentation with OSD.": 1,
"2: Automatic page segmentation, but no OSD, or OCR.": 2,
"3: Fully automatic page segmentation, but no OSD. (**default**)": 3,
"4: Assume a single column of text of variable sizes.": 4,
"5: Assume a single uniform block of vertically aligned text.": 5,
"6: Assume a single uniform block of text.": 6,
"7: Treat the image as a single text line.": 7,
"8: Treat the image as a single word.": 8,
"9: Treat the image as a single word in a circle.": 9,
"10: Treat the image as a single character.": 10,
"11: Sparse text. Find as much text as possible in no particular order.": 11,
"12: Sparse text with OSD.": 12,
"13: Raw line. Treat the image as a single text line, bypassing hacks that are Tesseract-specific.": 13
}
class PDFScannerTempManager:
"""
Manages temporary directory creation and cleanup for PDF scanning operations.
"""
def __init__(self, base_temp_dir: str = 'tmp'):
"""
Initialize temporary directory manager.
Args:
base_temp_dir (str): Base directory for temporary files
"""
self.base_temp_dir = base_temp_dir
self.active_temp_dirs: list[str] = []
# Ensure base temporary directory exists
os.makedirs(base_temp_dir, exist_ok=True)
# Set up logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
@contextmanager
def temp_directory(self) -> str:
"""
Create a temporary directory with UUID and manage its lifecycle.
Yields:
str: Path to the temporary directory
"""
# Generate unique directory name
dir_uuid = str(uuid.uuid4())
temp_dir = os.path.join(self.base_temp_dir, dir_uuid)
try:
# Create directory
os.makedirs(temp_dir, exist_ok=False)
self.active_temp_dirs.append(temp_dir)
# Yield directory path
yield temp_dir
finally:
# Remove directory and its contents
self._cleanup_directory(temp_dir)
def _cleanup_directory(self, directory: str) -> None:
"""
Safely remove a temporary directory.
Args:
directory (str): Path to directory to remove
"""
try:
if os.path.exists(directory):
shutil.rmtree(directory)
# Remove from active directories
if directory in self.active_temp_dirs:
self.active_temp_dirs.remove(directory)
except Exception as e:
self.logger.error(f"Error cleaning up directory {directory}: {e}")
def cleanup_all(self) -> None:
"""
Clean up all temporary directories created during the session.
"""
for directory in list(self.active_temp_dirs):
self._cleanup_directory(directory)
class PDFScanner:
"""
A class to perform OCR on PDF files using Tesseract with robust temp management.
"""
def __init__(self, tesseract_cmd: str = 'tesseract', dpi: int = 300,
temp_manager: Optional[PDFScannerTempManager] = None):
"""
Initialize the PDFScanner.
Args:
tesseract_cmd (str): Path to tesseract executable
dpi (int): DPI for PDF conversion
temp_manager (PDFScannerTempManager, optional): Temp directory manager
"""
self.dpi = dpi
self.temp_manager = temp_manager or PDFScannerTempManager()
pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
self.logger = logging.getLogger(__name__)
def _validate_pdf(self, pdf_path: str) -> Tuple[bool, str, bool]:
"""
Validate PDF file and check for encryption.
Returns:
Tuple[bool, str, bool]: (is_valid, error_message, is_encrypted)
"""
try:
with open(pdf_path, 'rb') as file:
# Check if file starts with PDF signature
if not file.read(4) == b'%PDF':
return False, "Not a valid PDF file (missing PDF signature)", False
# Reset file pointer
file.seek(0)
try:
pdf_reader = pypdf.PdfReader(file, strict=False)
is_encrypted = pdf_reader.is_encrypted
if is_encrypted:
return False, "PDF is encrypted and requires password", True
num_pages = len(pdf_reader.pages)
return True, f"Valid PDF with {num_pages} pages", False
except pypdf.errors.PdfReadError as e:
return False, f"Invalid PDF structure: {str(e)}", False
except Exception as e:
return False, f"Error validating PDF: {str(e)}", False
def _repair_pdf(self, pdf_path: str, temp_dir: str) -> str:
"""
Attempt to repair a corrupted PDF file.
Args:
pdf_path (str): Path to original PDF
temp_dir (str): Temporary directory for repair
Returns:
str: Path to repaired PDF
"""
repaired_pdf = os.path.join(temp_dir, 'repaired.pdf')
try:
# pypdf repair attempt
with open(pdf_path, 'rb') as file:
reader = pypdf.PdfReader(file, strict=False)
writer = pypdf.PdfWriter()
for page in reader.pages:
writer.add_page(page)
with open(repaired_pdf, 'wb') as output_file:
writer.write(output_file)
if os.path.exists(repaired_pdf):
return repaired_pdf
except Exception as e:
self.logger.warning(f"pypdf repair failed: {str(e)}")
# Ghostscript repair attempt
try:
gs_command = [
'gs',
'-o', repaired_pdf,
'-sDEVICE=pdfwrite',
'-dPDFSETTINGS=/prepress',
pdf_path
]
process = subprocess.run(
gs_command,
capture_output=True,
text=True
)
if process.returncode == 0 and os.path.exists(repaired_pdf):
return repaired_pdf
else:
raise Exception(f"Ghostscript repair failed: {process.stderr}")
except Exception as e:
self.logger.error(f"PDF repair failed: {str(e)}")
raise
def _process_images(
self,
images: list,
temp_dir: str,
language: str
) -> list[str]:
"""Helper method to process converted images."""
extracted_text = []
for i, image in enumerate(images):
image_path = os.path.join(temp_dir, f'page_{i+1}.png')
try:
# Save with higher quality
image.save(image_path, 'PNG', quality=100)
# Perform OCR with additional configuration
text = pytesseract.image_to_string(
image,
lang=language,
config='--psm 1 --oem 1'
)
extracted_text.append(text)
except Exception as e:
self.logger.error(f"Error processing page {i+1}: {str(e)}")
extracted_text.append(f"[ERROR ON PAGE {i+1}]")
return extracted_text
def pdf_to_text(
self,
pdf_path: str,
language: str = 'eng',
first_page: Optional[int] = None,
last_page: Optional[int] = None,
attempt_repair: bool = True
) -> list[str]:
"""
Convert a PDF file to text using OCR with robust error handling.
Args:
pdf_path (str): Path to the PDF file
language (str): Language for OCR (default: 'eng')
first_page (int, optional): First page to process (1-based)
last_page (int, optional): Last page to process
attempt_repair (bool): Whether to attempt repairing corrupted PDFs
Returns:
list[str]: List of extracted text for each page
"""
if not os.path.exists(pdf_path):
raise FileNotFoundError(f"PDF file not found: {pdf_path}")
# Use context manager for automatic cleanup
with self.temp_manager.temp_directory() as temp_dir:
# Validate PDF
is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path)
if not is_valid:
self.logger.warning(f"PDF validation issue: {error_message}")
if is_encrypted:
raise Exception("Cannot process encrypted PDF files")
if attempt_repair:
try:
pdf_path = self._repair_pdf(pdf_path, temp_dir)
self.logger.info("Using repaired PDF file")
except Exception as e:
self.logger.error(f"Repair failed: {str(e)}")
# Conversion methods with increasing complexity
conversion_methods = [
{'use_pdftocairo': True, 'strict': False},
{'use_pdftocairo': False, 'strict': False},
{'use_pdftocairo': True, 'strict': False, 'dpi': self.dpi * 2},
{'use_pdftocairo': False, 'strict': False, 'dpi': self.dpi * 3}
]
last_error = None
for method in conversion_methods:
try:
self.logger.info(f"Trying conversion method: {method}")
images = convert_from_path(
pdf_path,
dpi=method.get('dpi', self.dpi),
first_page=first_page,
last_page=last_page,
thread_count=4,
grayscale=True,
**{k: v for k, v in method.items() if k != 'dpi'}
)
if images:
return self._process_images(images, temp_dir, language)
except Exception as e:
last_error = e
self.logger.warning(f"Method failed: {str(e)}")
continue
if last_error:
raise Exception(f"All conversion methods failed. Last error: {str(last_error)}")
def pdf_to_searchable_pdf(self,
pdf_path: str,
output_path: str,
language: str = 'eng',
first_page: Optional[int] = None,
last_page: Optional[int] = None,
attempt_repair: bool = True) -> str:
"""
Convert a scanned PDF file to a searchable PDF using Tesseract.
Args:
pdf_path (str): Path to the input PDF file
output_path (str): Path to save the searchable PDF
language (str): Language for OCR (default: 'eng')
first_page (int, optional): First page to process (1-based)
last_page (int, optional): Last page to process
attempt_repair (bool): Whether to attempt repairing corrupted PDFs
Returns:
str: Path to the output searchable PDF
"""
if not os.path.exists(pdf_path):
raise FileNotFoundError(f"PDF file not found: {pdf_path}")
# Use context manager for automatic cleanup
with self.temp_manager.temp_directory() as temp_dir:
# Validate PDF
is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path)
if not is_valid:
self.logger.warning(f"PDF validation issue: {error_message}")
if is_encrypted:
raise Exception("Cannot process encrypted PDF files")
if attempt_repair:
try:
pdf_path = self._repair_pdf(pdf_path, temp_dir)
self.logger.info("Using repaired PDF file")
except Exception as e:
self.logger.error(f"Repair failed: {str(e)}")
# Process partial PDFs if requested
if first_page is not None or last_page is not None:
partial_pdf_path = os.path.join(temp_dir, 'partial.pdf')
with open(pdf_path, 'rb') as input_file:
reader = pypdf.PdfReader(input_file)
writer = pypdf.PdfWriter()
# Use 0-based indexing for pypdf
start_page = (first_page or 1) - 1
end_page = min(last_page or len(reader.pages), len(reader.pages))
for i in range(start_page, end_page):
writer.add_page(reader.pages[i])
with open(partial_pdf_path, 'wb') as output_file:
writer.write(output_file)
pdf_path = partial_pdf_path
# Extract images from the PDF
try:
images = convert_from_path(
pdf_path,
dpi=self.dpi,
thread_count=4,
grayscale=False
)
except Exception as e:
self.logger.error(f"Failed to convert PDF to images: {str(e)}")
raise
# Process each page individually
page_pdfs = []
for i, image in enumerate(images):
page_num = i + 1
image_path = os.path.join(temp_dir, f'page_{page_num}.png')
pdf_output = os.path.join(temp_dir, f'page_{page_num}')
try:
# Save the image
image.save(image_path, 'PNG', quality=100)
# Use Tesseract directly to create a searchable PDF
tesseract_cmd = [
pytesseract.pytesseract.tesseract_cmd,
image_path,
pdf_output,
'-l', language,
'--psm', '1',
'pdf'
]
process = subprocess.run(
tesseract_cmd,
capture_output=True,
text=True
)
if process.returncode != 0:
self.logger.error(f"Tesseract error on page {page_num}: {process.stderr}")
raise Exception(f"Tesseract failed on page {page_num}: {process.stderr}")
# Add the output PDF to our list
page_pdf_path = f'{pdf_output}.pdf'
if os.path.exists(page_pdf_path):
page_pdfs.append(page_pdf_path)
else:
raise FileNotFoundError(f"Expected output PDF not found: {page_pdf_path}")
except Exception as e:
self.logger.error(f"Error processing page {page_num}: {str(e)}")
raise
# Merge all page PDFs into a single file
if page_pdfs:
# Create a PDF writer
writer = pypdf.PdfWriter()
for pdf in page_pdfs:
reader = pypdf.PdfReader(pdf)
for page in reader.pages:
writer.add_page(page)
# Write to the output path
os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
with open(output_path, "wb") as output_file:
writer.write(output_file)
self.logger.info(f"Created searchable PDF at {output_path}")
return output_path
else:
raise Exception("No pages were successfully processed")
def pdf_to_searchable_pdf_ocrmypdf(self,
pdf_path: str,
output_path: str,
language: str = 'eng',
first_page: Optional[int] = None,
last_page: Optional[int] = None,
deskew: bool = True,
optimize: bool = True,
clean: bool = False,
attempt_repair: bool = True) -> str:
"""
Convert a scanned PDF file to a searchable PDF using ocrmypdf.
Args:
pdf_path (str): Path to the input PDF file
output_path (str): Path to save the searchable PDF
language (str): Language for OCR (default: 'eng')
first_page (int, optional): First page to process (1-based)
last_page (int, optional): Last page to process
deskew (bool): Whether to straighten pages
optimize (bool): Whether to optimize the PDF
clean (bool): Whether to clean the image before OCR
attempt_repair (bool): Whether to attempt repairing corrupted PDFs
Returns:
str: Path to the output searchable PDF
"""
if not os.path.exists(pdf_path):
raise FileNotFoundError(f"PDF file not found: {pdf_path}")
# Use context manager for automatic cleanup
with self.temp_manager.temp_directory() as temp_dir:
# Validate PDF
is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path)
if not is_valid:
self.logger.warning(f"PDF validation issue: {error_message}")
if is_encrypted:
raise Exception("Cannot process encrypted PDF files")
if attempt_repair:
try:
pdf_path = self._repair_pdf(pdf_path, temp_dir)
self.logger.info("Using repaired PDF file")
except Exception as e:
self.logger.error(f"Repair failed: {str(e)}")
# Process partial PDFs if requested
working_pdf_path = pdf_path
if first_page is not None or last_page is not None:
partial_pdf_path = os.path.join(temp_dir, 'partial.pdf')
with open(pdf_path, 'rb') as input_file:
reader = pypdf.PdfReader(input_file)
writer = pypdf.PdfWriter()
# Use 0-based indexing for pypdf
start_page = (first_page or 1) - 1
end_page = min(last_page or len(reader.pages), len(reader.pages))
for i in range(start_page, end_page):
writer.add_page(reader.pages[i])
with open(partial_pdf_path, 'wb') as output_file:
writer.write(output_file)
working_pdf_path = partial_pdf_path
try:
# Ensure the output directory exists
output_dir = os.path.dirname(os.path.abspath(output_path))
os.makedirs(output_dir, exist_ok=True)
# ocrmypdf has a rich set of options
optimize_level = 1 if optimize else 0
# Run ocrmypdf
result = ocrmypdf.ocr(
working_pdf_path,
output_path,
language=language,
optimize=optimize_level,
skip_text=True, # Don't redo OCR on pages with text
deskew=deskew, # Straighten pages
clean=clean, # Clean pages before OCR
progress_bar=False,
use_threads=True,
output_type="pdf", # Avoids Ghostscript
jobs=os.cpu_count() or 4
)
if result == 0: # Success
self.logger.info(f"Created searchable PDF at {output_path}")
return output_path
else:
raise Exception(f"ocrmypdf returned non-zero exit code: {result}")
except Exception as e:
self.logger.error(f"Error creating searchable PDF with ocrmypdf: {str(e)}")
raise
def image_to_text(self,
image_path: str,
language: str = 'eng',
psm: int = 3
) -> str:
"""
Extract text from an image file using OCR.
Args:
image_path (str): Path to the image file
language (str): Language for OCR (default: 'eng')
psm (int): Page segmentation mode (default: 3)
Returns:
str: Extracted text from the image
"""
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
try:
# Use Pillow to open the image
from PIL import Image
image = Image.open(image_path)
# Perform OCR with specified parameters
text = pytesseract.image_to_string(
image,
lang=language,
config=f'--psm {psm} --oem 1'
)
return text
except Exception as e:
self.logger.error(f"Error extracting text from image: {str(e)}")
raise
def image_to_searchable_pdf(self,
image_path: str,
output_path: str,
language: str = 'eng',
psm: int = 3
) -> str:
"""
Convert an image file to a searchable PDF with OCR text.
Args:
image_path (str): Path to the image file
output_path (str): Path to save the searchable PDF
language (str): Language for OCR (default: 'eng')
psm (int): Page segmentation mode (default: 3)
Returns:
str: Path to the output searchable PDF
"""
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
# Use context manager for automatic cleanup
with self.temp_manager.temp_directory() as temp_dir:
try:
# Use Tesseract directly to create a searchable PDF
pdf_output = os.path.join(temp_dir, 'output')
tesseract_cmd = [
pytesseract.pytesseract.tesseract_cmd,
image_path,
pdf_output,
'-l', language,
'--psm', str(psm),
'pdf'
]
process = subprocess.run(
tesseract_cmd,
capture_output=True,
text=True
)
if process.returncode != 0:
self.logger.error(f"Tesseract error: {process.stderr}")
raise Exception(f"Tesseract failed: {process.stderr}")
# Check if the PDF was created
temp_pdf_path = f'{pdf_output}.pdf'
if not os.path.exists(temp_pdf_path):
raise FileNotFoundError(f"Expected output PDF not found: {temp_pdf_path}")
# Ensure output directory exists
os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
# Copy the file to the desired output location
shutil.copy(temp_pdf_path, output_path)
self.logger.info(f"Created searchable PDF at {output_path}")
return output_path
except Exception as e:
self.logger.error(f"Error creating searchable PDF from image: {str(e)}")
raise
def images_to_searchable_pdf(self,
image_paths: List[str],
output_path: str,
language: str = 'eng',
psm: int = 3
) -> str:
"""
Convert multiple image files to a single searchable PDF with OCR text.
Args:
image_paths (List[str]): List of paths to image files
output_path (str): Path to save the searchable PDF
language (str): Language for OCR (default: 'eng')
psm (int): Page segmentation mode (default: 3)
Returns:
str: Path to the output searchable PDF
"""
if not image_paths:
raise ValueError("No image paths provided")
# Use context manager for automatic cleanup
with self.temp_manager.temp_directory() as temp_dir:
try:
# Process each image separately
page_pdfs = []
for i, img_path in enumerate(image_paths):
if not os.path.exists(img_path):
raise FileNotFoundError(f"Image file not found: {img_path}")
# Create PDF for this image
pdf_output = os.path.join(temp_dir, f'page_{i+1}')
tesseract_cmd = [
pytesseract.pytesseract.tesseract_cmd,
img_path,
pdf_output,
'-l', language,
'--psm', str(psm),
'pdf'
]
process = subprocess.run(
tesseract_cmd,
capture_output=True,
text=True
)
if process.returncode != 0:
self.logger.error(f"Tesseract error on image {i+1}: {process.stderr}")
raise Exception(f"Tesseract failed on image {i+1}: {process.stderr}")
# Add the output PDF to our list
page_pdf_path = f'{pdf_output}.pdf'
if os.path.exists(page_pdf_path):
page_pdfs.append(page_pdf_path)
else:
raise FileNotFoundError(f"Expected output PDF not found: {page_pdf_path}")
# Merge all page PDFs into a single file
if page_pdfs:
# Create a PDF writer
writer = pypdf.PdfWriter()
for pdf in page_pdfs:
reader = pypdf.PdfReader(pdf)
for page in reader.pages:
writer.add_page(page)
# Write to the output path
os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
with open(output_path, "wb") as output_file:
writer.write(output_file)
self.logger.info(f"Created searchable PDF at {output_path}")
return output_path
else:
raise Exception("No pages were successfully processed")
except Exception as e:
self.logger.error(f"Error creating searchable PDF from images: {str(e)}")
raise
#
# PDFScanner (singleton)
#
pdf_scanner = PDFScanner()
def main():
"""
Example usage of the PDFScanner class.
"""
pdf_file = "./pdfs/Non-text-searchable.pdf"
# Create a temp manager with custom base temp directory
temp_manager = PDFScannerTempManager(base_temp_dir='tmp')
try:
# Initialize scanner with temp manager
scanner = PDFScanner(temp_manager=temp_manager)
# Process PDF to extract text
print("Extracting text from PDF...")
results = scanner.pdf_to_text(
pdf_file,
attempt_repair=True
)
# Print extracted text results
for i, text in enumerate(results, 1):
print(f"\n=== Page {i} ===")
print(text)
# Create searchable PDF using Tesseract's direct PDF output
print("\nCreating searchable PDF using Tesseract...")
output_path = "searchable_output_tesseract.pdf"
scanner.pdf_to_searchable_pdf(
pdf_file,
output_path,
attempt_repair=True
)
print(f"Searchable PDF created at: {output_path}")
# Create searchable PDF using ocrmypdf
print("\nCreating searchable PDF using ocrmypdf...")
output_path_ocrmypdf = "searchable_output_ocrmypdf.pdf"
scanner.pdf_to_searchable_pdf_ocrmypdf(
pdf_file,
output_path_ocrmypdf,
deskew=True,
optimize=True,
clean=False,
attempt_repair=True
)
print(f"Searchable PDF (ocrmypdf method) created at: {output_path_ocrmypdf}")
# Extract text from a single image
image_file = "./images/sample.png"
print("Extracting text from image...")
text = scanner.image_to_text(image_file)
print("Extracted text:")
print(text)
# Create searchable PDF from a single image
print("\nCreating searchable PDF from image...")
output_path = "searchable_image.pdf"
scanner.image_to_searchable_pdf(image_file, output_path)
print(f"Searchable PDF created at: {output_path}")
# Create searchable PDF from multiple images
image_files = [
"./images/page1.png",
"./images/page2.jpg",
"./images/page3.tiff"
]
print("\nCreating searchable PDF from multiple images...")
output_path_multi = "searchable_multiple_images.pdf"
scanner.images_to_searchable_pdf(image_files, output_path_multi)
print(f"Multi-page searchable PDF created at: {output_path_multi}")
except Exception as e:
print(f"Error: {str(e)}")
finally:
# Explicitly clean up all temp directories
temp_manager.cleanup_all()
if __name__ == "__main__":
main()