# Cell 2: Import necessary libraries
import os
import re
import json
import time
import nltk
import spacy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from IPython.display import display, HTML, clear_output
from datetime import datetime
from tqdm.auto import tqdm
import tempfile
import shutil
import logging
import warnings
from pathlib import Path
import gradio as gr

# Sentence embeddings
from sentence_transformers import SentenceTransformer, util

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("UnstructuredApp")

# Suppress warnings
warnings.filterwarnings('ignore')

# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)

# Load spaCy model
nlp = spacy.load("en_core_web_sm")

# Import Unstructured components
from unstructured.partition.auto import partition
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.html import partition_html
from unstructured.partition.pptx import partition_pptx
from unstructured.partition.docx import partition_docx
from unstructured.partition.xlsx import partition_xlsx
from unstructured.partition.image import partition_image
from unstructured.partition.email import partition_email
from unstructured.partition.json import partition_json
from unstructured.partition.csv import partition_csv
from unstructured.partition.xml import partition_xml
from unstructured.cleaners.core import (
    clean_extra_whitespace,
    replace_unicode_quotes,
    clean_bullets,
    group_broken_paragraphs,
    clean_dashes,
    remove_punctuation
)
from unstructured.staging.base import elements_to_json, convert_to_dict
from unstructured.chunking.title import chunk_by_title
from unstructured.documents.elements import (
    Title, Text, NarrativeText, ListItem, Table, Image,
    PageBreak, Footer, Header, Address
)

# Use regex patterns instead of unavailable extract functions
# Define our own regex patterns for extraction
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
URL_PATTERN = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*/?'
# Non-capturing group for the optional country code so re.findall returns the full match
PHONE_PATTERN = r'(?:\+\d{1,3}[- ]?)?\(?\d{3}\)?[- ]?\d{3}[- ]?\d{4}'
IP_PATTERN = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'

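# Quick illustrative check of the extraction patterns above on a made-up sample
# string (safe to delete; it only demonstrates what re.findall returns for each pattern).
_sample_text = "Contact support@example.com, docs at https://example.com/help, host 10.0.0.1"
print(re.findall(EMAIL_PATTERN, _sample_text))  # ['support@example.com']
print(re.findall(URL_PATTERN, _sample_text))    # ['https://example.com/help']
print(re.findall(IP_PATTERN, _sample_text))     # ['10.0.0.1']
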
# Cell 3: Define utility functions for file handling and processing
def create_temp_dir():
    """Create a temporary directory for file uploads"""
    temp_dir = tempfile.mkdtemp()
    return temp_dir


def save_uploaded_file(file, temp_dir):
    """Save uploaded file to temporary directory"""
    if file is None:
        return None
    file_path = os.path.join(temp_dir, file.name)
    with open(file_path, 'wb') as f:
        f.write(file.read())
    return file_path


def get_file_extension(file_path):
    """Get file extension from path"""
    if file_path is None:
        return None
    return os.path.splitext(file_path)[1].lower()


def identify_file_type(file_path):
    """Identify file type based on extension"""
    if file_path is None:
        return None
    ext = get_file_extension(file_path)
    file_types = {
        '.pdf': 'PDF',
        '.html': 'HTML',
        '.htm': 'HTML',
        '.docx': 'DOCX',
        '.doc': 'DOC',
        '.pptx': 'PPTX',
        '.ppt': 'PPT',
        '.xlsx': 'XLSX',
        '.xls': 'XLS',
        '.txt': 'TXT',
        '.csv': 'CSV',
        '.json': 'JSON',
        '.xml': 'XML',
        '.eml': 'EMAIL',
        '.msg': 'EMAIL',
        '.jpg': 'IMAGE',
        '.jpeg': 'IMAGE',
        '.png': 'IMAGE',
        '.tiff': 'IMAGE',
        '.tif': 'IMAGE'
    }
    return file_types.get(ext, 'UNKNOWN')


def partition_file(file_path, partition_kwargs=None):
    """
    Partition file using appropriate method based on file type

    Args:
        file_path: Path to the file
        partition_kwargs: Dictionary of kwargs for partition function

    Returns:
        List of elements
    """
    if file_path is None:
        return []

    if partition_kwargs is None:
        partition_kwargs = {}

    file_type = identify_file_type(file_path)

    try:
        if file_type == 'PDF':
            # Add PDF-specific kwargs
            pdf_kwargs = {
                'extract_images': True,
                'infer_table_structure': True,
                'include_page_breaks': True,
                **partition_kwargs
            }
            return partition_pdf(filename=file_path, **pdf_kwargs)
        elif file_type == 'HTML':
            # Add HTML-specific kwargs
            html_kwargs = {
                'extract_links': True,
                **partition_kwargs
            }
            return partition_html(filename=file_path, **html_kwargs)
        elif file_type == 'DOCX':
            return partition_docx(filename=file_path, **partition_kwargs)
        elif file_type == 'PPTX':
            return partition_pptx(filename=file_path, **partition_kwargs)
        elif file_type == 'XLSX':
            return partition_xlsx(filename=file_path, **partition_kwargs)
        elif file_type == 'IMAGE':
            # Add image-specific kwargs
            image_kwargs = {
                'strategy': 'hi_res',
                'languages': ['eng'],
                **partition_kwargs
            }
            return partition_image(filename=file_path, **image_kwargs)
        elif file_type == 'EMAIL':
            return partition_email(filename=file_path, **partition_kwargs)
        elif file_type == 'JSON':
            return partition_json(filename=file_path, **partition_kwargs)
        elif file_type == 'CSV':
            return partition_csv(filename=file_path, **partition_kwargs)
        elif file_type == 'XML':
            return partition_xml(filename=file_path, **partition_kwargs)
        else:
            # Use auto partition for other file types
            return partition(filename=file_path, **partition_kwargs)
    except Exception as e:
        logger.error(f"Error partitioning file {file_path}: {str(e)}")
        raise Exception(f"Error processing {file_path}: {str(e)}")

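# Minimal runnable sketch of the helpers above: write a tiny throwaway .txt file
# and push it through partition_file. The file name and contents are placeholders,
# not part of the app itself.
_demo_dir = create_temp_dir()
_demo_path = os.path.join(_demo_dir, "demo.txt")
with open(_demo_path, "w") as _f:
    _f.write("Unstructured demo.\n\nA tiny sample document used only to exercise partition_file.")
print(identify_file_type(_demo_path))        # -> 'TXT'
_demo_elements = partition_file(_demo_path)  # .txt falls through to the auto partitioner
print(len(_demo_elements), "elements:", [type(e).__name__ for e in _demo_elements])
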
# Cell 4: Define element cleaning and processing functions
def clean_elements(elements, cleaning_options=None):
    """
    Clean elements based on selected options

    Args:
        elements: List of elements to clean
        cleaning_options: Dictionary of cleaning options to apply

    Returns:
        Cleaned elements
    """
    if cleaning_options is None or not elements:
        return elements

    cleaned_elements = []
    for element in elements:
        # Skip non-text elements
        if not hasattr(element, 'text'):
            cleaned_elements.append(element)
            continue

        # Apply cleaning operations based on selected options
        cleaned_text = element.text

        if cleaning_options.get('extra_whitespace', False):
            cleaned_text = clean_extra_whitespace(cleaned_text)
        if cleaning_options.get('unicode_quotes', False):
            cleaned_text = replace_unicode_quotes(cleaned_text)
        if cleaning_options.get('bullets', False):
            cleaned_text = clean_bullets(cleaned_text)
        if cleaning_options.get('dashes', False):
            cleaned_text = clean_dashes(cleaned_text)
        if cleaning_options.get('group_paragraphs', False):
            cleaned_text = group_broken_paragraphs(cleaned_text)
        if cleaning_options.get('remove_punctuation', False):
            cleaned_text = remove_punctuation(cleaned_text)

        # Update the element's text
        element.text = cleaned_text
        cleaned_elements.append(element)

    return cleaned_elements


def extract_entities(elements, extraction_options=None):
    """
    Extract entities from elements based on selected options using regex

    Args:
        elements: List of elements
        extraction_options: Dictionary of extraction options to apply

    Returns:
        Elements with extracted entities in metadata
    """
    if extraction_options is None or not elements:
        return elements

    processed_elements = []
    for element in elements:
        # Skip non-text elements
        if not hasattr(element, 'text'):
            processed_elements.append(element)
            continue

        # Initialize metadata if it doesn't exist
        if not hasattr(element, 'metadata'):
            element.metadata = {}
        element.metadata['extracted_entities'] = {}

        # Extract entities based on selected options using regex
        if extraction_options.get('emails', False):
            element.metadata['extracted_entities']['emails'] = re.findall(EMAIL_PATTERN, element.text)
        if extraction_options.get('urls', False):
            element.metadata['extracted_entities']['urls'] = re.findall(URL_PATTERN, element.text)
        if extraction_options.get('phone_numbers', False):
            element.metadata['extracted_entities']['phone_numbers'] = re.findall(PHONE_PATTERN, element.text)
        if extraction_options.get('ip_addresses', False):
            element.metadata['extracted_entities']['ip_addresses'] = re.findall(IP_PATTERN, element.text)

        # Use spaCy for NER if selected
        if extraction_options.get('ner', False):
            doc = nlp(element.text)
            element.metadata['extracted_entities']['named_entities'] = [
                {'text': ent.text, 'label': ent.label_} for ent in doc.ents
            ]

        processed_elements.append(element)

    return processed_elements


def categorize_elements(elements):
    """
    Categorize elements by type and provide statistics

    Args:
        elements: List of elements

    Returns:
        Dictionary with element statistics
    """
    if not elements:
        return {}

    element_types = {}
    for element in elements:
        element_type = type(element).__name__
        if element_type not in element_types:
            element_types[element_type] = 0
        element_types[element_type] += 1

    total_elements = len(elements)
    element_stats = {
        'total': total_elements,
        'by_type': element_types,
        'type_percentages': {k: round(v / total_elements * 100, 2) for k, v in element_types.items()}
    }
    return element_stats


def chunk_elements(elements, chunking_method, **kwargs):
    """
    Chunk elements using specified method

    Args:
        elements: List of elements to chunk
        chunking_method: Method to use for chunking
        **kwargs: Additional arguments for chunking method

    Returns:
        List of chunks
    """
    if not elements:
        return []

    try:
        if chunking_method == 'by_title':
            return chunk_by_title(elements, **kwargs)
        elif chunking_method == 'by_token':
            # Implement a simple version of token-based chunking
            from unstructured.chunking.base import Chunk
            max_chars = kwargs.get('max_characters', 2000)
            chunks = []
            current_chunk = []
            current_char_count = 0

            for element in elements:
                if not hasattr(element, 'text'):
                    # If the element has no text, just add it to the current chunk
                    current_chunk.append(element)
                    continue

                element_text_len = len(element.text)
                # If adding this element would exceed the max chars, start a new chunk
                if current_char_count + element_text_len > max_chars and current_chunk:
                    chunks.append(Chunk(elements=current_chunk))
                    current_chunk = [element]
                    current_char_count = element_text_len
                else:
                    current_chunk.append(element)
                    current_char_count += element_text_len

            # Add the last chunk if it's not empty
            if current_chunk:
                chunks.append(Chunk(elements=current_chunk))

            return chunks
        else:
            # Default to title chunking
            return chunk_by_title(elements, **kwargs)
    except Exception as e:
        logger.error(f"Error chunking elements: {str(e)}")
        # If chunking fails, return single chunk with all elements
        from unstructured.chunking.base import Chunk
        return [Chunk(elements=elements)]

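# Minimal sketch of the Cell 4 helpers, reusing _demo_elements from the previous
# example cell (assumes that cell has been run). The option keys mirror the ones
# checked above; extract_entities is left commented out because it writes ad-hoc
# keys into element metadata, which may not work on every unstructured version.
_demo_cleaned = clean_elements(_demo_elements, {'extra_whitespace': True, 'unicode_quotes': True})
print(categorize_elements(_demo_cleaned))
_demo_chunks = chunk_elements(_demo_cleaned, 'by_title', max_characters=500)
print(len(_demo_chunks), "chunk(s) produced")
# _demo_enriched = extract_entities(_demo_cleaned, {'emails': True, 'urls': True})
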
# Cell 5: Define functions for visualization and analysis
def visualize_element_distribution(element_stats):
    """
    Create a bar chart of element type distribution

    Args:
        element_stats: Dictionary with element statistics

    Returns:
        Plotly figure
    """
    if not element_stats or 'by_type' not in element_stats:
        return None

    element_types = list(element_stats['by_type'].keys())
    element_counts = list(element_stats['by_type'].values())

    fig = px.bar(
        x=element_types,
        y=element_counts,
        labels={'x': 'Element Type', 'y': 'Count'},
        title='Distribution of Element Types',
        color=element_types,
        text=element_counts
    )
    fig.update_layout(
        xaxis_title='Element Type',
        yaxis_title='Count',
        showlegend=False
    )
    return fig


def generate_embeddings(chunks, model_name):
    """
    Generate embeddings for chunks

    Args:
        chunks: List of chunks
        model_name: Name of the embedding model to use

    Returns:
        Dictionary with chunk texts and embeddings
    """
    if not chunks:
        return {}

    # Load model
    try:
        model = SentenceTransformer(model_name)
    except Exception as e:
        logger.error(f"Error loading embedding model: {str(e)}")
        raise Exception(f"Error loading embedding model {model_name}: {str(e)}")

    # Generate text for embedding
    chunk_texts = []
    for chunk in chunks:
        chunk_text = "\n".join([e.text for e in chunk.elements if hasattr(e, 'text')])
        chunk_texts.append(chunk_text)

    # Generate embeddings
    embeddings = model.encode(chunk_texts, show_progress_bar=True)

    return {
        'texts': chunk_texts,
        'embeddings': embeddings,
        'model': model_name,
        'dimension': embeddings.shape[1]
    }


def visualize_embeddings_tsne(embedding_data):
    """
    Visualize embeddings using t-SNE

    Args:
        embedding_data: Dictionary with embeddings

    Returns:
        Plotly figure
    """
    if not embedding_data or 'embeddings' not in embedding_data:
        return None

    from sklearn.manifold import TSNE

    # Apply t-SNE to reduce dimensions for visualization
    tsne = TSNE(n_components=2, random_state=42)
    reduced_embeddings = tsne.fit_transform(embedding_data['embeddings'])

    # Create DataFrame for plotting
    df = pd.DataFrame({
        'x': reduced_embeddings[:, 0],
        'y': reduced_embeddings[:, 1],
        'chunk_id': [f"Chunk {i+1}" for i in range(len(reduced_embeddings))]
    })

    # Add text length as size
    df['text_length'] = [len(text) for text in embedding_data['texts']]

    # Normalize text length for sizing
    max_length = df['text_length'].max()
    df['size'] = df['text_length'].apply(lambda x: max(10, min(40, x / max_length * 40)))

    # Create plot
    fig = px.scatter(
        df,
        x='x',
        y='y',
        text='chunk_id',
        size='size',
        title=f"t-SNE Visualization of Document Embeddings ({embedding_data['model']})",
        hover_data=['text_length']
    )
    fig.update_traces(
        textposition='top center',
        marker=dict(sizemode='diameter')
    )
    fig.update_layout(
        xaxis_title='t-SNE Dimension 1',
        yaxis_title='t-SNE Dimension 2',
        showlegend=False
    )
    return fig

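# Illustrative follow-up, left commented out: it downloads a sentence-transformers
# model, t-SNE needs more samples than its default perplexity of 30, and the chunks
# passed in are assumed to expose an .elements attribute (as the custom 'by_token'
# path above produces). The model name is just a common choice, not a requirement.
#
# emb_data = generate_embeddings(chunks, 'all-MiniLM-L6-v2')
# fig = visualize_embeddings_tsne(emb_data)
# fig.show()
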
def generate_similarity_matrix(embedding_data):
    """
    Generate similarity matrix for chunks

    Args:
        embedding_data: Dictionary with embeddings

    Returns:
        Plotly figure with similarity matrix
    """
    if not embedding_data or 'embeddings' not in embedding_data:
        return None

    # Calculate cosine similarity
    embeddings = embedding_data['embeddings']
    similarity_matrix = util.cos_sim(embeddings, embeddings).numpy()

    # Create labels for each chunk
    labels = [f"Chunk {i+1}" for i in range(similarity_matrix.shape[0])]

    # Create heatmap
    fig = go.Figure(data=go.Heatmap(
        z=similarity_matrix,
        x=labels,
        y=labels,
        colorscale='Viridis',
        zmin=0,
        zmax=1
    ))
    fig.update_layout(
        title='Semantic Similarity Between Chunks',
        xaxis_title='Chunk ID',
        yaxis_title='Chunk ID',
    )
    return fig


def extract_top_keywords(chunks, top_n=10):
    """
    Extract top keywords from chunks using TF-IDF

    Args:
        chunks: List of chunks
        top_n: Number of top keywords to extract

    Returns:
        Dictionary with top keywords for each chunk
    """
    if not chunks:
        return {}

    from sklearn.feature_extraction.text import TfidfVectorizer
    from nltk.corpus import stopwords

    # Get text from each chunk
    chunk_texts = []
    for chunk in chunks:
        chunk_text = " ".join([e.text for e in chunk.elements if hasattr(e, 'text')])
        chunk_texts.append(chunk_text)

    # Get English stopwords
    stop_words = set(stopwords.words('english'))

    # Initialize vectorizer
    vectorizer = TfidfVectorizer(
        max_features=1000,
        stop_words=stop_words,
        ngram_range=(1, 2)
    )

    # Fit vectorizer
    try:
        tfidf_matrix = vectorizer.fit_transform(chunk_texts)
    except Exception as e:
        logger.error(f"Error extracting keywords: {str(e)}")
        return {}

    # Get feature names
    feature_names = vectorizer.get_feature_names_out()

    # Extract top keywords for each chunk
    top_keywords = {}
    for i, chunk_vec in enumerate(tfidf_matrix):
        # Convert sparse matrix to dense and get top indices
        dense = chunk_vec.todense()
        dense_list = dense.tolist()[0]
        sorted_indices = np.argsort(dense_list)[::-1][:top_n]

        # Get keywords and scores
        keywords = [(feature_names[idx], dense_list[idx]) for idx in sorted_indices]
        top_keywords[f"Chunk {i+1}"] = keywords

    return top_keywords


def visualize_keywords(keywords_data):
    """
    Visualize top keywords across chunks

    Args:
        keywords_data: Dictionary with keywords for each chunk

    Returns:
        Plotly figure
    """
    if not keywords_data:
        return None

    # Prepare data for visualization
    data = []
    for chunk_id, keywords in keywords_data.items():
        for keyword, score in keywords:
            data.append({
                'chunk': chunk_id,
                'keyword': keyword,
                'score': score
            })

    # Create DataFrame
    df = pd.DataFrame(data)

    # Create heatmap
    pivot_df = df.pivot(index='keyword', columns='chunk', values='score')

    # Sort by average score
    pivot_df['avg'] = pivot_df.mean(axis=1)
    pivot_df = pivot_df.sort_values('avg', ascending=False).drop('avg', axis=1)

    # Create figure
    fig = px.imshow(
        pivot_df,
        labels=dict(x="Chunk", y="Keyword", color="TF-IDF Score"),
        x=pivot_df.columns,
        y=pivot_df.index,
        color_continuous_scale="Viridis",
        aspect="auto"
    )
    fig.update_layout(
        title='Top Keywords Across Chunks',
        height=600
    )
    return fig

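# Illustrative continuation of the commented sketch above, using the same
# placeholder names (chunks, emb_data) and the same assumptions:
#
# sim_fig = generate_similarity_matrix(emb_data)
# keywords = extract_top_keywords(chunks, top_n=10)
# kw_fig = visualize_keywords(keywords)
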
data """ if not chunks: return {} # Initialize final data structure final_data = { 'metadata': { 'timestamp': datetime.now().isoformat(), 'num_chunks': len(chunks), 'processing_stats': processing_stats or {} }, 'chunks': [] } # Get embeddings if available embeddings = embedding_data.get('embeddings', []) if embedding_data else [] # Process each chunk for i, chunk in enumerate(chunks): # Get text from chunk chunk_text = "\n".join([e.text for e in chunk.elements if hasattr(e, 'text')]) # Get element types in chunk element_types = {} for e in chunk.elements: element_type = type(e).__name__ if element_type not in element_types: element_types[element_type] = 0 element_types[element_type] += 1 # Add chunk data chunk_data = { 'chunk_id': f"chunk_{i+1}", 'metadata': { 'element_types': element_types, 'num_elements': len(chunk.elements), 'text_length': len(chunk_text) }, 'text': chunk_text, 'elements': [convert_to_dict(e) for e in chunk.elements] } # Add embedding if available if i < len(embeddings): chunk_data['embedding'] = embeddings[i].tolist() final_data['chunks'].append(chunk_data) return final_data def format_for_qa(chunks): """ Format chunks for question answering Args: chunks: List of chunks Returns: List of documents in format suitable for QA systems """ if not chunks: return [] qa_docs = [] for i, chunk in enumerate(chunks): # Get text from chunk chunk_text = "\n".join([e.text for e in chunk.elements if hasattr(e, 'text')]) # Create document doc = { 'id': f"chunk_{i+1}", 'content': chunk_text, 'metadata': { 'num_elements': len(chunk.elements), 'element_types': [type(e).__name__ for e in chunk.elements] } } qa_docs.append(doc) return qa_docs def format_for_transformers(chunks): """ Format chunks for HuggingFace transformers Args: chunks: List of chunks Returns: Dictionary with data formatted for transformers """ if not chunks: return {} # Create a simple format for transformers try: # Extract text from chunks texts = [] for chunk in chunks: chunk_text = "\n".join([e.text for e in chunk.elements if hasattr(e, 'text')]) texts.append(chunk_text) # Create dataset structure transformer_data = { "text": texts, "metadata": [{"chunk_id": f"chunk_{i}"} for i in range(len(texts))] } return transformer_data except Exception as e: logger.error(f"Error formatting for transformers: {str(e)}") return {} def format_for_label_studio(elements): """ Format elements for Label Studio Args: elements: List of elements Returns: Dictionary with data formatted for Label Studio """ if not elements: return {} try: # Create a basic format for Label Studio label_studio_data = [] for i, element in enumerate(elements): if hasattr(element, 'text'): label_studio_data.append({ "id": i, "text": element.text, "element_type": type(element).__name__, "metadata": element.metadata if hasattr(element, 'metadata') else {} }) return label_studio_data except Exception as e: logger.error(f"Error formatting for Label Studio: {str(e)}") return {} # Cell 7: Build the Gradio interface components def process_files( files, partition_options, cleaning_options, extraction_options, chunking_method, chunking_options, embedding_model, output_format ): """ Main processing function for the Gradio interface Args: files: List of uploaded files partition_options: Dictionary of partitioning options cleaning_options: Dictionary of cleaning options extraction_options: Dictionary of extraction options chunking_method: Method to use for chunking chunking_options: Dictionary of chunking options embedding_model: Model to use for embeddings output_format: 
# Cell 7: Build the Gradio interface components
def process_files(
    files,
    partition_options,
    cleaning_options,
    extraction_options,
    chunking_method,
    chunking_options,
    embedding_model,
    output_format
):
    """
    Main processing function for the Gradio interface

    Args:
        files: List of uploaded files
        partition_options: Dictionary of partitioning options
        cleaning_options: Dictionary of cleaning options
        extraction_options: Dictionary of extraction options
        chunking_method: Method to use for chunking
        chunking_options: Dictionary of chunking options
        embedding_model: Model to use for embeddings
        output_format: Format for final output

    Returns:
        Tuple of (
            status_html,
            log_html,
            element_stats,
            element_chart,
            similarity_matrix,
            embedding_viz,
            keyword_viz,
            output_data
        )
    """
    # Create temp directory for uploads
    temp_dir = create_temp_dir()

    # Initialize status and logs
    status_html = "