Didier committed on
Commit
8cc1c22
·
verified ·
1 Parent(s): c24aa10

Upload 2 files

Browse files
Files changed (2) hide show
  1. module_ocr.py +62 -0
  2. ocr.py +407 -0
module_ocr.py ADDED
@@ -0,0 +1,62 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ File: module_ocr.py
3
+ Description: Use a vision language model for Optical Character Recognition (OCR) tasks.
4
+ Author: Didier Guillevic
5
+ Date: 2025-04-06
6
+ """
7
+
8
+ import gradio as gr
9
+ import ocr
10
+
11
+ #
12
+ # Process one file
13
+ #
14
def process(input_file: str):
    """Run OCR on the uploaded file and return the recognised text.

    Args:
        input_file: The value supplied by the file-upload component.

    Returns:
        Whatever ``ocr.process_file`` returns for that file.
    """
    ocr_result = ocr.process_file(input_file)
    return ocr_result
18
+
19
+
20
+ #
21
+ # User interface
22
+ #
23
with gr.Blocks() as demo:

    # Upload file to process. The label mentions images as well as PDFs
    # because process() handles both and one of the examples is a PNG.
    with gr.Row():
        input_file = gr.File(label="Upload a PDF or image file", scale=1)
        output_text = gr.Textbox(label="OCR output", scale=2)

    # Buttons
    with gr.Row():
        ocr_btn = gr.Button(value="OCR", variant="primary")
        clear_btn = gr.Button("Clear", variant="secondary")

    # Examples (not cached: selecting one only fills in the input)
    with gr.Accordion("Examples", open=False):
        examples = gr.Examples(
            [
                ['./scanned_doc.pdf'],
                ['./passport_jp.png'],
            ],
            inputs=[input_file],
            outputs=[output_text],
            fn=process,
            cache_examples=False,
            label="Examples",
        )

    # Event wiring
    # "OCR" runs the uploaded file through process() and shows the text.
    ocr_btn.click(
        fn=process,
        inputs=[input_file],
        outputs=[output_text],
    )
    # "Clear" resets both components: no file, empty text.
    clear_btn.click(
        fn=lambda: (None, ''),
        inputs=[],
        outputs=[input_file, output_text],
    )


if __name__ == '__main__':
    demo.launch()
ocr.py ADDED
@@ -0,0 +1,407 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ File: ocr.py
3
+ Description: Optical Character Recognition (OCR) using software 2.0 models
4
+ Author: Didier Guillevic
5
+ Date: 2025-04-06
6
+ """
7
+
8
+ import os
9
+ import magic
10
+ import vlm
11
+
12
+ import uuid
13
+ import shutil
14
+ import threading
15
+ import time
16
+ import pathlib
17
+
18
+ import pdf2image
19
+ from pdf2image.exceptions import PDFPageCountError, PDFSyntaxError
20
+ import pypdf
21
+ import base64
22
+ from contextlib import contextmanager
23
+ from typing import List, Optional, Tuple, Union
24
+
25
+ import logging
26
+
27
class PDFScannerTempManager:
    """
    Create and dispose of scratch directories used while scanning PDFs.

    Every scratch directory is created under one base directory and named
    with a random UUID. Directories that were created and not yet removed
    are listed in ``active_temp_dirs``.
    """

    def __init__(self, base_temp_dir: str = 'tmp'):
        """
        Remember the base directory and make sure it exists.

        Args:
            base_temp_dir (str): Directory under which scratch directories
                are created.
        """
        self.base_temp_dir = base_temp_dir
        self.active_temp_dirs: list[str] = []

        # The base directory may already be there from a previous run.
        os.makedirs(base_temp_dir, exist_ok=True)

        # Logging setup
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    @contextmanager
    def temp_directory(self) -> str:
        """
        Context manager providing a fresh UUID-named scratch directory.

        The directory is removed, with its contents, when the ``with``
        block exits, whether normally or through an exception.

        Yields:
            str: Path of the scratch directory
        """
        scratch_path = os.path.join(self.base_temp_dir, str(uuid.uuid4()))

        try:
            # exist_ok=False: the directory must not exist beforehand.
            os.makedirs(scratch_path, exist_ok=False)
            self.active_temp_dirs.append(scratch_path)
            yield scratch_path
        finally:
            self._cleanup_directory(scratch_path)

    def _cleanup_directory(self, directory: str) -> None:
        """
        Delete one scratch directory and stop tracking it.

        Failures are logged, not raised.

        Args:
            directory (str): Path of the directory to delete
        """
        try:
            if os.path.exists(directory):
                shutil.rmtree(directory)

            still_tracked = directory in self.active_temp_dirs
            if still_tracked:
                self.active_temp_dirs.remove(directory)

        except Exception as e:
            self.logger.error(f"Error cleaning up directory {directory}: {e}")

    def cleanup_all(self) -> None:
        """
        Delete every scratch directory that is still being tracked.
        """
        # Work on a snapshot: _cleanup_directory mutates active_temp_dirs.
        pending = list(self.active_temp_dirs)
        for directory in pending:
            self._cleanup_directory(directory)
