File size: 11,916 Bytes
3500b1e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 |
"""
CPU에 최적화된 문서 처리 모듈 - 병렬 처리 적용
"""
import os
import time
from typing import List, Dict, Any, Optional
from langchain.schema import Document
from concurrent.futures import ThreadPoolExecutor
# 멀티프로세싱 가져오기
import multiprocessing
try:
CPU_COUNT = multiprocessing.cpu_count()
except:
CPU_COUNT = 4
print(f"CPU 코어 수: {CPU_COUNT}")
# docling 라이브러리 존재 여부 확인
try:
from docling.datamodel.base_models import InputFormat
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode
from docling.chunking import HybridChunker
DOCLING_AVAILABLE = True
print("docling 라이브러리 사용 가능")
except ImportError:
print("docling 라이브러리를 찾을 수 없습니다. PyPDFLoader만 사용합니다.")
DOCLING_AVAILABLE = False
# LangChain 문서 로더
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
class OptimizedDocumentProcessor:
"""
CPU에 최적화된 병렬 처리 문서 처리 클래스
"""
def __init__(self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
tokenizer: str = "Alibaba-NLP/gte-multilingual-base", # 올바른 모델 경로로 수정
max_workers: int = CPU_COUNT):
"""
문서 처리기 초기화
Args:
chunk_size: 텍스트 청크 크기
chunk_overlap: 청크 간 겹침 크기
tokenizer: HybridChunker에서 사용할 토크나이저
max_workers: 병렬 처리시 최대 작업자 수
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.tokenizer = tokenizer
self.max_workers = max(1, min(max_workers, CPU_COUNT)) # CPU 코어 수 초과하지 않도록
print(f"병렬 처리 작업자 수: {self.max_workers}")
# LangChain 텍스트 스플리터
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""],
)
# docling 관련 컴포넌트 초기화
if DOCLING_AVAILABLE:
# 파이프라인 옵션 설정
self.pipeline_options = PdfPipelineOptions(do_table_structure=True)
self.pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE
# 문서 변환기 초기화
self.doc_converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline_options)
}
)
# HybridChunker 초기화 (trust_remote_code=True 추가)
self.hybrid_chunker = HybridChunker(
tokenizer=tokenizer,
chunk_size=chunk_size,
overlap=chunk_overlap,
tokenizer_kwargs={"trust_remote_code": True} # 원격 코드 실행 허용
)
print(f"docling 초기화 완료: HybridChunker(청크 크기={chunk_size}, 오버랩={chunk_overlap})")
def process_with_docling(self, pdf_path: str) -> Dict[str, Any]:
"""
docling을 사용하여 PDF 문서 처리
Args:
pdf_path: PDF 파일 경로
Returns:
처리된 문서 데이터
"""
if not DOCLING_AVAILABLE:
raise ImportError("docling 라이브러리가 설치되지 않았습니다.")
try:
start_time = time.time()
# 문서 변환
conv_res = self.doc_converter.convert(pdf_path)
doc = conv_res.document
# 성능 측정
conversion_time = time.time() - start_time
print(f"PDF 변환 시간: {conversion_time:.2f}초")
# 메타데이터 추출
metadata = {
"source": pdf_path,
"title": os.path.basename(pdf_path),
"processing_time": conversion_time
}
return {
"content": doc.export_to_markdown(),
"metadata": metadata,
"raw_document": doc,
}
except Exception as e:
print(f"docling으로 문서 처리 중 오류 발생: {e}")
raise
def chunk_with_hybrid_chunker(self, doc: Any) -> List[Dict[str, Any]]:
"""
HybridChunker를 사용하여 문서를 청크로 분할
Args:
doc: docling 문서 객체
Returns:
청크 리스트
"""
start_time = time.time()
# 청킹 수행
chunk_iter = self.hybrid_chunker.chunk(doc)
chunks = list(chunk_iter)
chunking_time = time.time() - start_time
print(f"청킹 시간: {chunking_time:.2f}초 (청크 수: {len(chunks)})")
return chunks
def create_langchain_documents_from_chunks(self,
chunks: List[Dict[str, Any]],
metadata: Dict[str, Any]) -> List[Document]:
"""
docling 청크를 LangChain Document 객체로 변환
Args:
chunks: docling HybridChunker로 생성한 청크 리스트
metadata: 문서 메타데이터
Returns:
LangChain Document 객체 리스트
"""
documents = []
for i, chunk in enumerate(chunks):
# 각 청크에 대한 메타데이터
chunk_metadata = metadata.copy()
chunk_metadata["chunk_id"] = i
# 청크 내용 추출
if hasattr(chunk, "text"):
content = chunk.text
elif hasattr(chunk, "content"):
content = chunk.content
else:
content = str(chunk)
document = Document(
page_content=content,
metadata=chunk_metadata
)
documents.append(document)
return documents
def process_with_langchain(self, pdf_path: str) -> List[Document]:
"""
LangChain의 PyPDFLoader를 사용하여 PDF 문서 로드
Args:
pdf_path: PDF 파일 경로
Returns:
LangChain Document 객체 리스트
"""
start_time = time.time()
try:
loader = PyPDFLoader(pdf_path)
documents = loader.load()
processing_time = time.time() - start_time
print(f"PyPDFLoader 처리 시간: {processing_time:.2f}초")
return documents
except Exception as e:
print(f"PyPDFLoader로 문서 처리 중 오류 발생: {e}")
raise
def process_pdf(self, pdf_path: str, use_docling: bool = True) -> List[Document]:
"""
PDF 파일 처리
Args:
pdf_path: PDF 파일 경로
use_docling: docling 사용 여부
Returns:
처리된 문서의 청크 리스트
"""
total_start_time = time.time()
# docling 사용 가능 여부 확인
can_use_docling = use_docling and DOCLING_AVAILABLE
if can_use_docling:
try:
# 1. docling으로 PDF 처리
docling_result = self.process_with_docling(pdf_path)
doc = docling_result["raw_document"]
metadata = docling_result["metadata"]
# 2. HybridChunker로 청크 생성
chunks = self.chunk_with_hybrid_chunker(doc)
# 3. 청크를 LangChain Document로 변환
documents = self.create_langchain_documents_from_chunks(chunks, metadata)
total_time = time.time() - total_start_time
print(f"docling 처리 완료: '{pdf_path}', {len(documents)} 청크, 총 {total_time:.2f}초")
return documents
except Exception as e:
print(f"docling 처리 실패, PyPDFLoader로 대체: {e}")
can_use_docling = False
if not can_use_docling:
# PyPDFLoader로 처리 (대체 방안)
documents = self.process_with_langchain(pdf_path)
chunks = self.text_splitter.split_documents(documents)
total_time = time.time() - total_start_time
print(f"PyPDFLoader 처리 완료: '{pdf_path}', {len(chunks)} 청크, 총 {total_time:.2f}초")
return chunks
def process_directory_parallel(self, directory: str, use_docling: bool = True) -> List[Document]:
"""
디렉토리 내 모든 PDF 파일 병렬 처리 (멀티스레딩)
Args:
directory: PDF 파일 디렉토리 경로
use_docling: docling 사용 여부
Returns:
처리된 모든 문서의 청크 리스트
"""
all_documents = []
pdf_files = []
# PDF 파일 목록 수집
for file in os.listdir(directory):
if file.endswith(".pdf"):
pdf_path = os.path.join(directory, file)
pdf_files.append(pdf_path)
if not pdf_files:
print(f"'{directory}' 디렉토리에 PDF 파일이 없습니다.")
return []
print(f"총 {len(pdf_files)}개 PDF 파일 병렬 처리 시작 (최대 {self.max_workers} 작업자)")
start_time = time.time()
# 병렬 처리 실행
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 각 PDF 파일에 대해 process_pdf 함수 병렬 실행
future_to_pdf = {executor.submit(self.process_pdf, pdf_path, use_docling): pdf_path
for pdf_path in pdf_files}
# 결과 수집
for future in future_to_pdf:
pdf_path = future_to_pdf[future]
try:
# 결과 가져오기
chunks = future.result()
all_documents.extend(chunks)
print(f"'{os.path.basename(pdf_path)}' 처리 완료: {len(chunks)} 청크")
except Exception as e:
print(f"'{pdf_path}' 처리 중 오류 발생: {e}")
total_time = time.time() - start_time
print(f"병렬 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초")
return all_documents
def process_directory(self, directory: str, use_docling: bool = True, parallel: bool = True) -> List[Document]:
"""
디렉토리 내 모든 PDF 파일 처리
Args:
directory: PDF 파일 디렉토리 경로
use_docling: docling 사용 여부
parallel: 병렬 처리 사용 여부
Returns:
처리된 모든 문서의 청크 리스트
"""
# 병렬 처리 사용
if parallel:
return self.process_directory_parallel(directory, use_docling)
# 순차 처리
all_documents = []
start_time = time.time()
for file in os.listdir(directory):
if file.endswith(".pdf"):
pdf_path = os.path.join(directory, file)
print(f"처리 중: {pdf_path}")
try:
chunks = self.process_pdf(pdf_path, use_docling=use_docling)
all_documents.extend(chunks)
except Exception as e:
print(f"'{pdf_path}' 처리 중 오류 발생: {e}")
total_time = time.time() - start_time
print(f"순차 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초")
return all_documents |