"""Gradio demo of a document-processing pipeline built on Unstructured.

The pipeline partitions uploaded documents into elements, optionally cleans
them and extracts entities, groups them into chunks, computes keywords and
embeddings, and renders the result in one of several output formats.
"""

# Cell 2: Import necessary libraries
import html
import os
import re
import json
import time
import nltk
import spacy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from IPython.display import display, HTML, clear_output
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from tqdm.auto import tqdm
import tempfile
import shutil
import logging
import warnings
from pathlib import Path
import gradio as gr

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("UnstructuredApp")

# Suppress warnings
warnings.filterwarnings('ignore')

# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)

# Load spaCy model
nlp = spacy.load("en_core_web_sm")

# Import Unstructured components
from unstructured.partition.auto import partition
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.html import partition_html
from unstructured.partition.pptx import partition_pptx
from unstructured.partition.docx import partition_docx
from unstructured.partition.xlsx import partition_xlsx
from unstructured.partition.image import partition_image
from unstructured.partition.email import partition_email
from unstructured.partition.json import partition_json
from unstructured.partition.csv import partition_csv
from unstructured.partition.xml import partition_xml
from unstructured.cleaners.core import (
    clean_extra_whitespace,
    replace_unicode_quotes,
    clean_bullets,
    group_broken_paragraphs,
    clean_dashes,
    remove_punctuation
)
from unstructured.staging.base import elements_to_json
from unstructured.chunking.title import chunk_by_title
from unstructured.staging.base import convert_to_dict
from unstructured.documents.elements import (
    Title,
    Text,
    NarrativeText,
    ListItem,
    Table,
    Image,
    PageBreak,
    Footer,
    Header,
    Address
)

# Define our own regex patterns for extraction (used instead of the
# unavailable unstructured extract functions).
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
URL_PATTERN = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*/?'
# The country-code prefix is a NON-capturing group: with a capturing group
# re.findall() would return only the prefix instead of the whole number.
PHONE_PATTERN = r'(?:\+\d{1,3}[- ]?)?\(?\d{3}\)?[- ]?\d{3}[- ]?\d{4}'
IP_PATTERN = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'

# Compiled once; keyed by the extraction option / output key they serve.
_ENTITY_PATTERNS = {
    'emails': re.compile(EMAIL_PATTERN),
    'urls': re.compile(URL_PATTERN),
    'phone_numbers': re.compile(PHONE_PATTERN),
    'ip_addresses': re.compile(IP_PATTERN),
}

from sentence_transformers import SentenceTransformer, util


# Shared helpers

@dataclass
class Chunk:
    """Minimal container grouping consecutive elements into one chunk."""

    elements: list = field(default_factory=list)

    @property
    def text(self):
        """Text of all member elements joined with newlines."""
        return "\n".join(e.text for e in self.elements if hasattr(e, 'text'))


def _chunk_members(chunk):
    """Return the elements that make up *chunk*.

    Handles both the local ``Chunk`` container (``.elements``) and the
    composite elements returned by ``chunk_by_title``.
    NOTE(review): ``metadata.orig_elements`` is only populated by some
    unstructured versions - when absent the chunk itself is used.
    """
    members = getattr(chunk, 'elements', None)
    if members is not None:
        return list(members)
    originals = getattr(getattr(chunk, 'metadata', None), 'orig_elements', None)
    if originals:
        return list(originals)
    return [chunk]


def _chunk_text(chunk, separator="\n"):
    """Join the text of every text-bearing member of *chunk*."""
    return separator.join(
        e.text for e in _chunk_members(chunk) if getattr(e, 'text', None)
    )


def _set_metadata(element, key, value):
    """Store *value* under *key* in the element's metadata.

    Metadata may be missing, a plain dict, or a metadata object; objects
    receive the value as an attribute because they do not support item
    assignment.
    """
    metadata = getattr(element, 'metadata', None)
    if metadata is None:
        element.metadata = {key: value}
    elif isinstance(metadata, dict):
        metadata[key] = value
    else:
        setattr(metadata, key, value)


def _metadata_as_dict(element):
    """Return the element's metadata as a JSON-friendly dict (or ``{}``)."""
    metadata = getattr(element, 'metadata', None)
    if metadata is None:
        return {}
    if isinstance(metadata, dict):
        return metadata
    to_dict = getattr(metadata, 'to_dict', None)
    return to_dict() if callable(to_dict) else {}


def _status_html(message, color="#333333"):
    """Wrap an (escaped) status message in the status-box markup."""
    return (
        f"<div style='padding: 10px; color: {color};'>"
        f"{html.escape(str(message))}</div>"
    )


def _render_log(lines):
    """Render log lines as an escaped, pre-formatted HTML block."""
    body = "".join(f"{html.escape(line)}\n" for line in lines)
    return f"<pre style='white-space: pre-wrap;'>{body}</pre>"


# Cell 3: Define utility functions for file handling and processing

def create_temp_dir():
    """Create a temporary directory for file uploads and return its path."""
    return tempfile.mkdtemp()