97
+
98
+
99
class PDFScanner:
    """
    Perform OCR on PDF files, one page at a time, with temp-dir management.

    Pages are rendered to images with pdf2image, saved as PNG files in a
    scratch directory obtained from the temp manager, and each saved image
    is passed to ``process_image_file``.
    """

    def __init__(
        self,
        dpi: int = 300,
        temp_manager: Optional["PDFScannerTempManager"] = None,
    ):
        """
        Initialize the PDFScanner.

        Args:
            dpi (int): DPI for PDF conversion
            temp_manager (PDFScannerTempManager, optional): Temp directory
                manager; a new one is created when none is given.
        """
        self.dpi = dpi
        self.temp_manager = temp_manager or PDFScannerTempManager()
        self.logger = logging.getLogger(__name__)

    def _validate_pdf(self, pdf_path: str) -> Tuple[bool, str, bool]:
        """
        Validate a PDF file and check for encryption.

        Never raises: any failure is reported through the returned tuple.

        Args:
            pdf_path (str): Path to the file to check

        Returns:
            Tuple[bool, str, bool]: (is_valid, message, is_encrypted)
        """
        try:
            with open(pdf_path, 'rb') as file:
                # Cheap signature check before handing the file to pypdf.
                if file.read(4) != b'%PDF':
                    return False, "Not a valid PDF file (missing PDF signature)", False

                # Rewind so pypdf parses from the beginning of the file.
                file.seek(0)

                try:
                    pdf_reader = pypdf.PdfReader(file, strict=False)

                    if pdf_reader.is_encrypted:
                        return False, "PDF is encrypted and requires password", True

                    num_pages = len(pdf_reader.pages)
                    return True, f"Valid PDF with {num_pages} pages", False

                except pypdf.errors.PdfReadError as e:
                    return False, f"Invalid PDF structure: {str(e)}", False

        except Exception as e:
            return False, f"Error validating PDF: {str(e)}", False

    def _repair_pdf(self, pdf_path: str, temp_dir: str) -> str:
        """
        Attempt to repair a corrupted PDF file.

        First re-writes the document with pypdf; if that fails, falls back
        to running Ghostscript (``gs``) as an external command.

        Args:
            pdf_path (str): Path to original PDF
            temp_dir (str): Temporary directory for the repaired copy

        Returns:
            str: Path to repaired PDF

        Raises:
            Exception: If both repair attempts fail (including when the
                ``gs`` executable cannot be started).
        """
        # Imported here because the module's import block does not provide
        # it; without it the Ghostscript fallback fails with NameError.
        import subprocess

        repaired_pdf = os.path.join(temp_dir, 'repaired.pdf')

        try:
            # pypdf repair attempt: copy every page into a fresh document.
            # The output is written while the input is still open so that
            # page data can still be read from the source file.
            with open(pdf_path, 'rb') as file:
                reader = pypdf.PdfReader(file, strict=False)
                writer = pypdf.PdfWriter()

                for page in reader.pages:
                    writer.add_page(page)

                with open(repaired_pdf, 'wb') as output_file:
                    writer.write(output_file)

            if os.path.exists(repaired_pdf):
                return repaired_pdf

        except Exception as e:
            self.logger.warning("pypdf repair failed: %s", e)

        # Ghostscript repair attempt
        try:
            gs_command = [
                'gs',
                '-o', repaired_pdf,
                '-sDEVICE=pdfwrite',
                '-dPDFSETTINGS=/prepress',
                pdf_path,
            ]

            process = subprocess.run(
                gs_command,
                capture_output=True,
                text=True,
            )

            if process.returncode == 0 and os.path.exists(repaired_pdf):
                return repaired_pdf
            raise Exception(f"Ghostscript repair failed: {process.stderr}")

        except Exception as e:
            self.logger.error("PDF repair failed: %s", e)
            raise

    def _process_images(
        self,
        images: list,
        temp_dir: str,
        language: str
    ) -> list[str]:
        """
        Save each page image to disk and run OCR on it.

        A page that fails is logged and represented by an
        ``[ERROR ON PAGE n]`` placeholder so the remaining pages are still
        processed.

        Args:
            images (list): Page images as returned by pdf2image
            temp_dir (str): Directory where page PNG files are written
            language (str): Accepted for interface compatibility; not used
                by this method.

        Returns:
            list[str]: One entry per page, in page order
        """
        extracted_text = []

        for page_number, image in enumerate(images, start=1):
            image_path = os.path.join(temp_dir, f'page_{page_number}.png')
            try:
                image.save(image_path, 'PNG', quality=100)

                # Perform OCR
                text = process_image_file(image_path)
                extracted_text.append(text)

            except Exception as e:
                self.logger.error("Error processing page %s: %s", page_number, e)
                extracted_text.append(f"[ERROR ON PAGE {page_number}]")

        return extracted_text

    def pdf_to_text(
        self,
        pdf_path: str,
        language: str = 'eng',
        first_page: Optional[int] = None,
        last_page: Optional[int] = None,
        attempt_repair: bool = True
    ) -> list[str]:
        """
        Convert a PDF file to text using OCR with robust error handling.

        Args:
            pdf_path (str): Path to the PDF file
            language (str): Language for OCR (default: 'eng')
            first_page (int, optional): First page to process (1-based)
            last_page (int, optional): Last page to process
            attempt_repair (bool): Whether to attempt repairing corrupted PDFs

        Returns:
            list[str]: List of extracted text for each page; empty when the
                conversion produced no page images and reported no error.

        Raises:
            FileNotFoundError: If ``pdf_path`` does not exist.
            Exception: If the PDF is encrypted, or if every conversion
                method failed.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        # Use context manager for automatic cleanup
        with self.temp_manager.temp_directory() as temp_dir:
            # Validate PDF
            is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path)
            if not is_valid:
                self.logger.warning("PDF validation issue: %s", error_message)

                if is_encrypted:
                    raise Exception("Cannot process encrypted PDF files")

                if attempt_repair:
                    try:
                        pdf_path = self._repair_pdf(pdf_path, temp_dir)
                        self.logger.info("Using repaired PDF file")
                    except Exception as e:
                        # Best effort: conversion is still tried on the
                        # original file.
                        self.logger.error("Repair failed: %s", e)

            # Conversion methods, tried in order until one yields images
            conversion_methods = [
                {'use_pdftocairo': True, 'strict': False},
                {'use_pdftocairo': False, 'strict': False},
                {'use_pdftocairo': True, 'strict': False, 'dpi': self.dpi * 2},
                {'use_pdftocairo': False, 'strict': False, 'dpi': self.dpi * 3},
            ]

            last_error = None
            for method in conversion_methods:
                try:
                    self.logger.info("Trying conversion method: %s", method)
                    # 'dpi' is passed explicitly, so it is filtered out of
                    # the remaining keyword arguments.
                    extra_options = {k: v for k, v in method.items() if k != 'dpi'}
                    images = pdf2image.convert_from_path(
                        pdf_path,
                        dpi=method.get('dpi', self.dpi),
                        first_page=first_page,
                        last_page=last_page,
                        thread_count=4,
                        grayscale=True,
                        **extra_options
                    )

                    if images:
                        return self._process_images(images, temp_dir, language)

                except Exception as e:
                    last_error = e
                    self.logger.warning("Method failed: %s", e)
                    continue

            if last_error:
                raise Exception(
                    f"All conversion methods failed. Last error: {str(last_error)}"
                ) from last_error

            # No method raised, but none produced images either: honour the
            # declared list[str] return type instead of returning None.
            self.logger.warning("No page images were produced for %s", pdf_path)
            return []
305
+
306
+ #
307
+ # PDFScanner (singleton)
308
+ #
309
# Module-level scanner instance, created when this module is imported and
# used by process_pdf_file().
pdf_scanner = PDFScanner()
310
+
311
+
312
+ #
313
+ # Process one file
314
+ #
315
def process_file(input_file: str):
    """Run OCR on a file, choosing the handler from the detected file type.

    Args:
        input_file: The file to process.

    Returns:
        The handler's result for images and PDFs; otherwise a message
        saying the file type is not supported.
    """
    handlers = {
        "Image": process_image_file,
        "PDF": process_pdf_file,
    }

    handler = handlers.get(get_file_type(input_file))
    if handler is None:
        return "Unsupported file type. Please upload a PDF, or an image file."
    return handler(input_file)
326
+
327
+
328
def process_image_file(input_file: str):
    """Run OCR on one image file through the vision language model.

    The image is base64-encoded and sent as a data URL together with a text
    prompt asking for the recognised text.

    Args:
        input_file: Path of the image file.

    Returns:
        The value returned by ``vlm.get_response`` for the request.

    Raises:
        ValueError: If the image could not be read and encoded.
    """
    # Local import: not among this module's top-level imports.
    import mimetypes

    encoded_image = encode_image(input_file)
    if encoded_image is None:
        # encode_image() signals failure with None; interpolating that into
        # the data URL would send the literal text "None" as image data.
        raise ValueError(f"Could not read image file: {input_file}")

    # Declare the real image type (pages rendered from PDFs are PNG files);
    # fall back to JPEG when the extension gives no image type.
    mime_type, _ = mimetypes.guess_type(str(input_file))
    if mime_type is None or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"

    prompt = (
        "Could you perform optical character recognition (OCR) on the image? "
        "Simply return the text without any additional comments. "
        "The exception would be if the image represents an ID card. "
        "In such a case, please return the information in a structured format. "
    )

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": prompt
                },
                {
                    "type": "image_url",
                    "image_url": f"data:{mime_type};base64,{encoded_image}"
                }
            ]
        }
    ]
    return vlm.get_response(messages)
354
+
355
+
356
def process_pdf_file(input_file: str):
    """Process PDF file with OCR

    Args:
        input_file: the PDF file to process with OCR; either a path string
            or an object exposing the path through a ``name`` attribute

    Returns:
        the text OCR result, with pages separated by a blank line

    Note:
        Each page of the PDF is processed as an image.
    """
    # Accept a plain path as the signature declares, while still supporting
    # upload objects that carry the path in ``.name``.
    pdf_path = getattr(input_file, "name", input_file)

    # Guard against a scanner result of None so join() cannot fail.
    texts = pdf_scanner.pdf_to_text(pdf_path=pdf_path) or []
    return '\n\n'.join(texts)
371
+
372
+
373
+ #
374
+ # Get file type: PDF or Image or something else
375
+ #
376
def get_file_type(file_path):
    """Classify a file as 'PDF', 'Image', 'PowerPoint' or 'Other'.

    A category matches when either the lower-cased file extension or the
    MIME type reported by ``magic`` for the file matches it. Categories are
    checked in the order listed above.
    """
    # File extension, lower-cased
    _, raw_extension = os.path.splitext(file_path)
    extension = raw_extension.lower()

    # MIME type as detected from the file itself
    detected_mime = magic.Magic(mime=True).from_file(file_path)

    image_extensions = ('.jpg', '.jpeg', '.png', '.gif')
    pptx_mime = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'

    if extension == '.pdf' or detected_mime == 'application/pdf':
        return 'PDF'
    if extension in image_extensions or detected_mime.startswith('image/'):
        return 'Image'
    if extension == '.pptx' or detected_mime == pptx_mime:
        return 'PowerPoint'
    return 'Other'
393
+
394
+ #
395
+ # Encode images as base64
396
+ #
397
def encode_image(image_path):
    """Return the file's contents as a base64 text string.

    On any failure a message is printed and ``None`` is returned instead.
    """
    encoded = None
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes).decode('utf-8')
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.")
    except Exception as e:  # any other failure is reported the same way
        print(f"Error: {e}")
    return encoded