""" 디버깅을 위한 코드 추가 - 경로 관련 문제 해결 """ import os import time import hashlib import pickle import json import logging import glob from typing import List, Dict, Tuple, Any, Optional from logging.handlers import RotatingFileHandler from pathlib import Path from langchain.schema import Document from config import ( PDF_DIRECTORY, CACHE_DIRECTORY, CHUNK_SIZE, CHUNK_OVERLAP, LLM_MODEL, LOG_LEVEL, LOG_FILE, print_config, validate_config ) from optimized_document_processor import OptimizedDocumentProcessor from vector_store import VectorStore import sys print("===== Script starting =====") sys.stdout.flush() # 즉시 출력 강제 # 주요 함수/메서드 호출 전후에도 디버깅 출력 추가 print("Loading config...") sys.stdout.flush() # from config import ... 등의 코드 print("Config loaded!") sys.stdout.flush() # 로깅 설정 개선 def setup_logging(): """애플리케이션 로깅 설정""" # 로그 레벨 설정 log_level = getattr(logging, LOG_LEVEL.upper(), logging.INFO) # 로그 포맷 설정 log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' formatter = logging.Formatter(log_format) # 루트 로거 설정 root_logger = logging.getLogger() root_logger.setLevel(log_level) # 핸들러 초기화 # 콘솔 핸들러 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) root_logger.addHandler(console_handler) # 파일 핸들러 (회전식) try: file_handler = RotatingFileHandler( LOG_FILE, maxBytes=10 * 1024 * 1024, # 10 MB backupCount=5 ) file_handler.setFormatter(formatter) root_logger.addHandler(file_handler) except Exception as e: console_handler.warning(f"로그 파일 설정 실패: {e}, 콘솔 로깅만 사용합니다.") return logging.getLogger("AutoRAG") # 로거 설정 logger = setup_logging() # 현재 작업 디렉토리 확인을 위한 디버깅 코드 current_dir = os.getcwd() logger.info(f"현재 작업 디렉토리: {current_dir}") # 설정된 PDF 디렉토리 확인 abs_pdf_dir = os.path.abspath(PDF_DIRECTORY) logger.info(f"설정된 PDF 디렉토리: {PDF_DIRECTORY}") logger.info(f"절대 경로로 변환된 PDF 디렉토리: {abs_pdf_dir}") # PDF 디렉토리 존재 확인 if os.path.exists(abs_pdf_dir): logger.info(f"PDF 디렉토리가 존재합니다: {abs_pdf_dir}") # 디렉토리 내용 확인 pdf_files = glob.glob(os.path.join(abs_pdf_dir, "*.pdf")) logger.info(f"디렉토리 내 PDF 파일 목록: {pdf_files}") else: logger.error(f"PDF 디렉토리가 존재하지 않습니다: {abs_pdf_dir}") # 상위 디렉토리 내용 확인 parent_dir = os.path.dirname(abs_pdf_dir) logger.info(f"상위 디렉토리: {parent_dir}") if os.path.exists(parent_dir): dir_contents = os.listdir(parent_dir) logger.info(f"상위 디렉토리 내용: {dir_contents}") # 설정 상태 확인 logger.info("애플리케이션 설정 검증 중...") config_status = validate_config() if config_status["status"] != "valid": for warning in config_status["warnings"]: logger.warning(f"설정 경고: {warning}") # 안전한 임포트 try: from rag_chain import RAGChain RAG_CHAIN_AVAILABLE = True print("RAG 체인 모듈 로드 성공!") except ImportError as e: logger.warning(f"RAG 체인 모듈을 로드할 수 없습니다: {e}") RAG_CHAIN_AVAILABLE = False except Exception as e: logger.warning(f"RAG 체인 모듈 로드 중 예상치 못한 오류: {e}") RAG_CHAIN_AVAILABLE = False # 폴백 RAG 관련 모듈도 미리 확인 try: from fallback_rag_chain import FallbackRAGChain FALLBACK_AVAILABLE = True print("폴백 RAG 체인 모듈 로드 성공!") except ImportError as e: logger.warning(f"폴백 RAG 체인 모듈을 로드할 수 없습니다: {e}") FALLBACK_AVAILABLE = False try: from offline_fallback_rag import OfflineFallbackRAG OFFLINE_FALLBACK_AVAILABLE = True print("오프라인 폴백 RAG 모듈 로드 성공!") except ImportError as e: logger.warning(f"오프라인 폴백 RAG 모듈을 로드할 수 없습니다: {e}") OFFLINE_FALLBACK_AVAILABLE = False class DocumentProcessingError(Exception): """문서 처리 중 발생하는 예외""" pass class VectorStoreError(Exception): """벡터 스토어 작업 중 발생하는 예외""" pass class RAGInitializationError(Exception): """RAG 체인 초기화 중 발생하는 예외""" pass class ConfigurationError(Exception): """설정 관련 오류""" pass class AutoRAGChatApp: """ documents 폴더의 PDF 파일을 자동으로 처리하는 RAG 챗봇 """ def __init__(self): """ RAG 챗봇 애플리케이션 초기화 """ try: logger.info("AutoRAGChatApp 초기화 시작") # 데이터 디렉토리 정의 (설정에서 가져옴) # 절대 경로로 변환하여 사용 self.pdf_directory = os.path.abspath(PDF_DIRECTORY) self.cache_directory = os.path.abspath(CACHE_DIRECTORY) self.index_file = os.path.join(self.cache_directory, "file_index.json") self.chunks_dir = os.path.join(self.cache_directory, "chunks") self.vector_index_dir = os.path.join(self.cache_directory, "vector_index") logger.info(f"설정된 PDF 디렉토리 (절대 경로): {self.pdf_directory}") # 디렉토리 검증 self._verify_pdf_directory() # 디렉토리 생성 self._ensure_directories_exist() logger.info(f"PDF 문서 디렉토리: '{self.pdf_directory}'") logger.info(f"캐시 디렉토리: '{self.cache_directory}'") # 컴포넌트 초기화 try: self.document_processor = OptimizedDocumentProcessor( chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP ) except Exception as e: logger.error(f"문서 처리기 초기화 실패: {e}") raise DocumentProcessingError(f"문서 처리기 초기화 실패: {str(e)}") # 벡터 저장소 초기화 try: self.vector_store = VectorStore(use_milvus=False) except Exception as e: logger.error(f"벡터 저장소 초기화 실패: {e}") raise VectorStoreError(f"벡터 저장소 초기화 실패: {str(e)}") # 문서 인덱스 로드 self.file_index = self._load_file_index() # 기본 변수 초기화 self.documents = [] self.processed_files = [] self.is_initialized = False # 시작 시 자동으로 문서 로드 및 처리 logger.info("문서 자동 로드 및 처리 시작...") self.auto_process_documents() logger.info("AutoRAGChatApp 초기화 완료") except Exception as e: logger.critical(f"애플리케이션 초기화 중 심각한 오류: {e}", exc_info=True) # 기본 상태 설정으로 최소한의 기능 유지 self.pdf_directory = os.path.abspath(PDF_DIRECTORY) self.documents = [] self.processed_files = [] self.is_initialized = False self.file_index = {} def _ensure_directories_exist(self) -> None: """ 필요한 디렉토리가 존재하는지 확인하고 생성 """ directories = [ self.pdf_directory, self.cache_directory, self.chunks_dir, self.vector_index_dir ] for directory in directories: try: os.makedirs(directory, exist_ok=True) except Exception as e: logger.error(f"디렉토리 생성 실패 '{directory}': {e}") raise OSError(f"디렉토리 생성 실패 '{directory}': {str(e)}") def _process_pdf_file(self, file_path: str) -> List[Document]: """ PDF 파일 처리 - docling 실패 시 PyPDFLoader 사용 Args: file_path: 처리할 PDF 파일 경로 Returns: 처리된 문서 청크 리스트 """ if not os.path.exists(file_path): logger.error(f"파일이 존재하지 않음: {file_path}") raise FileNotFoundError(f"파일이 존재하지 않음: {file_path}") try: logger.info(f"docling으로 처리 시도: {file_path}") # docling 사용 시도 try: # 10초 타임아웃 설정 (옵션) import signal def timeout_handler(signum, frame): raise TimeoutError("docling 처리 시간 초과 (60초)") # 리눅스/맥에서만 작동 (윈도우에서는 무시됨) try: signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(60) # 60초 타임아웃 except (AttributeError, ValueError) as se: logger.warning(f"시그널 설정 실패 (윈도우 환경일 수 있음): {se}") # docling으로 처리 시도 chunks = self.document_processor.process_pdf(file_path, use_docling=True) # 타임아웃 취소 try: signal.alarm(0) except (AttributeError, ValueError): pass return chunks except TimeoutError as te: logger.warning(f"docling 처리 시간 초과: {te}") logger.info("PyPDFLoader로 대체합니다.") # PyPDFLoader로 대체 try: return self.document_processor.process_pdf(file_path, use_docling=False) except Exception as inner_e: logger.error(f"PyPDFLoader 처리 오류: {inner_e}", exc_info=True) raise DocumentProcessingError(f"PDF 로딩 실패 (PyPDFLoader): {str(inner_e)}") except Exception as e: # docling 오류 확인 error_str = str(e) if "Invalid code point" in error_str or "RuntimeError" in error_str: logger.warning(f"docling 처리 오류 (코드 포인트 문제): {error_str}") logger.info("PyPDFLoader로 대체합니다.") else: logger.warning(f"docling 처리 오류: {error_str}") logger.info("PyPDFLoader로 대체합니다.") # PyPDFLoader로 대체 try: return self.document_processor.process_pdf(file_path, use_docling=False) except Exception as inner_e: logger.error(f"PyPDFLoader 처리 오류: {inner_e}", exc_info=True) raise DocumentProcessingError(f"PDF 로딩 실패 (PyPDFLoader): {str(inner_e)}") except DocumentProcessingError: # 이미 래핑된 예외는 그대로 전달 raise except Exception as e: logger.error(f"PDF 처리 중 심각한 오류: {e}", exc_info=True) # 빈 청크라도 반환하여 전체 처리가 중단되지 않도록 함 logger.warning(f"'{file_path}' 처리 실패로 빈 청크 목록 반환") return [] def _load_file_index(self) -> Dict[str, Dict[str, Any]]: """ 파일 인덱스 로드 Returns: 파일 경로 -> 메타데이터 매핑 """ if os.path.exists(self.index_file): try: with open(self.index_file, 'r', encoding='utf-8') as f: return json.load(f) except json.JSONDecodeError as e: logger.error(f"인덱스 파일 JSON 파싱 실패: {e}") logger.warning("손상된 인덱스 파일, 새로운 인덱스를 생성합니다.") return {} except Exception as e: logger.error(f"인덱스 파일 로드 실패: {e}") return {} return {} def _save_file_index(self) -> None: """ 파일 인덱스 저장 """ try: with open(self.index_file, 'w', encoding='utf-8') as f: json.dump(self.file_index, f, ensure_ascii=False, indent=2) logger.debug("파일 인덱스 저장 완료") except Exception as e: logger.error(f"파일 인덱스 저장 실패: {e}") raise IOError(f"파일 인덱스 저장 실패: {str(e)}") def _calculate_file_hash(self, file_path: str) -> str: """ 파일 해시 계산 Args: file_path: 파일 경로 Returns: MD5 해시값 """ if not os.path.exists(file_path): logger.error(f"해시 계산 실패 - 파일이 존재하지 않음: {file_path}") raise FileNotFoundError(f"파일이 존재하지 않음: {file_path}") try: hasher = hashlib.md5() with open(file_path, 'rb') as f: buf = f.read(65536) while len(buf) > 0: hasher.update(buf) buf = f.read(65536) return hasher.hexdigest() except Exception as e: logger.error(f"파일 해시 계산 중 오류: {e}") raise IOError(f"파일 해시 계산 실패: {str(e)}") def _is_file_processed(self, file_path: str) -> bool: """ 파일이 이미 처리되었고 변경되지 않았는지 확인 Args: file_path: 파일 경로 Returns: 처리 여부 """ # 파일 존재 확인 if not os.path.exists(file_path): logger.warning(f"파일이 존재하지 않음: {file_path}") return False # 인덱스에 파일 존재 여부 확인 if file_path not in self.file_index: return False try: # 현재 해시값 계산 current_hash = self._calculate_file_hash(file_path) # 저장된 해시값과 비교 if self.file_index[file_path]['hash'] != current_hash: logger.info(f"파일 변경 감지: {file_path}") return False # 청크 파일 존재 확인 chunks_path = self.file_index[file_path]['chunks_path'] if not os.path.exists(chunks_path): logger.warning(f"청크 파일이 존재하지 않음: {chunks_path}") return False return True except Exception as e: logger.error(f"파일 처리 상태 확인 중 오류: {e}") return False def _get_chunks_path(self, file_hash: str) -> str: """ 청크 파일 경로 생성 Args: file_hash: 파일 해시값 Returns: 청크 파일 경로 """ return os.path.join(self.chunks_dir, f"{file_hash}.pkl") def _save_chunks(self, file_path: str, chunks: List[Document]) -> None: """ 청크 데이터 저장 Args: file_path: 원본 파일 경로 chunks: 문서 청크 리스트 """ try: # 해시 계산 file_hash = self._calculate_file_hash(file_path) # 청크 파일 경로 chunks_path = self._get_chunks_path(file_hash) # 청크 데이터 저장 with open(chunks_path, 'wb') as f: pickle.dump(chunks, f) # 인덱스 업데이트 self.file_index[file_path] = { 'hash': file_hash, 'chunks_path': chunks_path, 'last_processed': time.time(), 'chunks_count': len(chunks), 'file_size': os.path.getsize(file_path), 'file_name': os.path.basename(file_path) } # 인덱스 저장 self._save_file_index() logger.info(f"청크 저장 완료: {file_path} ({len(chunks)}개 청크)") except Exception as e: logger.error(f"청크 저장 실패: {e}", exc_info=True) raise IOError(f"청크 저장 실패: {str(e)}") def _load_chunks(self, file_path: str) -> List[Document]: """ 저장된 청크 데이터 로드 Args: file_path: 파일 경로 Returns: 문서 청크 리스트 """ if file_path not in self.file_index: logger.error(f"인덱스에 파일이 존재하지 않음: {file_path}") raise KeyError(f"인덱스에 파일이 존재하지 않음: {file_path}") chunks_path = self.file_index[file_path]['chunks_path'] if not os.path.exists(chunks_path): logger.error(f"청크 파일이 존재하지 않음: {chunks_path}") raise FileNotFoundError(f"청크 파일이 존재하지 않음: {chunks_path}") try: with open(chunks_path, 'rb') as f: chunks = pickle.load(f) logger.info(f"청크 로드 완료: {file_path} ({len(chunks)}개 청크)") return chunks except pickle.UnpicklingError as e: logger.error(f"청크 파일 역직렬화 실패: {e}") raise IOError(f"청크 파일 손상: {str(e)}") except Exception as e: logger.error(f"청크 로드 실패: {e}", exc_info=True) raise IOError(f"청크 로드 실패: {str(e)}") def _verify_pdf_directory(self): """PDF 디렉토리 검증 및 파일 존재 확인""" try: # 디렉토리 존재 확인 if not os.path.exists(self.pdf_directory): try: logger.warning(f"PDF 디렉토리가 존재하지 않아 생성합니다: {self.pdf_directory}") os.makedirs(self.pdf_directory, exist_ok=True) except Exception as e: logger.error(f"PDF 디렉토리 생성 실패: {e}") raise # 디렉토리인지 확인 if not os.path.isdir(self.pdf_directory): logger.error(f"PDF 경로가 디렉토리가 아닙니다: {self.pdf_directory}") raise ConfigurationError(f"PDF 경로가 디렉토리가 아닙니다: {self.pdf_directory}") # PDF 파일 존재 확인 pdf_files = [f for f in os.listdir(self.pdf_directory) if f.lower().endswith('.pdf')] if pdf_files: logger.info(f"PDF 디렉토리에서 {len(pdf_files)}개의 PDF 파일을 찾았습니다: {pdf_files}") else: # 여러 경로에서 PDF 파일 탐색 시도 alternative_paths = [ "./documents", "../documents", "documents", os.path.join(os.getcwd(), "documents") ] found_pdfs = False for alt_path in alternative_paths: if os.path.exists(alt_path) and os.path.isdir(alt_path): alt_pdf_files = [f for f in os.listdir(alt_path) if f.lower().endswith('.pdf')] if alt_pdf_files: logger.warning(f"대체 경로 '{alt_path}'에서 PDF 파일을 찾았습니다. 이 경로를 사용합니다.") self.pdf_directory = os.path.abspath(alt_path) found_pdfs = True break if not found_pdfs: logger.warning(f"PDF 디렉토리에 PDF 파일이 없습니다: {self.pdf_directory}") logger.info("PDF 파일을 디렉토리에 추가해주세요.") except Exception as e: logger.error(f"PDF 디렉토리 검증 중 오류: {e}", exc_info=True) raise def auto_process_documents(self) -> str: """ documents 폴더의 PDF 파일 자동 처리 Returns: 처리 결과 메시지 """ try: start_time = time.time() # PDF 파일 목록 수집을 개선하여 다양한 경로 처리 try: pdf_files = [] # 설정된 디렉토리에서 PDF 파일 찾기 logger.info(f"PDF 파일 검색 경로: {self.pdf_directory}") if os.path.exists(self.pdf_directory) and os.path.isdir(self.pdf_directory): # 디렉토리 내용 출력 (디버깅용) dir_contents = os.listdir(self.pdf_directory) logger.info(f"디렉토리 내용: {dir_contents}") # PDF 파일만 필터링 for filename in os.listdir(self.pdf_directory): if filename.lower().endswith('.pdf'): file_path = os.path.join(self.pdf_directory, filename) if os.path.isfile(file_path): # 실제 파일인지 확인 pdf_files.append(file_path) logger.info(f"PDF 파일 찾음: {file_path}") # 발견된 모든 파일 로그 logger.info(f"발견된 모든 PDF 파일: {pdf_files}") except FileNotFoundError: logger.error(f"PDF 디렉토리를 찾을 수 없음: {self.pdf_directory}") return f"'{self.pdf_directory}' 디렉토리를 찾을 수 없습니다. 디렉토리가 존재하는지 확인하세요." except PermissionError: logger.error(f"PDF 디렉토리 접근 권한 없음: {self.pdf_directory}") return f"'{self.pdf_directory}' 디렉토리에 접근할 수 없습니다. 권한을 확인하세요." if not pdf_files: logger.warning(f"'{self.pdf_directory}' 폴더에 PDF 파일이 없습니다.") return f"'{self.pdf_directory}' 폴더에 PDF 파일이 없습니다." logger.info(f"발견된 PDF 파일: {len(pdf_files)}개") # 폴더 내 PDF 파일 처리 new_files = [] updated_files = [] cached_files = [] failed_files = [] all_chunks = [] for file_path in pdf_files: try: if self._is_file_processed(file_path): # 캐시에서 청크 로드 try: chunks = self._load_chunks(file_path) all_chunks.extend(chunks) cached_files.append(file_path) self.processed_files.append(os.path.basename(file_path)) except Exception as e: logger.error(f"캐시된 청크 로드 실패: {e}") # 파일을 다시 처리 logger.info(f"캐시 실패로 파일 재처리: {file_path}") chunks = self._process_pdf_file(file_path) if chunks: self._save_chunks(file_path, chunks) all_chunks.extend(chunks) updated_files.append(file_path) self.processed_files.append(os.path.basename(file_path)) else: failed_files.append(file_path) else: # 새 파일 또는 변경된 파일 처리 logger.info(f"처리 중: {file_path}") try: # 개선된 PDF 처리 메서드 사용 chunks = self._process_pdf_file(file_path) if chunks: # 청크가 있는 경우에만 저장 # 청크 저장 self._save_chunks(file_path, chunks) all_chunks.extend(chunks) if file_path in self.file_index: updated_files.append(file_path) else: new_files.append(file_path) self.processed_files.append(os.path.basename(file_path)) else: logger.warning(f"'{file_path}' 처리 실패: 추출된 청크 없음") failed_files.append(file_path) except Exception as e: logger.error(f"'{file_path}' 처리 중 오류: {e}", exc_info=True) failed_files.append(file_path) except Exception as e: logger.error(f"'{file_path}' 파일 처리 루프 중 오류: {e}", exc_info=True) failed_files.append(file_path) # 모든 청크 저장 self.documents = all_chunks processing_time = time.time() - start_time logger.info(f"문서 처리 완료: {len(all_chunks)}개 청크, {processing_time:.2f}초") # 벡터 인덱스 처리 try: self._process_vector_index(new_files, updated_files) except Exception as e: logger.error(f"벡터 인덱스 처리 실패: {e}", exc_info=True) return f"문서는 처리되었으나 벡터 인덱스 생성에 실패했습니다: {str(e)}" # RAG 체인 초기화 if RAG_CHAIN_AVAILABLE: try: logger.info("RAGChain으로 초기화를 시도합니다.") self.rag_chain = RAGChain(self.vector_store) self.is_initialized = True logger.info("RAG 체인 초기화 성공") except Exception as e: logger.error(f"RAG 체인 초기화 실패: {e}", exc_info=True) # FallbackRAGChain으로 대체 시도 try: logger.info("FallbackRAGChain으로 대체합니다...") from fallback_rag_chain import FallbackRAGChain self.rag_chain = FallbackRAGChain(self.vector_store) self.is_initialized = True logger.info("폴백 RAG 체인 초기화 성공") except Exception as fallback_e: logger.error(f"폴백 RAG 체인 초기화 실패: {fallback_e}", exc_info=True) # SimpleRAGChain 시도 (최후의 수단) try: logger.info("SimpleRAGChain으로 대체합니다...") from simple_rag_chain import SimpleRAGChain # API 정보 가져오기 try: from config import DEEPSEEK_API_KEY, DEEPSEEK_MODEL, DEEPSEEK_ENDPOINT logger.info(f"설정 파일에서 DeepSeek API 정보를 로드했습니다: 모델={DEEPSEEK_MODEL}") except ImportError: # 설정 파일에서 가져올 수 없는 경우 환경 변수 확인 DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "") DEEPSEEK_MODEL = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat") DEEPSEEK_ENDPOINT = os.environ.get("DEEPSEEK_ENDPOINT", "https://api.deepseek.com/v1/chat/completions") logger.info(f"환경 변수에서 DeepSeek API 정보를 로드했습니다: 모델={DEEPSEEK_MODEL}") # SimpleRAGChain 초기화 시도 self.rag_chain = SimpleRAGChain(self.vector_store) self.is_initialized = True logger.info("SimpleRAGChain 초기화 성공") except Exception as simple_e: logger.error(f"모든 RAG 체인 초기화 실패: {simple_e}", exc_info=True) return f"문서와 벡터 인덱스는 처리되었으나 RAG 체인 초기화에 실패했습니다: {str(e)}" else: # RAGChain을 사용할 수 없는 경우 try: logger.info("기본 RAG Chain을 사용할 수 없어 대체 버전을 시도합니다...") # FallbackRAGChain 시도 try: from fallback_rag_chain import FallbackRAGChain self.rag_chain = FallbackRAGChain(self.vector_store) self.is_initialized = True logger.info("폴백 RAG 체인 초기화 성공") except Exception as fallback_e: logger.error(f"폴백 RAG 체인 초기화 실패: {fallback_e}", exc_info=True) # SimpleRAGChain 시도 (최후의 수단) try: from simple_rag_chain import SimpleRAGChain self.rag_chain = SimpleRAGChain(self.vector_store) self.is_initialized = True logger.info("SimpleRAGChain 초기화 성공") except Exception as simple_e: logger.error(f"모든 RAG 체인 초기화 실패: {simple_e}", exc_info=True) return f"문서와 벡터 인덱스는 처리되었으나 RAG 체인 초기화에 실패했습니다" except Exception as e: logger.error(f"RAG 체인 초기화 실패: {e}", exc_info=True) return f"문서와 벡터 인덱스는 처리되었으나 RAG 체인 초기화에 실패했습니다: {str(e)}" # 성공 메시지 생성 result_message = f"""문서 처리 완료! - 처리된 파일: {len(pdf_files)}개 - 캐시된 파일: {len(cached_files)}개 - 새 파일: {len(new_files)}개 - 업데이트된 파일: {len(updated_files)}개 - 실패한 파일: {len(failed_files)}개 - 총 청크 수: {len(all_chunks)}개 - 처리 시간: {processing_time:.2f}초 이제 질문할 준비가 되었습니다!""" return result_message except Exception as e: error_message = f"문서 처리 중 오류 발생: {str(e)}" logger.error(error_message, exc_info=True) return error_message def _process_vector_index(self, new_files: List[str], updated_files: List[str]) -> None: """ 벡터 인덱스 처리 Args: new_files: 새로 추가된 파일 목록 updated_files: 업데이트된 파일 목록 """ # 벡터 인덱스 저장 경로 확인 if os.path.exists(self.vector_index_dir) and any(os.listdir(self.vector_index_dir)): # 기존 벡터 인덱스 로드 try: logger.info("저장된 벡터 인덱스 로드 중...") vector_store_loaded = self.vector_store.load_local(self.vector_index_dir) # 인덱스 로드 성공 확인 if self.vector_store.vector_store is not None: # 새 문서나 변경된 문서가 있으면 인덱스 업데이트 if new_files or updated_files: logger.info("벡터 인덱스 업데이트 중...") self.vector_store.add_documents(self.documents) logger.info("벡터 인덱스 로드 완료") else: logger.warning("벡터 인덱스를 로드했으나 유효하지 않음, 새로 생성합니다.") self.vector_store.create_or_load(self.documents) except Exception as e: logger.error(f"벡터 인덱스 로드 실패, 새로 생성합니다: {e}", exc_info=True) # 새 벡터 인덱스 생성 self.vector_store.create_or_load(self.documents) else: # 새 벡터 인덱스 생성 logger.info("새 벡터 인덱스 생성 중...") self.vector_store.create_or_load(self.documents) # 벡터 인덱스 저장 if self.vector_store and self.vector_store.vector_store is not None: try: logger.info(f"벡터 인덱스 저장 중: {self.vector_index_dir}") save_result = self.vector_store.save_local(self.vector_index_dir) logger.info(f"벡터 인덱스 저장 완료: {self.vector_index_dir}") except Exception as e: logger.error(f"벡터 인덱스 저장 실패: {e}", exc_info=True) raise VectorStoreError(f"벡터 인덱스 저장 실패: {str(e)}") else: logger.warning("벡터 인덱스가 초기화되지 않아 저장하지 않습니다.") def reset_cache(self) -> str: """ 캐시 초기화 Returns: 결과 메시지 """ try: # 청크 파일 삭제 try: for filename in os.listdir(self.chunks_dir): file_path = os.path.join(self.chunks_dir, filename) if os.path.isfile(file_path): os.remove(file_path) logger.info("청크 캐시 파일 삭제 완료") except Exception as e: logger.error(f"청크 파일 삭제 중 오류: {e}") return f"청크 파일 삭제 중 오류 발생: {str(e)}" # 인덱스 초기화 self.file_index = {} try: self._save_file_index() logger.info("파일 인덱스 초기화 완료") except Exception as e: logger.error(f"인덱스 파일 초기화 중 오류: {e}") return f"인덱스 파일 초기화 중 오류 발생: {str(e)}" # 벡터 인덱스 삭제 try: for filename in os.listdir(self.vector_index_dir): file_path = os.path.join(self.vector_index_dir, filename) if os.path.isfile(file_path): os.remove(file_path) logger.info("벡터 인덱스 파일 삭제 완료") except Exception as e: logger.error(f"벡터 인덱스 파일 삭제 중 오류: {e}") return f"벡터 인덱스 파일 삭제 중 오류 발생: {str(e)}" self.documents = [] self.processed_files = [] self.is_initialized = False logger.info("캐시 초기화 완료") return "캐시가 초기화되었습니다. 다음 실행 시 모든 문서가 다시 처리됩니다." except Exception as e: error_msg = f"캐시 초기화 중 오류 발생: {str(e)}" logger.error(error_msg, exc_info=True) return error_msg def process_query(self, query: str, chat_history: List[Tuple[str, str]]) -> Tuple[str, List[Tuple[str, str]]]: """ 사용자 쿼리 처리 Args: query: 사용자 질문 chat_history: 대화 기록 Returns: 응답 및 업데이트된 대화 기록 """ if not query or not query.strip(): response = "질문이 비어 있습니다. 질문을 입력해 주세요." chat_history.append((query, response)) return "", chat_history if not self.is_initialized: response = "문서 로드가 초기화되지 않았습니다. 자동 로드를 시도합니다." chat_history.append((query, response)) # 자동 로드 시도 try: init_result = self.auto_process_documents() if not self.is_initialized: response = f"문서를 로드할 수 없습니다. 'documents' 폴더에 PDF 파일이 있는지 확인하세요. 초기화 결과: {init_result}" chat_history.append((query, response)) return "", chat_history except Exception as e: response = f"문서 로드 중 오류 발생: {str(e)}" logger.error(f"자동 로드 실패: {e}", exc_info=True) chat_history.append((query, response)) return "", chat_history try: # RAG 체인 실행 및 응답 생성 start_time = time.time() logger.info(f"쿼리 처리 시작: {query}") # rag_chain이 초기화되었는지 확인 if not hasattr(self, 'rag_chain') or self.rag_chain is None: raise RAGInitializationError("RAG 체인이 초기화되지 않았습니다") # 1. 먼저 표준 RAG 체인으로 시도 try: response = self.rag_chain.run(query) logger.info(f"기본 RAG 체인으로 응답 생성 성공") except Exception as rag_error: logger.error(f"기본 RAG 체인 실행 실패: {rag_error}, 대안 시도") # 2. DeepSeek API 직접 호출 시도 (RAG 체인 우회) try: # DeepSeek API 정보 가져오기 try: from config import DEEPSEEK_API_KEY, DEEPSEEK_MODEL, DEEPSEEK_ENDPOINT except ImportError: # 설정 모듈에서 가져올 수 없는 경우 기본값 설정 DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "") DEEPSEEK_MODEL = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat") DEEPSEEK_ENDPOINT = os.environ.get("DEEPSEEK_ENDPOINT", "https://api.deepseek.com/v1/chat/completions") # 직접 API 호출 함수 정의 (외부 모듈 의존성 제거) def direct_api_call(query, context, api_key, model_name, endpoint, max_retries=3, timeout=60): """DeepSeek API 직접 호출 함수""" import requests import json import time # 프롬프트 길이 제한 if len(context) > 6000: context = context[:2500] + "\n...(중략)...\n" + context[-2500:] # 프롬프트 구성 prompt = f""" 다음 정보를 기반으로 질문에 정확하게 답변해주세요. 질문: {query} 참고 정보: {context} 참고 정보에 답이 있으면 반드시 그 정보를 기반으로 답변하세요. 참고 정보에 답이 없는 경우에는 일반적인 지식을 활용하여 답변할 수 있지만, "제공된 문서에는 이 정보가 없으나, 일반적으로는..." 식으로 시작하세요. 답변은 정확하고 간결하게 제공하되, 가능한 참고 정보에서 근거를 찾아 설명해주세요. 참고 정보의 출처도 함께 알려주세요. """ # API 요청 시도 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } payload = { "model": model_name, "messages": [{"role": "user", "content": prompt}], "temperature": 0.3, "max_tokens": 1000 } # 재시도 로직 retry_delay = 1.0 for attempt in range(max_retries): try: logger.info(f"DeepSeek API 직접 호출 시도 ({attempt + 1}/{max_retries})...") response = requests.post( endpoint, headers=headers, json=payload, timeout=timeout ) if response.status_code == 200: result = response.json() content = result.get("choices", [{}])[0].get("message", {}).get("content", "") logger.info(f"DeepSeek API 직접 호출 성공") return content else: logger.warning(f"API 오류: 상태 코드 {response.status_code}") # 요청 한도인 경우 더 오래 대기 if response.status_code == 429: retry_delay = min(retry_delay * 3, 15) else: retry_delay = min(retry_delay * 2, 10) if attempt < max_retries - 1: logger.info(f"{retry_delay}초 후 재시도...") time.sleep(retry_delay) except Exception as e: logger.error(f"API 호출 오류: {e}") if attempt < max_retries - 1: logger.info(f"{retry_delay}초 후 재시도...") time.sleep(retry_delay) retry_delay = min(retry_delay * 2, 10) # 모든 시도 실패 raise Exception("최대 재시도 횟수 초과") # 벡터 검색 수행 if self.vector_store and hasattr(self.vector_store, "similarity_search"): logger.info("벡터 검색 수행...") docs = self.vector_store.similarity_search(query, k=5) # 검색 결과 컨텍스트 구성 context_parts = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get("source", "알 수 없는 출처") page = doc.metadata.get("page", "") source_info = f"{source}" if page: source_info += f" (페이지: {page})" context_parts.append(f"[참고자료 {i}] - 출처: {source_info}\n{doc.page_content}\n") context = "\n".join(context_parts) # 직접 API 호출 logger.info("DeepSeek API 직접 호출 시도...") response = direct_api_call( query, context, DEEPSEEK_API_KEY, DEEPSEEK_MODEL, DEEPSEEK_ENDPOINT, max_retries=3, timeout=120 ) logger.info("DeepSeek API 직접 호출 성공") else: raise Exception("벡터 스토어가 초기화되지 않았습니다") except Exception as direct_api_error: logger.error(f"DeepSeek API 직접 호출 실패: {direct_api_error}, 검색 결과 반환") # 3. 검색 결과만이라도 반환 try: # 벡터 검색 수행 if self.vector_store and hasattr(self.vector_store, "similarity_search"): docs = self.vector_store.similarity_search(query, k=5) # 검색 결과 컨텍스트 구성 context_parts = [] for i, doc in enumerate(docs, 1): source = doc.metadata.get("source", "알 수 없는 출처") page = doc.metadata.get("page", "") source_info = f"{source}" if page: source_info += f" (페이지: {page})" context_parts.append(f"[참고자료 {i}] - 출처: {source_info}\n{doc.page_content}\n") context = "\n".join(context_parts) # 간단한 응답 생성 predefined_answers = { "대한민국의 수도": "대한민국의 수도는 서울입니다.", "수도": "대한민국의 수도는 서울입니다.", "누구야": "저는 RAG 기반 질의응답 시스템입니다. 문서를 검색하고 관련 정보를 찾아드립니다.", "안녕": "안녕하세요! 무엇을 도와드릴까요?", "뭐해": "사용자의 질문에 답변하기 위해 문서를 검색하고 있습니다. 무엇을 알려드릴까요?" } # 질문에 맞는 미리 정의된 응답이 있는지 확인 for key, answer in predefined_answers.items(): if key in query.lower(): response = answer logger.info(f"미리 정의된 응답 제공: {key}") break else: # 미리 정의된 응답이 없으면 검색 결과만 표시 response = f""" API 서버 연결에 문제가 있어 검색 결과만 표시합니다. 질문: {query} 검색된 관련 문서: {context} [참고] API 연결 문제로 인해 자동 요약이 제공되지 않습니다. 다시 시도하거나 다른 질문을 해보세요. """ logger.info("검색 결과만 표시") else: response = f"API 연결 및 벡터 검색에 모두 실패했습니다. 시스템 관리자에게 문의하세요." except Exception as fallback_error: logger.error(f"최종 폴백 응답 생성 실패: {fallback_error}") # 4. 최후의 방법: 오류 메시지를 응답으로 반환 if "Connection error" in str(rag_error) or "timeout" in str(rag_error).lower(): response = f""" API 서버 연결에 문제가 있습니다. 잠시 후 다시 시도해주세요. 질문: {query} [참고] 현재 DeepSeek API 서버와의 연결이 원활하지 않습니다. 이로 인해 질문에 대한 응답을 제공할 수 없습니다. """ else: response = f"쿼리 처리 중 오류가 발생했습니다: {str(rag_error)}" end_time = time.time() query_time = end_time - start_time logger.info(f"쿼리 처리 완료: {query_time:.2f}초") chat_history.append((query, response)) return "", chat_history except RAGInitializationError as e: error_msg = f"RAG 시스템 초기화 오류: {str(e)}. 'documents' 폴더에 PDF 파일이 있는지 확인하고, 재시작해 보세요." logger.error(f"쿼리 처리 중 RAG 초기화 오류: {e}", exc_info=True) chat_history.append((query, error_msg)) return "", chat_history except (VectorStoreError, DocumentProcessingError) as e: error_msg = f"문서 처리 시스템 오류: {str(e)}. 문서 형식이 올바른지 확인해 보세요." logger.error(f"쿼리 처리 중 문서/벡터 스토어 오류: {e}", exc_info=True) chat_history.append((query, error_msg)) return "", chat_history except Exception as e: error_msg = f"쿼리 처리 중 오류 발생: {str(e)}" logger.error(f"쿼리 처리 중 예상치 못한 오류: {e}", exc_info=True) chat_history.append((query, error_msg)) return "", chat_history def launch_app(self) -> None: """ Gradio 앱 실행 """ try: import gradio as gr except ImportError: logger.error("Gradio 라이브러리를 찾을 수 없습니다. pip install gradio로 설치하세요.") print("Gradio 라이브러리를 찾을 수 없습니다. pip install gradio로 설치하세요.") return app_instance = self try: with gr.Blocks(title="PDF 문서 기반 RAG 챗봇") as app: gr.Markdown("# PDF 문서 기반 RAG 챗봇") gr.Markdown(f"* 사용 중인 LLM 모델: **{LLM_MODEL}**") actual_pdf_dir = ( self.pdf_directory.replace("\\", "\\\\") if os.name == 'nt' else self.pdf_directory ) gr.Markdown(f"* PDF 문서 폴더: **{actual_pdf_dir}**") with gr.Row(): with gr.Column(scale=1): status_box = gr.Textbox( label="문서 처리 상태", value=self._get_status_message(), lines=5, interactive=False ) refresh_button = gr.Button("문서 새로 읽기", variant="primary") reset_button = gr.Button("캐시 초기화", variant="stop") status_info = gr.Markdown( value=f"시스템 상태: {'초기화됨' if self.is_initialized else '초기화되지 않음'}" ) with gr.Accordion("캐시 세부 정보", open=False): cache_info = gr.Textbox( label="캐시된 파일 정보", value=self._get_cache_info(), lines=5, interactive=False ) with gr.Column(scale=2): chatbot = gr.Chatbot( label="대화 내용", bubble_full_width=False, height=500, show_copy_button=True ) with gr.Row(): with gr.Column(scale=4): query_box = gr.Textbox( label="질문", placeholder="처리된 문서 내용에 대해 질문하세요...", lines=2 ) with gr.Column(scale=1): audio_input = gr.Audio( sources=["microphone"], type="numpy", label="음성으로 질문하기" ) with gr.Row(): submit_btn = gr.Button("전송", variant="primary") clear_chat_button = gr.Button("대화 초기화") # VITO STT용 음성 처리 함수 def process_audio(audio): logger.info("음성 인식 처리 시작...") try: from vito_stt import VitoSTT import soundfile as sf import tempfile import os if audio is None: return "음성이 녹음되지 않았습니다." sr, y = audio logger.info(f"오디오 녹음 데이터 수신: 샘플레이트={sr}Hz, 길이={len(y)}샘플") if len(y) / sr < 1.0: return "녹음된 음성이 너무 짧습니다. 다시 시도해주세요." with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: tmp_path = tmp.name sf.write(tmp_path, y, sr, format="WAV") logger.info(f"임시 WAV 파일 저장됨: {tmp_path}") vito = VitoSTT() with open(tmp_path, "rb") as f: audio_bytes = f.read() result = vito.transcribe_audio(audio_bytes, language="ko") try: os.unlink(tmp_path) logger.info("임시 오디오 파일 삭제됨") except Exception as e: logger.warning(f"임시 파일 삭제 실패: {e}") if result.get("success"): recognized_text = result.get("text", "") logger.info(f"음성인식 성공: {recognized_text}") return recognized_text else: error_msg = f"음성 인식 실패: {result.get('error', '알 수 없는 오류')}" logger.error(error_msg) return error_msg except ImportError as e: logger.error(f"필요한 라이브러리 누락: {e}") return ("음성인식에 필요한 라이브러리가 설치되지 않았습니다. " "pip install soundfile numpy requests 를 실행해주세요.") except Exception as e: logger.error(f"음성 처리 중 오류 발생: {e}", exc_info=True) return f"음성 처리 중 오류 발생: {str(e)}" # 음성 인식 후 자동 질문 처리 def process_audio_and_submit(audio, chat_history): recognized_text = process_audio(audio) if (not recognized_text or recognized_text.startswith("음성 인식 실패") or recognized_text.startswith("음성 처리 중 오류")): return recognized_text, chat_history return app_instance.process_query(recognized_text, chat_history) def update_ui_after_refresh(result): return ( result, app_instance._get_status_message(), f"시스템 상태: {'초기화됨' if app_instance.is_initialized else '초기화되지 않음'}", app_instance._get_cache_info() ) # 이벤트 핸들러 바인딩 audio_input.stop_recording( fn=process_audio_and_submit, inputs=[audio_input, chatbot], outputs=[query_box, chatbot] ) audio_input.stop_recording( fn=process_audio, inputs=[audio_input], outputs=[query_box] ) refresh_button.click( fn=lambda: update_ui_after_refresh(self.auto_process_documents()), inputs=[], outputs=[status_box, status_box, status_info, cache_info] ) reset_button.click( fn=lambda: update_ui_after_refresh( f"{self.reset_cache()}\n\n{self.auto_process_documents()}" ), inputs=[], outputs=[status_box, status_box, status_info, cache_info] ) submit_btn.click( fn=self.process_query, inputs=[query_box, chatbot], outputs=[query_box, chatbot] ) query_box.submit( fn=self.process_query, inputs=[query_box, chatbot], outputs=[query_box, chatbot] ) clear_chat_button.click( fn=lambda: [], outputs=[chatbot] ) app.launch(share=False) except Exception as e: logger.error(f"Gradio 앱 실행 중 오류 발생: {e}", exc_info=True) print(f"Gradio 앱 실행 중 오류 발생: {e}") def _get_status_message(self) -> str: """ 현재 처리 상태 메시지 생성 Returns: 상태 메시지 """ if not self.processed_files: return "처리된 문서가 없습니다. '문서 새로 읽기' 버튼을 클릭하세요." # DeepSeek API 상태 확인 from config import USE_DEEPSEEK, DEEPSEEK_API_KEY, DEEPSEEK_MODEL model_info = "" if USE_DEEPSEEK and DEEPSEEK_API_KEY: # DeepSeek API 테스트 수행 try: # 테스트 함수 가져오기 시도 try: from deepseek_utils import test_deepseek_api # DeepSeek 설정 가져오기 from config import DEEPSEEK_ENDPOINT # API 테스트 test_result = test_deepseek_api(DEEPSEEK_API_KEY, DEEPSEEK_ENDPOINT, DEEPSEEK_MODEL) if test_result["success"]: model_info = f"\nDeepSeek API 상태: 정상 ({DEEPSEEK_MODEL})" else: model_info = f"\nDeepSeek API 상태: 오류 - {test_result['message']}" except ImportError: # 직접 테스트 실행 import requests import json # DeepSeek 설정 가져오기 from config import DEEPSEEK_ENDPOINT # 테스트용 간단한 프롬프트 test_prompt = "Hello, please respond with a short greeting." # API 요청 헤더 및 데이터 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {DEEPSEEK_API_KEY}" } payload = { "model": DEEPSEEK_MODEL, "messages": [{"role": "user", "content": test_prompt}], "temperature": 0.7, "max_tokens": 50 } # API 요청 전송 try: response = requests.post( DEEPSEEK_ENDPOINT, headers=headers, data=json.dumps(payload), timeout=5 # 5초 타임아웃 (UI 반응성 유지) ) # 응답 확인 if response.status_code == 200: model_info = f"\nDeepSeek API 상태: 정상 ({DEEPSEEK_MODEL})" else: error_message = response.text[:100] model_info = f"\nDeepSeek API 상태: 오류 (상태 코드: {response.status_code})" except Exception as e: model_info = f"\nDeepSeek API 상태: 연결 실패 ({str(e)[:100]})" except Exception as e: model_info = f"\nDeepSeek API 상태 확인 실패: {str(e)[:100]}" return f"처리된 문서 ({len(self.processed_files)}개): {', '.join(self.processed_files)}{model_info}" def _get_cache_info(self) -> str: """ 캐시 세부 정보 메시지 생성 Returns: 캐시 정보 메시지 """ if not self.file_index: return "캐시된 파일이 없습니다." file_info = "" for file_path, info in self.file_index.items(): file_name = info.get('file_name', os.path.basename(file_path)) chunks_count = info.get('chunks_count', 0) file_size = info.get('file_size', 0) last_processed = info.get('last_processed', 0) # 파일 크기를 사람이 읽기 쉬운 형태로 변환 if file_size < 1024: size_str = f"{file_size} bytes" elif file_size < 1024 * 1024: size_str = f"{file_size / 1024:.1f} KB" else: size_str = f"{file_size / (1024 * 1024):.1f} MB" # 마지막 처리 시간을 날짜/시간 형식으로 변환 if last_processed: from datetime import datetime last_time = datetime.fromtimestamp(last_processed).strftime('%Y-%m-%d %H:%M:%S') else: last_time = "알 수 없음" file_info += f"- {file_name}: {chunks_count}개 청크, {size_str}, 마지막 처리: {last_time}\n" return file_info if __name__ == "__main__": app = AutoRAGChatApp() app.launch_app()