def save_uploaded_file(file, temp_dir):
    """Save an uploaded file into the temporary directory.

    Args:
        file: ``None``, a path (``str``/``os.PathLike``), raw ``bytes``, or a
            file-like object exposing ``name`` and optionally ``read()``.
        temp_dir: Directory the upload is stored in.

    Returns:
        Path of the stored copy, or ``None`` when *file* is ``None``.
    """
    if file is None:
        return None

    # Raw bytes carry no file name; store them under a generated name.
    if isinstance(file, (bytes, bytearray)):
        fd, file_path = tempfile.mkstemp(dir=temp_dir, prefix="upload_")
        with os.fdopen(fd, 'wb') as f:
            f.write(file)
        return file_path

    if isinstance(file, (str, os.PathLike)):
        source = os.fspath(file)
        file_path = os.path.join(temp_dir, os.path.basename(source))
        shutil.copyfile(source, file_path)
        return file_path

    # Only the base name is used so an absolute/relative upload name cannot
    # place the copy outside temp_dir.
    source = str(file.name)
    file_path = os.path.join(temp_dir, os.path.basename(source))
    if hasattr(file, 'read'):
        data = file.read()
        if isinstance(data, str):
            data = data.encode('utf-8')
        with open(file_path, 'wb') as f:
            f.write(data)
    else:
        shutil.copyfile(source, file_path)
    return file_path


def get_file_extension(file_path):
    """Return the lower-cased extension of *file_path* (``None`` for ``None``)."""
    if file_path is None:
        return None
    return os.path.splitext(file_path)[1].lower()


# Maps a lower-cased file extension to the pipeline's file-type label.
_FILE_TYPES = {
    '.pdf': 'PDF',
    '.html': 'HTML',
    '.htm': 'HTML',
    '.docx': 'DOCX',
    '.doc': 'DOC',
    '.pptx': 'PPTX',
    '.ppt': 'PPT',
    '.xlsx': 'XLSX',
    '.xls': 'XLS',
    '.txt': 'TXT',
    '.csv': 'CSV',
    '.json': 'JSON',
    '.xml': 'XML',
    '.eml': 'EMAIL',
    '.msg': 'EMAIL',
    '.jpg': 'IMAGE',
    '.jpeg': 'IMAGE',
    '.png': 'IMAGE',
    '.tiff': 'IMAGE',
    '.tif': 'IMAGE',
}


def identify_file_type(file_path):
    """Identify the file type from the extension.

    Returns:
        A label such as ``'PDF'``; ``'UNKNOWN'`` for unmapped extensions;
        ``None`` when *file_path* is ``None``.
    """
    if file_path is None:
        return None
    return _FILE_TYPES.get(get_file_extension(file_path), 'UNKNOWN')


# File-type label -> (partition function, default kwargs). Caller-supplied
# kwargs override the defaults. Types not listed use the auto partitioner.
_PARTITIONERS = {
    'PDF': (partition_pdf, {
        'extract_images': True,
        'infer_table_structure': True,
        'include_page_breaks': True,
    }),
    'HTML': (partition_html, {'extract_links': True}),
    'DOCX': (partition_docx, {}),
    'PPTX': (partition_pptx, {}),
    'XLSX': (partition_xlsx, {}),
    'IMAGE': (partition_image, {'strategy': 'hi_res', 'languages': ['eng']}),
    'EMAIL': (partition_email, {}),
    'JSON': (partition_json, {}),
    'CSV': (partition_csv, {}),
    'XML': (partition_xml, {}),
}


def partition_file(file_path, partition_kwargs=None):
    """Partition a file with the partitioner matching its file type.

    Args:
        file_path: Path to the file.
        partition_kwargs: Dictionary of kwargs for the partition function;
            these override the per-type defaults.

    Returns:
        List of elements (empty when *file_path* is ``None``).

    Raises:
        Exception: If partitioning fails; the original error is chained.
    """
    if file_path is None:
        return []

    file_type = identify_file_type(file_path)
    partitioner, defaults = _PARTITIONERS.get(file_type, (partition, {}))
    kwargs = {**defaults, **(partition_kwargs or {})}

    try:
        return partitioner(filename=file_path, **kwargs)
    except Exception as e:
        logger.error(f"Error partitioning file {file_path}: {str(e)}")
        raise Exception(f"Error processing {file_path}: {str(e)}") from e


# Cell 4: Define element cleaning and processing functions

# (option name, cleaner) pairs, applied in this order.
_CLEANERS = (
    ('extra_whitespace', clean_extra_whitespace),
    ('unicode_quotes', replace_unicode_quotes),
    ('bullets', clean_bullets),
    ('dashes', clean_dashes),
    ('group_paragraphs', group_broken_paragraphs),
    ('remove_punctuation', remove_punctuation),
)


