Spaces:
Sleeping
Sleeping
""" | |
معالج ملفات PDF | |
""" | |
import os | |
import io | |
import re | |
import PyPDF2 | |
import fitz # PyMuPDF | |
import pdfplumber | |
import numpy as np | |
from PIL import Image | |
import pytesseract | |
import pandas as pd | |
import traceback | |
from utils.helpers import create_directory_if_not_exists, extract_numbers_from_text | |
def extract_text_from_pdf(file_path, method='pymupdf'): | |
""" | |
استخراج النص من ملف PDF | |
المعلمات: | |
file_path: مسار ملف PDF | |
method: طريقة الاستخراج ('pymupdf', 'pypdf2', 'pdfplumber') | |
الإرجاع: | |
نص مستخرج من ملف PDF | |
""" | |
try: | |
# التحقق من وجود الملف | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
# استخراج النص حسب الطريقة المطلوبة | |
if method.lower() == 'pymupdf': | |
return _extract_text_with_pymupdf(file_path) | |
elif method.lower() == 'pypdf2': | |
return _extract_text_with_pypdf2(file_path) | |
elif method.lower() == 'pdfplumber': | |
return _extract_text_with_pdfplumber(file_path) | |
else: | |
# استخدام PyMuPDF كطريقة افتراضية | |
return _extract_text_with_pymupdf(file_path) | |
except Exception as e: | |
error_msg = f"خطأ في استخراج النص من ملف PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) | |
def _extract_text_with_pymupdf(file_path): | |
"""استخراج النص باستخدام PyMuPDF""" | |
document = fitz.open(file_path) | |
text = "" | |
for page_number in range(len(document)): | |
page = document.load_page(page_number) | |
text += page.get_text("text") + "\n\n" | |
document.close() | |
return text | |
def _extract_text_with_pypdf2(file_path): | |
"""استخراج النص باستخدام PyPDF2""" | |
with open(file_path, 'rb') as file: | |
reader = PyPDF2.PdfReader(file) | |
text = "" | |
for page_number in range(len(reader.pages)): | |
page = reader.pages[page_number] | |
text += page.extract_text() + "\n\n" | |
return text | |
def _extract_text_with_pdfplumber(file_path): | |
"""استخراج النص باستخدام pdfplumber""" | |
with pdfplumber.open(file_path) as pdf: | |
text = "" | |
for page in pdf.pages: | |
text += page.extract_text() + "\n\n" | |
return text | |
def extract_tables_from_pdf(file_path, page_numbers=None): | |
""" | |
استخراج الجداول من ملف PDF | |
المعلمات: | |
file_path: مسار ملف PDF | |
page_numbers: قائمة بأرقام الصفحات للاستخراج منها (افتراضي: None لجميع الصفحات) | |
الإرجاع: | |
قائمة من DataFrames تمثل الجداول المستخرجة | |
""" | |
try: | |
# التحقق من وجود الملف | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
# استخراج الجداول باستخدام pdfplumber | |
tables = [] | |
with pdfplumber.open(file_path) as pdf: | |
# تحديد الصفحات المراد استخراج الجداول منها | |
if page_numbers is None: | |
pages_to_extract = range(len(pdf.pages)) | |
else: | |
pages_to_extract = [p-1 for p in page_numbers if 1 <= p <= len(pdf.pages)] | |
# استخراج الجداول من كل صفحة | |
for page_idx in pages_to_extract: | |
page = pdf.pages[page_idx] | |
page_tables = page.extract_tables() | |
if page_tables: | |
for table in page_tables: | |
if table: # التحقق من أن الجدول ليس فارغًا | |
# تحويل الجدول إلى DataFrame | |
df = pd.DataFrame(table[1:], columns=table[0]) | |
# تنظيف البيانات | |
df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x) | |
# إضافة إلى قائمة الجداول | |
tables.append(df) | |
return tables | |
except Exception as e: | |
error_msg = f"خطأ في استخراج الجداول من ملف PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) | |
def extract_images_from_pdf(file_path, output_dir=None, prefix='image'): | |
""" | |
استخراج الصور من ملف PDF | |
المعلمات: | |
file_path: مسار ملف PDF | |
output_dir: دليل الإخراج (افتراضي: None للإرجاع كقائمة من الصور) | |
prefix: بادئة أسماء ملفات الصور | |
الإرجاع: | |
قائمة من مسارات الصور المستخرجة إذا تم تحديد دليل الإخراج، وإلا قائمة من الصور | |
""" | |
try: | |
# التحقق من وجود الملف | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
# إنشاء دليل الإخراج إذا تم تحديده | |
if output_dir: | |
create_directory_if_not_exists(output_dir) | |
# استخراج الصور باستخدام PyMuPDF | |
document = fitz.open(file_path) | |
images = [] | |
image_paths = [] | |
for page_idx in range(len(document)): | |
page = document.load_page(page_idx) | |
# استخراج الصور من الصفحة | |
image_list = page.get_images(full=True) | |
for img_idx, img_info in enumerate(image_list): | |
xref = img_info[0] | |
base_image = document.extract_image(xref) | |
image_bytes = base_image["image"] | |
# إنشاء كائن الصورة | |
image = Image.open(io.BytesIO(image_bytes)) | |
if output_dir: | |
# حفظ الصورة في الدليل المحدد | |
image_filename = f"{prefix}_{page_idx+1}_{img_idx+1}.{base_image['ext']}" | |
image_path = os.path.join(output_dir, image_filename) | |
image.save(image_path) | |
image_paths.append(image_path) | |
else: | |
# إضافة الصورة إلى القائمة | |
images.append(image) | |
document.close() | |
return image_paths if output_dir else images | |
except Exception as e: | |
error_msg = f"خطأ في استخراج الصور من ملف PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) | |
def extract_text_from_image(image, lang='ara+eng'): | |
""" | |
استخراج النص من صورة باستخدام OCR | |
المعلمات: | |
image: كائن الصورة أو مسار الصورة | |
lang: لغة النص (افتراضي: 'ara+eng' للعربية والإنجليزية) | |
الإرجاع: | |
النص المستخرج من الصورة | |
""" | |
try: | |
# إذا كان مسار صورة، قم بفتحها | |
if isinstance(image, str): | |
image = Image.open(image) | |
# استخراج النص باستخدام pytesseract | |
text = pytesseract.image_to_string(image, lang=lang) | |
return text | |
except Exception as e: | |
error_msg = f"خطأ في استخراج النص من الصورة: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
return "" | |
def ocr_pdf(file_path, lang='ara+eng'): | |
""" | |
تنفيذ OCR على ملف PDF | |
المعلمات: | |
file_path: مسار ملف PDF | |
lang: لغة النص (افتراضي: 'ara+eng' للعربية والإنجليزية) | |
الإرجاع: | |
النص المستخرج من ملف PDF | |
""" | |
try: | |
# التحقق من وجود الملف | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
# فتح ملف PDF | |
document = fitz.open(file_path) | |
text = "" | |
for page_idx in range(len(document)): | |
page = document.load_page(page_idx) | |
# تحويل الصفحة إلى صورة | |
pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72)) | |
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
# استخراج النص من الصورة | |
page_text = extract_text_from_image(img, lang=lang) | |
text += page_text + "\n\n" | |
document.close() | |
return text | |
except Exception as e: | |
error_msg = f"خطأ في تنفيذ OCR على ملف PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) | |
def search_in_pdf(file_path, search_text): | |
""" | |
البحث عن نص في ملف PDF | |
المعلمات: | |
file_path: مسار ملف PDF | |
search_text: النص المراد البحث عنه | |
الإرجاع: | |
قائمة من النتائج {page_number, text_snippet, matched_text} | |
""" | |
try: | |
# التحقق من وجود الملف | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
# استخراج النص من ملف PDF | |
document = fitz.open(file_path) | |
results = [] | |
for page_idx in range(len(document)): | |
page = document.load_page(page_idx) | |
page_text = page.get_text("text") | |
# البحث عن النص | |
if search_text.lower() in page_text.lower(): | |
# استخراج المقتطفات التي تحتوي على النص المطلوب | |
lines = page_text.split('\n') | |
for line in lines: | |
if search_text.lower() in line.lower(): | |
results.append({ | |
'page_number': page_idx + 1, | |
'text_snippet': line, | |
'matched_text': search_text | |
}) | |
document.close() | |
return results | |
except Exception as e: | |
error_msg = f"خطأ في البحث في ملف PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) | |
def extract_quantities_from_pdf(file_path): | |
""" | |
استخراج الكميات من ملف PDF | |
المعلمات: | |
file_path: مسار ملف PDF | |
الإرجاع: | |
DataFrame يحتوي على البنود والكميات المستخرجة | |
""" | |
try: | |
# استخراج النص والجداول | |
text = extract_text_from_pdf(file_path) | |
tables = extract_tables_from_pdf(file_path) | |
quantities = [] | |
# استخراج الكميات من الجداول | |
for table in tables: | |
# البحث عن أعمدة تحتوي على "الكمية" أو "الوحدة" أو "البند" | |
quantity_cols = [col for col in table.columns if any(term in col.lower() for term in ['كمية', 'عدد', 'الكمية'])] | |
unit_cols = [col for col in table.columns if any(term in col.lower() for term in ['وحدة', 'الوحدة'])] | |
item_cols = [col for col in table.columns if any(term in col.lower() for term in ['بند', 'وصف', 'البند', 'العمل'])] | |
if quantity_cols and (unit_cols or item_cols): | |
quantity_col = quantity_cols[0] | |
unit_col = unit_cols[0] if unit_cols else None | |
item_col = item_cols[0] if item_cols else None | |
# استخراج الكميات | |
for _, row in table.iterrows(): | |
if pd.notna(row[quantity_col]) and (item_col is None or pd.notna(row[item_col])): | |
quantity_value = extract_numbers_from_text(row[quantity_col]) | |
quantity = quantity_value[0] if quantity_value else None | |
quantities.append({ | |
'البند': row[item_col] if item_col else "غير محدد", | |
'الوحدة': row[unit_col] if unit_col else "غير محدد", | |
'الكمية': quantity | |
}) | |
# استخراج الكميات من النص | |
lines = text.split('\n') | |
for line in lines: | |
# البحث عن الخطوط التي تحتوي على أرقام ووحدات قياس | |
if re.search(r'\d+(?:,\d+)*(?:\.\d+)?', line) and any(unit in line for unit in ['م2', 'م3', 'متر', 'طن', 'كجم', 'عدد']): | |
numbers = extract_numbers_from_text(line) | |
if numbers: | |
# استخراج وحدة القياس | |
unit_match = re.search(r'\b(م2|م3|متر مربع|متر مكعب|م\.ط|طن|كجم|عدد|قطعة)\b', line) | |
unit = unit_match.group(1) if unit_match else "غير محدد" | |
quantities.append({ | |
'البند': line, | |
'الوحدة': unit, | |
'الكمية': numbers[0] | |
}) | |
# إنشاء DataFrame | |
if quantities: | |
quantities_df = pd.DataFrame(quantities) | |
return quantities_df | |
else: | |
return pd.DataFrame(columns=['البند', 'الوحدة', 'الكمية']) | |
except Exception as e: | |
error_msg = f"خطأ في استخراج الكميات من ملف PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) | |
def merge_pdfs(input_paths, output_path): | |
""" | |
دمج ملفات PDF متعددة في ملف واحد | |
المعلمات: | |
input_paths: قائمة من مسارات ملفات PDF المراد دمجها | |
output_path: مسار ملف PDF الناتج | |
الإرجاع: | |
True في حالة النجاح | |
""" | |
try: | |
# التحقق من وجود الملفات | |
for file_path in input_paths: | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"الملف غير موجود: {file_path}") | |
# إنشاء مجلد الإخراج إذا لم يكن موجودًا | |
output_dir = os.path.dirname(output_path) | |
create_directory_if_not_exists(output_dir) | |
# دمج ملفات PDF | |
merger = PyPDF2.PdfMerger() | |
for file_path in input_paths: | |
merger.append(file_path) | |
merger.write(output_path) | |
merger.close() | |
return True | |
except Exception as e: | |
error_msg = f"خطأ في دمج ملفات PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) | |
def split_pdf(input_path, output_dir, prefix='page'): | |
""" | |
تقسيم ملف PDF إلى ملفات منفصلة لكل صفحة | |
المعلمات: | |
input_path: مسار ملف PDF المراد تقسيمه | |
output_dir: دليل الإخراج | |
prefix: بادئة أسماء ملفات الإخراج | |
الإرجاع: | |
قائمة من مسارات ملفات PDF الناتجة | |
""" | |
try: | |
# التحقق من وجود الملف | |
if not os.path.exists(input_path): | |
raise FileNotFoundError(f"الملف غير موجود: {input_path}") | |
# إنشاء دليل الإخراج | |
create_directory_if_not_exists(output_dir) | |
# قراءة ملف PDF | |
with open(input_path, 'rb') as file: | |
reader = PyPDF2.PdfReader(file) | |
output_files = [] | |
# تقسيم كل صفحة إلى ملف منفصل | |
for page_idx in range(len(reader.pages)): | |
writer = PyPDF2.PdfWriter() | |
writer.add_page(reader.pages[page_idx]) | |
output_filename = f"{prefix}_{page_idx+1}.pdf" | |
output_path = os.path.join(output_dir, output_filename) | |
with open(output_path, 'wb') as output_file: | |
writer.write(output_file) | |
output_files.append(output_path) | |
return output_files | |
except Exception as e: | |
error_msg = f"خطأ في تقسيم ملف PDF: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
raise Exception(error_msg) |