def clean_elements(elements, cleaning_options=None):
    """Clean element text according to the selected options.

    Elements are modified in place; elements without text are passed through.

    Args:
        elements: List of elements to clean.
        cleaning_options: Dictionary mapping option names to booleans.

    Returns:
        The list of (cleaned) elements; the input itself when there is
        nothing to do.
    """
    if cleaning_options is None or not elements:
        return elements

    active_cleaners = [
        cleaner for option, cleaner in _CLEANERS
        if cleaning_options.get(option, False)
    ]

    cleaned_elements = []
    for element in elements:
        if isinstance(getattr(element, 'text', None), str):
            cleaned_text = element.text
            for cleaner in active_cleaners:
                cleaned_text = cleaner(cleaned_text)
            element.text = cleaned_text
        cleaned_elements.append(element)
    return cleaned_elements


def extract_entities(elements, extraction_options=None):
    """Extract entities from element text and store them in the metadata.

    Regex-based extraction covers emails, URLs, phone numbers and IP
    addresses; the ``ner`` option adds spaCy named entities. Results are
    stored under the ``extracted_entities`` metadata key.

    Args:
        elements: List of elements.
        extraction_options: Dictionary mapping option names to booleans.

    Returns:
        Elements with extracted entities in their metadata.
    """
    if extraction_options is None or not elements:
        return elements

    processed_elements = []
    for element in elements:
        text = getattr(element, 'text', None)
        if isinstance(text, str):
            # group(0) yields the full match regardless of groups in the pattern.
            entities = {
                option: [match.group(0) for match in pattern.finditer(text)]
                for option, pattern in _ENTITY_PATTERNS.items()
                if extraction_options.get(option, False)
            }
            if extraction_options.get('ner', False):
                entities['named_entities'] = [
                    {'text': ent.text, 'label': ent.label_}
                    for ent in nlp(text).ents
                ]
            _set_metadata(element, 'extracted_entities', entities)
        processed_elements.append(element)
    return processed_elements


def categorize_elements(elements):
    """Count elements by class name.

    Args:
        elements: List of elements.

    Returns:
        Dictionary with ``total``, ``by_type`` (counts) and
        ``type_percentages``; empty dict for empty input.
    """
    if not elements:
        return {}

    element_types = dict(Counter(type(element).__name__ for element in elements))
    total_elements = len(elements)
    return {
        'total': total_elements,
        'by_type': element_types,
        'type_percentages': {
            name: round(count / total_elements * 100, 2)
            for name, count in element_types.items()
        },
    }


def _chunk_by_characters(elements, max_chars):
    """Greedily group elements so each chunk stays within *max_chars*."""
    chunks = []
    current_chunk = []
    current_char_count = 0
    for element in elements:
        if not hasattr(element, 'text'):
            # Elements without text never trigger a split.
            current_chunk.append(element)
            continue
        element_text_len = len(element.text)
        if current_chunk and current_char_count + element_text_len > max_chars:
            chunks.append(Chunk(elements=current_chunk))
            current_chunk = [element]
            current_char_count = element_text_len
        else:
            current_chunk.append(element)
            current_char_count += element_text_len
    if current_chunk:
        chunks.append(Chunk(elements=current_chunk))
    return chunks


def chunk_elements(elements, chunking_method, **kwargs):
    """Chunk elements using the specified method.

    Args:
        elements: List of elements to chunk.
        chunking_method: ``'by_title'`` or ``'by_token'``; anything else
            falls back to title chunking.
        **kwargs: Additional arguments for the chunking method
            (``max_characters`` is used by ``'by_token'``).

    Returns:
        List of chunks. If chunking fails, a single chunk holding all
        elements is returned.
    """
    if not elements:
        return []

    # UI number inputs arrive as floats (2000.0); the chunkers expect ints.
    kwargs = {
        key: int(value) if isinstance(value, float) and value.is_integer() else value
        for key, value in kwargs.items()
    }

    try:
        if chunking_method == 'by_token':
            return _chunk_by_characters(elements, kwargs.get('max_characters', 2000))
        return chunk_by_title(elements, **kwargs)
    except Exception as e:
        logger.error(f"Error chunking elements: {str(e)}")
        return [Chunk(elements=list(elements))]


# Cell 5: Define functions for visualization and analysis

def visualize_element_distribution(element_stats):
    """Create a bar chart of the element type distribution.

    Args:
        element_stats: Dictionary with element statistics.

    Returns:
        Plotly figure, or ``None`` when no statistics are available.
    """
    if not element_stats or 'by_type' not in element_stats:
        return None

    element_types = list(element_stats['by_type'].keys())
    element_counts = list(element_stats['by_type'].values())

    fig = px.bar(
        x=element_types,
        y=element_counts,
        labels={'x': 'Element Type', 'y': 'Count'},
        title='Distribution of Element Types',
        color=element_types,
        text=element_counts
    )
    fig.update_layout(
        xaxis_title='Element Type',
        yaxis_title='Count',
        showlegend=False
    )
    return fig


def generate_embeddings(chunks, model_name):
    """Generate embeddings for chunks.

    Args:
        chunks: List of chunks.
        model_name: Name of the embedding model to use.

    Returns:
        Dictionary with ``texts``, ``embeddings``, ``model`` and
        ``dimension``; empty dict for empty input.

    Raises:
        Exception: If the model cannot be loaded (original error chained).
    """
    if not chunks:
        return {}

    try:
        model = SentenceTransformer(model_name)
    except Exception as e:
        logger.error(f"Error loading embedding model: {str(e)}")
        raise Exception(f"Error loading embedding model {model_name}: {str(e)}") from e

    chunk_texts = [_chunk_text(chunk) for chunk in chunks]
    embeddings = model.encode(chunk_texts, show_progress_bar=True)

    return {
        'texts': chunk_texts,
        'embeddings': embeddings,
        'model': model_name,
        'dimension': embeddings.shape[1]
    }


def visualize_embeddings_tsne(embedding_data):
    """Visualize embeddings using t-SNE.

    Args:
        embedding_data: Dictionary with embeddings.

    Returns:
        Plotly figure, or ``None`` when there are fewer than two embeddings.
    """
    if not embedding_data or 'embeddings' not in embedding_data:
        return None

    from sklearn.manifold import TSNE

    embeddings = np.asarray(embedding_data['embeddings'])
    n_samples = len(embeddings)
    if n_samples < 2:
        # A projection of fewer than two points is meaningless.
        return None

    # t-SNE requires perplexity < n_samples; the default (30) would fail
    # for documents with only a few chunks.
    tsne = TSNE(
        n_components=2,
        random_state=42,
        perplexity=min(30.0, float(n_samples - 1))
    )
    reduced_embeddings = tsne.fit_transform(embeddings)

    df = pd.DataFrame({
        'x': reduced_embeddings[:, 0],
        'y': reduced_embeddings[:, 1],
        'chunk_id': [f"Chunk {i+1}" for i in range(n_samples)]
    })

    # Marker size scales with text length, clamped to [10, 40].
    df['text_length'] = [len(text) for text in embedding_data['texts']]
    max_length = max(df['text_length'].max(), 1)  # avoid division by zero
    df['size'] = (df['text_length'] / max_length * 40).clip(lower=10, upper=40)

    fig = px.scatter(
        df,
        x='x',
        y='y',
        text='chunk_id',
        size='size',
        title=f"t-SNE Visualization of Document Embeddings ({embedding_data['model']})",
        hover_data=['text_length']
    )
    fig.update_traces(
        textposition='top center',
        marker=dict(sizemode='diameter')
    )
    fig.update_layout(
        xaxis_title='t-SNE Dimension 1',
        yaxis_title='t-SNE Dimension 2',
        showlegend=False
    )
    return fig


def generate_similarity_matrix(embedding_data):
    """Generate a cosine-similarity heatmap for chunks.

    Args:
        embedding_data: Dictionary with embeddings.

    Returns:
        Plotly figure with the similarity matrix, or ``None`` without data.
    """
    if not embedding_data or 'embeddings' not in embedding_data:
        return None

    embeddings = embedding_data['embeddings']
    similarity_matrix = util.cos_sim(embeddings, embeddings).cpu().numpy()

    labels = [f"Chunk {i+1}" for i in range(similarity_matrix.shape[0])]

    fig = go.Figure(data=go.Heatmap(
        z=similarity_matrix,
        x=labels,
        y=labels,
        colorscale='Viridis',
        zmin=0,
        zmax=1
    ))
    fig.update_layout(
        title='Semantic Similarity Between Chunks',
        xaxis_title='Chunk ID',
        yaxis_title='Chunk ID',
    )
    return fig


def extract_top_keywords(chunks, top_n=10):
    """Extract the top keywords of each chunk using TF-IDF.

    Args:
        chunks: List of chunks.
        top_n: Number of top keywords to extract per chunk.

    Returns:
        Dictionary mapping ``"Chunk <n>"`` to a list of
        ``(keyword, score)`` pairs; empty dict on empty input or failure.
    """
    if not chunks:
        return {}

    from sklearn.feature_extraction.text import TfidfVectorizer
    from nltk.corpus import stopwords

    chunk_texts = [_chunk_text(chunk, separator=" ") for chunk in chunks]

    # scikit-learn validates stop_words as 'english', a list or None;
    # a set is rejected, so a list is passed.
    vectorizer = TfidfVectorizer(
        max_features=1000,
        stop_words=list(stopwords.words('english')),
        ngram_range=(1, 2)
    )

    try:
        tfidf_matrix = vectorizer.fit_transform(chunk_texts)
    except Exception as e:
        logger.error(f"Error extracting keywords: {str(e)}")
        return {}

    feature_names = vectorizer.get_feature_names_out()

    top_keywords = {}
    for i in range(tfidf_matrix.shape[0]):
        scores = tfidf_matrix[i].toarray().ravel()
        sorted_indices = np.argsort(scores)[::-1][:top_n]
        # Terms scoring 0 do not occur in the chunk and are not keywords.
        top_keywords[f"Chunk {i+1}"] = [
            (feature_names[idx], float(scores[idx]))
            for idx in sorted_indices
            if scores[idx] > 0
        ]
    return top_keywords


def visualize_keywords(keywords_data):
    """Visualize top keywords across chunks as a heatmap.

    Args:
        keywords_data: Dictionary with keywords for each chunk.

    Returns:
        Plotly figure, or ``None`` when there are no keywords.
    """
    if not keywords_data:
        return None

    data = [
        {'chunk': chunk_id, 'keyword': keyword, 'score': score}
        for chunk_id, keywords in keywords_data.items()
        for keyword, score in keywords
    ]
    if not data:
        return None

    df = pd.DataFrame(data)
    pivot_df = df.pivot(index='keyword', columns='chunk', values='score')

    # Sort keywords by their average score across chunks.
    pivot_df['avg'] = pivot_df.mean(axis=1)
    pivot_df = pivot_df.sort_values('avg', ascending=False).drop('avg', axis=1)

    fig = px.imshow(
        pivot_df,
        labels=dict(x="Chunk", y="Keyword", color="TF-IDF Score"),
        x=pivot_df.columns,
        y=pivot_df.index,
        color_continuous_scale="Viridis",
        aspect="auto"
    )
    fig.update_layout(
        title='Top Keywords Across Chunks',
        height=600
    )
    return fig


# Cell 6: Define functions for the final output formats

def generate_final_output(chunks, embedding_data=None, processing_stats=None):
    """Generate the final structured (JSON-style) output.

    Args:
        chunks: List of chunks.
        embedding_data: Dictionary with embeddings.
        processing_stats: Dictionary with processing statistics.

    Returns:
        Dictionary with ``metadata`` and a ``chunks`` list; empty dict for
        empty input.
    """
    if not chunks:
        return {}

    final_data = {
        'metadata': {
            'timestamp': datetime.now().isoformat(),
            'num_chunks': len(chunks),
            'processing_stats': processing_stats or {}
        },
        'chunks': []
    }

    embeddings = embedding_data.get('embeddings', []) if embedding_data else []

    for i, chunk in enumerate(chunks):
        members = _chunk_members(chunk)
        chunk_text = _chunk_text(chunk)

        chunk_data = {
            'chunk_id': f"chunk_{i+1}",
            'metadata': {
                'element_types': dict(Counter(type(e).__name__ for e in members)),
                'num_elements': len(members),
                'text_length': len(chunk_text)
            },
            'text': chunk_text,
            # convert_to_dict expects an iterable of elements.
            'elements': convert_to_dict(members)
        }

        if i < len(embeddings):
            chunk_data['embedding'] = np.asarray(embeddings[i]).tolist()

        final_data['chunks'].append(chunk_data)

    return final_data


def format_for_qa(chunks):
    """Format chunks for question answering.

    Args:
        chunks: List of chunks.

    Returns:
        List of documents (``id``, ``content``, ``metadata``) suitable for
        QA systems.
    """
    if not chunks:
        return []

    qa_docs = []
    for i, chunk in enumerate(chunks):
        members = _chunk_members(chunk)
        qa_docs.append({
            'id': f"chunk_{i+1}",
            'content': _chunk_text(chunk),
            'metadata': {
                'num_elements': len(members),
                'element_types': [type(e).__name__ for e in members]
            }
        })
    return qa_docs


def format_for_transformers(chunks):
    """Format chunks for HuggingFace transformers.

    Args:
        chunks: List of chunks.

    Returns:
        Dictionary with parallel ``text`` and ``metadata`` lists; empty dict
        on empty input or failure.
    """
    if not chunks:
        return {}

    try:
        texts = [_chunk_text(chunk) for chunk in chunks]
        return {
            "text": texts,
            # 1-based ids, matching the JSON and QA output formats.
            "metadata": [
                {"chunk_id": f"chunk_{i}"} for i in range(1, len(texts) + 1)
            ]
        }
    except Exception as e:
        logger.error(f"Error formatting for transformers: {str(e)}")
        return {}


def format_for_label_studio(elements):
    """Format elements for Label Studio.

    Args:
        elements: List of elements.

    Returns:
        List of task dictionaries (one per text-bearing element); empty
        list on empty input or failure.
    """
    if not elements:
        return []

    try:
        return [
            {
                "id": i,
                "text": element.text,
                "element_type": type(element).__name__,
                "metadata": _metadata_as_dict(element)
            }
            for i, element in enumerate(elements)
            if hasattr(element, 'text')
        ]
    except Exception as e:
        logger.error(f"Error formatting for Label Studio: {str(e)}")
        return []


# Cell 7: Build the Gradio interface components

def process_files(
    files,
    partition_options,
    cleaning_options,
    extraction_options,
    chunking_method,
    chunking_options,
    embedding_model,
    output_format
):
    """Main processing function for the Gradio interface.

    Args:
        files: List of uploaded files (a single upload or ``None`` is
            also accepted).
        partition_options: Dictionary of partitioning options.
        cleaning_options: Dictionary of cleaning options.
        extraction_options: Dictionary of extraction options.
        chunking_method: Method to use for chunking.
        chunking_options: Dictionary of chunking options.
        embedding_model: Model to use for embeddings (empty to skip).
        output_format: Format for final output.

    Returns:
        Tuple of (status_html, log_html, element_stats, element_chart,
        similarity_matrix, embedding_viz, keyword_viz, output_data).
        On failure everything after log_html is ``None``.
    """
    temp_dir = create_temp_dir()
    log_lines = []

    def log(message):
        log_lines.append(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

    status_html = _status_html("Initializing processing pipeline...")
    log("Starting document processing pipeline")

    element_stats = None
    element_chart = None
    embedding_viz = None
    similarity_matrix = None
    keyword_viz = None
    output_data = None

    if files is None:
        uploads = []
    elif isinstance(files, (list, tuple)):
        uploads = list(files)
    else:
        uploads = [files]

    try:
        # Save uploaded files
        file_paths = []
        for file in uploads:
            if file is None:
                continue
            file_path = save_uploaded_file(file, temp_dir)
            file_paths.append(file_path)
            log(f"Saved {os.path.basename(file_path)} to temporary directory")

        if not file_paths:
            status_html = _status_html(
                "No files were uploaded. Please upload at least one file.",
                color="#b00020"
            )
            log("Error: No files were uploaded")
            return (
                status_html, _render_log(log_lines),
                None, None, None, None, None, None
            )

        # Partition each file. Only unset options are dropped: an explicit
        # False must reach the partitioner, otherwise unchecking a box would
        # silently fall back to the per-type default (True for PDFs).
        partition_kwargs = {
            k: v for k, v in (partition_options or {}).items() if v is not None
        }
        all_elements = []
        for file_path in file_paths:
            file_name = os.path.basename(file_path)
            file_type = identify_file_type(file_path)
            log(f"Processing {file_name} ({file_type})")

            elements = partition_file(file_path, partition_kwargs)

            # Add source information to elements
            for element in elements:
                _set_metadata(element, 'source_filename', file_name)
                _set_metadata(element, 'source_filetype', file_type)

            log(f"Extracted {len(elements)} elements from {file_name}")
            all_elements.extend(elements)

        log(f"Processing {len(all_elements)} elements")

        # Clean elements
        cleaning_kwargs = {k: v for k, v in (cleaning_options or {}).items() if v}
        if cleaning_kwargs:
            cleaned_elements = clean_elements(all_elements, cleaning_kwargs)
            log(f"Applied {len(cleaning_kwargs)} cleaning operations")
        else:
            cleaned_elements = all_elements
            log("No cleaning operations selected")

        # Extract entities
        extraction_kwargs = {k: v for k, v in (extraction_options or {}).items() if v}
        if extraction_kwargs:
            processed_elements = extract_entities(cleaned_elements, extraction_kwargs)
            log(f"Applied {len(extraction_kwargs)} extraction operations")
        else:
            processed_elements = cleaned_elements
            log("No extraction operations selected")

        # Categorize elements
        element_stats = categorize_elements(processed_elements)
        total_elements = element_stats.get('total', 0)
        elements_by_type = element_stats.get('by_type', {})
        log(
            f"Categorized {total_elements} elements "
            f"into {len(elements_by_type)} types"
        )
        element_chart = visualize_element_distribution(element_stats)

        # Chunk elements
        chunking_kwargs = {k: v for k, v in (chunking_options or {}).items() if v}
        chunks = chunk_elements(processed_elements, chunking_method, **chunking_kwargs)
        log(f"Created {len(chunks)} chunks using {chunking_method} method")

        # Extract keywords
        keywords_data = extract_top_keywords(chunks)
        keyword_viz = visualize_keywords(keywords_data)
        log(f"Extracted keywords from {len(keywords_data)} chunks")

        # Generate embeddings
        if embedding_model and chunks:
            embedding_data = generate_embeddings(chunks, embedding_model)
            embedding_viz = visualize_embeddings_tsne(embedding_data)
            similarity_matrix = generate_similarity_matrix(embedding_data)
            log(f"Generated {embedding_data['dimension']}-dimensional embeddings")
        else:
            embedding_data = None
            log("Skipped embedding generation (no model selected)")

        # Generate final output
        processing_stats = {
            'num_files': len(file_paths),
            'file_types': [identify_file_type(fp) for fp in file_paths],
            'total_elements': total_elements,
            'element_types': elements_by_type,
            'num_chunks': len(chunks)
        }

        if output_format == 'qa':
            output_data = format_for_qa(chunks)
            log(f"Generated Q&A format with {len(output_data)} documents")
        elif output_format == 'transformers':
            output_data = format_for_transformers(chunks)
            log("Generated Transformer format")
        elif output_format == 'label_studio':
            output_data = format_for_label_studio(processed_elements)
            log("Generated Label Studio format")
        else:
            output_data = generate_final_output(chunks, embedding_data, processing_stats)
            if output_format == 'json':
                log(
                    "Generated JSON output with "
                    f"{len(output_data.get('chunks', []))} chunks"
                )
            else:
                log("Generated default JSON output")

        status_html = _status_html("Processing complete! ✅", color="#1b5e20")
        log("Successfully completed document processing pipeline")

    except Exception as e:
        logger.exception("Document processing failed")
        status_html = _status_html(f"Error in processing: {str(e)}", color="#b00020")
        log(f"ERROR: {str(e)}")
        element_stats = None
        element_chart = None
        embedding_viz = None
        similarity_matrix = None
        keyword_viz = None
        output_data = None

    finally:
        # Clean up temp directory
        try:
            shutil.rmtree(temp_dir)
            log("Cleaned up temporary files")
        except Exception as e:
            log(f"Warning: Failed to clean temporary files: {str(e)}")

    return (
        status_html,
        _render_log(log_lines),
        element_stats,
        element_chart,
        similarity_matrix,
        embedding_viz,
        keyword_viz,
        output_data
    )


# Cell 8: Define the Gradio interface

def build_gradio_interface():
    """Build the Gradio interface and return the ``gr.Blocks`` app."""
    custom_theme = gr.themes.Default(
        primary_hue="indigo",
        secondary_hue="purple",
    )

    with gr.Blocks(theme=custom_theme, title="Unstructured Document Processing") as app:
        gr.Markdown("""
        # 📄 Unstructured Document Processing Pipeline

        This application demonstrates a comprehensive document processing pipeline using the [Unstructured](https://unstructured.io/) library.
        Upload one or more documents to process them through partitioning, cleaning, extraction, chunking, and embedding.

        **Supported file formats**: PDF, DOCX, PPTX, XLSX, HTML, CSV, JSON, XML, Email, Images (JPG, PNG)
        """)

        # File upload section
        with gr.Row():
            with gr.Column(scale=3):
                # No explicit ``type``: raw bytes ("binary") lose the file
                # name, which the pipeline needs to detect the file type.
                files = gr.File(
                    file_count="multiple",
                    label="Upload Documents",
                    file_types=[
                        ".pdf", ".docx", ".pptx", ".xlsx", ".html", ".htm",
                        ".csv", ".json", ".xml", ".eml", ".msg",
                        ".jpg", ".jpeg", ".png", ".txt"
                    ]
                )
            with gr.Column(scale=2):
                with gr.Accordion("Status", open=True):
                    status = gr.HTML(value=_status_html("Waiting for files..."))
                with gr.Accordion("Processing Log", open=True):
                    log = gr.HTML(value=_render_log(["Processing log will appear here..."]))

        # Processing options
        with gr.Tabs():
            # Partitioning options
            with gr.TabItem("Partitioning"):
                gr.Markdown("### Document Partitioning Options")
                with gr.Row():
                    with gr.Column():
                        partition_options = {
                            "extract_images": gr.Checkbox(value=True, label="Extract Images", info="Extract images from documents"),
                            "infer_table_structure": gr.Checkbox(value=True, label="Infer Table Structure", info="Extract tables with structure"),
                            "include_page_breaks": gr.Checkbox(value=True, label="Include Page Breaks", info="Include page break elements"),
                            "include_metadata": gr.Checkbox(value=True, label="Include Metadata", info="Extract document metadata"),
                            "strategy": gr.Radio(choices=["fast", "hi_res", "ocr_only"], value="hi_res", label="OCR Strategy (for images/scanned docs)", info="Fast is quicker but less accurate")
                        }

            # Cleaning options
            with gr.TabItem("Cleaning"):
                gr.Markdown("### Text Cleaning Options")
                with gr.Row():
                    with gr.Column():
                        cleaning_options = {
                            "extra_whitespace": gr.Checkbox(value=True, label="Clean Extra Whitespace", info="Remove redundant whitespace"),
                            "unicode_quotes": gr.Checkbox(value=True, label="Replace Unicode Quotes", info="Normalize quotes to ASCII"),
                            "bullets": gr.Checkbox(value=True, label="Clean Bullets", info="Standardize bullet points"),
                            "dashes": gr.Checkbox(value=True, label="Clean Dashes", info="Standardize dashes"),
                            "group_paragraphs": gr.Checkbox(value=False, label="Group Broken Paragraphs", info="Combine paragraphs split across pages"),
                        }
                    with gr.Column():
                        cleaning_options.update({
                            "remove_punctuation": gr.Checkbox(value=False, label="Remove Punctuation", info="Remove all punctuation")
                        })

            # Extraction options
            with gr.TabItem("Extraction"):
                gr.Markdown("### Entity Extraction Options")
                with gr.Row():
                    with gr.Column():
                        extraction_options = {
                            "emails": gr.Checkbox(value=True, label="Extract Emails", info="Extract email addresses"),
                            "urls": gr.Checkbox(value=True, label="Extract URLs", info="Extract URLs"),
                            "phone_numbers": gr.Checkbox(value=True, label="Extract Phone Numbers", info="Extract phone numbers"),
                            "ip_addresses": gr.Checkbox(value=False, label="Extract IP Addresses", info="Extract IP addresses"),
                            "ner": gr.Checkbox(value=True, label="Named Entity Recognition", info="Extract named entities (people, orgs, locations)")
                        }

            # Chunking options
            with gr.TabItem("Chunking"):
                gr.Markdown("### Text Chunking Options")
                with gr.Row():
                    with gr.Column():
                        chunking_method = gr.Radio(
                            choices=["by_title", "by_token"],
                            value="by_title",
                            label="Chunking Method",
                            info="How to divide the document into chunks"
                        )
                    with gr.Column():
                        chunking_options = {
                            "max_characters": gr.Number(value=2000, label="Max Characters (by_token)", info="Maximum characters per chunk"),
                            "combine_text_under_n_chars": gr.Number(value=300, label="Combine Small Text (by_title)", info="Combine sections smaller than this")
                        }

            # Embedding options
            with gr.TabItem("Embedding"):
                gr.Markdown("### Embedding Generation Options")
                with gr.Row():
                    embedding_model = gr.Dropdown(
                        choices=[
                            "all-MiniLM-L6-v2",
                            "paraphrase-multilingual-MiniLM-L12-v2",
                            "all-mpnet-base-v2",
                            "sentence-t5-base",
                            ""  # Empty option to skip embedding
                        ],
                        value="all-MiniLM-L6-v2",
                        label="Embedding Model",
                        info="Select a model for generating embeddings (or empty to skip)"
                    )

            # Output format options
            with gr.TabItem("Output Format"):
                gr.Markdown("### Output Format Options")
                with gr.Row():
                    output_format = gr.Radio(
                        choices=["json", "qa", "transformers", "label_studio"],
                        value="json",
                        label="Output Format",
                        info="Format for the final processed output"
                    )

        # Process button
        process_btn = gr.Button("Process Documents", variant="primary")

        # Results section
        with gr.Tabs():
            with gr.TabItem("Element Analysis"):
                with gr.Row():
                    element_stats_json = gr.JSON(label="Element Statistics")
                    element_dist_chart = gr.Plot(label="Element Distribution")
            with gr.TabItem("Semantic Analysis"):
                with gr.Row():
                    keyword_viz_plot = gr.Plot(label="Keyword Analysis")
                with gr.Row():
                    embedding_viz_plot = gr.Plot(label="Embedding Visualization")
                    similarity_matrix_plot = gr.Plot(label="Semantic Similarity Matrix")
            with gr.TabItem("Processed Output"):
                output_data_json = gr.JSON(label="Processed Data")

        def run_pipeline(uploaded_files, *values):
            """Regroup the flat component values into option dictionaries.

            Event inputs must be individual components, so the option
            components are passed flat (in dict order) and rebuilt here.
            """
            remaining = iter(values)

            def take(option_components):
                return {key: next(remaining) for key in option_components}

            partition_values = take(partition_options)
            cleaning_values = take(cleaning_options)
            extraction_values = take(extraction_options)
            selected_method = next(remaining)
            chunking_values = take(chunking_options)
            selected_model = next(remaining)
            selected_format = next(remaining)
            return process_files(
                uploaded_files,
                partition_values,
                cleaning_values,
                extraction_values,
                selected_method,
                chunking_values,
                selected_model,
                selected_format
            )

        # Set up event handlers
        process_btn.click(
            fn=run_pipeline,
            inputs=[
                files,
                *partition_options.values(),
                *cleaning_options.values(),
                *extraction_options.values(),
                chunking_method,
                *chunking_options.values(),
                embedding_model,
                output_format
            ],
            outputs=[
                status,
                log,
                element_stats_json,
                element_dist_chart,
                similarity_matrix_plot,
                embedding_viz_plot,
                keyword_viz_plot,
                output_data_json
            ]
        )

        # Examples
        gr.Examples(
            examples=[
                [
                    # Example with default settings - user would upload their own files
                    None
                ]
            ],
            inputs=[files],
        )

        # Add markdown with instructions
        with gr.Accordion("Instructions", open=False):
            gr.Markdown("""
            ## How to Use This App

            1. **Upload Documents**: Start by uploading one or more documents in the supported formats.
            2. **Configure Processing Options**:
               - **Partitioning**: Control how documents are broken into elements
               - **Cleaning**: Select text cleaning operations to apply
               - **Extraction**: Choose entities to extract from the text
               - **Chunking**: Set how elements are grouped into chunks
               - **Embedding**: Select a model for generating vector embeddings
               - **Output Format**: Choose the format of the final processed data
            3. **Process Documents**: Click the "Process Documents" button to start the pipeline
            4. **Analyze Results**:
               - **Element Analysis**: View statistics and distribution of document elements
               - **Semantic Analysis**: Explore keyword distribution and semantic relationships
               - **Processed Output**: View the final structured data ready for use with LLMs

            ## Typical Use Cases

            - **Content Extraction**: Extract structured content from unstructured documents
            - **Document Understanding**: Analyze and categorize document components
            - **Text Preprocessing**: Prepare text for further NLP or machine learning
            - **Knowledge Base Creation**: Convert documents into semantic chunks for retrieval
            - **LLM Integration**: Structure documents for use with large language models
            """)

    return app


# Cell 9: Launch the application

# Create the app at module level so it stays importable as ``app``;
# only launching the server is reserved for direct execution.
app = build_gradio_interface()

if __name__ == "__main__":
    app.launch(debug=True)