# Cell 2: Import necessary libraries
import os
import re
import json
import time
import nltk
import spacy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from IPython.display import display, HTML, clear_output
from datetime import datetime
from tqdm.auto import tqdm
import tempfile
import shutil
import logging
import warnings
from pathlib import Path
import gradio as gr
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("UnstructuredApp")

# Suppress warnings
warnings.filterwarnings('ignore')

# Download required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)

# Load the spaCy model, downloading it on first run if it is missing
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    from spacy.cli import download
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")
# Import Unstructured components
from unstructured.partition.auto import partition
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.html import partition_html
from unstructured.partition.pptx import partition_pptx
from unstructured.partition.docx import partition_docx
from unstructured.partition.xlsx import partition_xlsx
from unstructured.partition.image import partition_image
from unstructured.partition.email import partition_email
from unstructured.partition.json import partition_json
from unstructured.partition.csv import partition_csv
from unstructured.partition.xml import partition_xml
from unstructured.cleaners.core import (
    clean_extra_whitespace,
    replace_unicode_quotes,
    clean_bullets,
    group_broken_paragraphs,
    clean_dashes,
    remove_punctuation
)
# Use regex patterns instead of the extract_* helpers, which are not available
# in all versions of unstructured (re is already imported above)
from unstructured.staging.base import elements_to_json
from unstructured.chunking.title import chunk_by_title
from unstructured.staging.base import convert_to_dict
from unstructured.documents.elements import (
    Title, Text, NarrativeText, ListItem,
    Table, Image, PageBreak, Footer, Header,
    Address
)

# Define our own regex patterns for extraction
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
URL_PATTERN = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*/?'
PHONE_PATTERN = r'(\+\d{1,3}[- ]?)?\(?\d{3}\)?[- ]?\d{3}[- ]?\d{4}'
IP_PATTERN = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
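
# Illustrative sanity check of the regex patterns above on a made-up string
# (not part of the pipeline; safe to remove). Note that PHONE_PATTERN contains
# a capturing group, so re.findall returns only the captured country-code part;
# use re.finditer and match.group(0) when the full phone number is needed.
_sample_text = "Contact jane.doe@example.com or +1 555-123-4567, docs at https://example.com/guide"
print(re.findall(EMAIL_PATTERN, _sample_text))                          # ['jane.doe@example.com']
print(re.findall(URL_PATTERN, _sample_text))                            # ['https://example.com/guide']
print([m.group(0) for m in re.finditer(PHONE_PATTERN, _sample_text)])   # ['+1 555-123-4567']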
from sentence_transformers import SentenceTransformer, util

# Cell 3: Define utility functions for file handling and processing
def create_temp_dir():
    """Create a temporary directory for file uploads"""
    temp_dir = tempfile.mkdtemp()
    return temp_dir

def save_uploaded_file(file, temp_dir):
    """Save an uploaded file to the temporary directory.

    Handles both path strings (Gradio `type="filepath"`) and file-like objects.
    """
    if file is None:
        return None
    if isinstance(file, (str, Path)):
        file_path = os.path.join(temp_dir, os.path.basename(str(file)))
        shutil.copy(str(file), file_path)
        return file_path
    file_path = os.path.join(temp_dir, os.path.basename(file.name))
    with open(file_path, 'wb') as f:
        f.write(file.read())
    return file_path

def get_file_extension(file_path):
    """Get file extension from path"""
    if file_path is None:
        return None
    return os.path.splitext(file_path)[1].lower()

def identify_file_type(file_path):
    """Identify file type based on extension"""
    if file_path is None:
        return None
    ext = get_file_extension(file_path)
    file_types = {
        '.pdf': 'PDF',
        '.html': 'HTML',
        '.htm': 'HTML',
        '.docx': 'DOCX',
        '.doc': 'DOC',
        '.pptx': 'PPTX',
        '.ppt': 'PPT',
        '.xlsx': 'XLSX',
        '.xls': 'XLS',
        '.txt': 'TXT',
        '.csv': 'CSV',
        '.json': 'JSON',
        '.xml': 'XML',
        '.eml': 'EMAIL',
        '.msg': 'EMAIL',
        '.jpg': 'IMAGE',
        '.jpeg': 'IMAGE',
        '.png': 'IMAGE',
        '.tiff': 'IMAGE',
        '.tif': 'IMAGE'
    }
    return file_types.get(ext, 'UNKNOWN')

def partition_file(file_path, partition_kwargs=None):
    """
    Partition file using appropriate method based on file type

    Args:
        file_path: Path to the file
        partition_kwargs: Dictionary of kwargs for partition function

    Returns:
        List of elements
    """
    if file_path is None:
        return []
    if partition_kwargs is None:
        partition_kwargs = {}
    file_type = identify_file_type(file_path)
    # NOTE: keyword support varies between unstructured releases (e.g. the
    # PDF image-extraction flag has been renamed over time), so some kwargs
    # may need to be adjusted for your installed version.
    try:
        if file_type == 'PDF':
            # Add PDF-specific kwargs
            pdf_kwargs = {
                'extract_images': True,
                'infer_table_structure': True,
                'include_page_breaks': True,
                **partition_kwargs
            }
            return partition_pdf(filename=file_path, **pdf_kwargs)
        elif file_type == 'HTML':
            # Add HTML-specific kwargs
            html_kwargs = {
                'extract_links': True,
                **partition_kwargs
            }
            return partition_html(filename=file_path, **html_kwargs)
        elif file_type == 'DOCX':
            return partition_docx(filename=file_path, **partition_kwargs)
        elif file_type == 'PPTX':
            return partition_pptx(filename=file_path, **partition_kwargs)
        elif file_type == 'XLSX':
            return partition_xlsx(filename=file_path, **partition_kwargs)
        elif file_type == 'IMAGE':
            # Add image-specific kwargs
            image_kwargs = {
                'strategy': 'hi_res',
                'languages': ['eng'],
                **partition_kwargs
            }
            return partition_image(filename=file_path, **image_kwargs)
        elif file_type == 'EMAIL':
            return partition_email(filename=file_path, **partition_kwargs)
        elif file_type == 'JSON':
            return partition_json(filename=file_path, **partition_kwargs)
        elif file_type == 'CSV':
            return partition_csv(filename=file_path, **partition_kwargs)
        elif file_type == 'XML':
            return partition_xml(filename=file_path, **partition_kwargs)
        else:
            # Use auto partition for other file types
            return partition(filename=file_path, **partition_kwargs)
    except Exception as e:
        logger.error(f"Error partitioning file {file_path}: {str(e)}")
        raise Exception(f"Error processing {file_path}: {str(e)}") from e
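
# Illustrative usage of partition_file on a local document. "sample.pdf" is a
# hypothetical path; the block simply does nothing when the file is absent.
_sample_path = "sample.pdf"
if os.path.exists(_sample_path):
    _sample_elements = partition_file(_sample_path, {"strategy": "fast"})
    print(f"Extracted {len(_sample_elements)} elements:",
          sorted({type(e).__name__ for e in _sample_elements}))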

# Cell 4: Define element cleaning and processing functions
def clean_elements(elements, cleaning_options=None):
    """
    Clean elements based on selected options

    Args:
        elements: List of elements to clean
        cleaning_options: Dictionary of cleaning options to apply

    Returns:
        Cleaned elements
    """
    if cleaning_options is None or not elements:
        return elements
    cleaned_elements = []
    for element in elements:
        # Skip non-text elements
        if not hasattr(element, 'text'):
            cleaned_elements.append(element)
            continue
        # Apply cleaning operations based on selected options
        cleaned_text = element.text
        if cleaning_options.get('extra_whitespace', False):
            cleaned_text = clean_extra_whitespace(cleaned_text)
        if cleaning_options.get('unicode_quotes', False):
            cleaned_text = replace_unicode_quotes(cleaned_text)
        if cleaning_options.get('bullets', False):
            cleaned_text = clean_bullets(cleaned_text)
        if cleaning_options.get('dashes', False):
            cleaned_text = clean_dashes(cleaned_text)
        if cleaning_options.get('group_paragraphs', False):
            cleaned_text = group_broken_paragraphs(cleaned_text)
        if cleaning_options.get('remove_punctuation', False):
            cleaned_text = remove_punctuation(cleaned_text)
        # Update the element's text
        element.text = cleaned_text
        cleaned_elements.append(element)
    return cleaned_elements
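
# Illustrative: cleaning a synthetic Text element (uses the Text class imported
# from unstructured above; not part of the pipeline).
_demo_cleaned = clean_elements(
    [Text(text="Hello   “world” –  some   extra   spaces")],
    {"extra_whitespace": True, "unicode_quotes": True, "dashes": True},
)
print(_demo_cleaned[0].text)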

def extract_entities(elements, extraction_options=None):
    """
    Extract entities from elements based on selected options using regex

    Args:
        elements: List of elements
        extraction_options: Dictionary of extraction options to apply

    Returns:
        Elements with extracted entities attached
    """
    if extraction_options is None or not elements:
        return elements
    processed_elements = []
    for element in elements:
        # Skip non-text elements
        if not hasattr(element, 'text'):
            processed_elements.append(element)
            continue
        # unstructured elements carry an ElementMetadata object rather than a
        # plain dict, so collect the hits locally and attach them to whichever
        # container is available
        entities = {}
        # Extract entities based on selected options using regex
        if extraction_options.get('emails', False):
            entities['emails'] = re.findall(EMAIL_PATTERN, element.text)
        if extraction_options.get('urls', False):
            entities['urls'] = re.findall(URL_PATTERN, element.text)
        if extraction_options.get('phone_numbers', False):
            entities['phone_numbers'] = re.findall(PHONE_PATTERN, element.text)
        if extraction_options.get('ip_addresses', False):
            entities['ip_addresses'] = re.findall(IP_PATTERN, element.text)
        # Use spaCy for NER if selected
        if extraction_options.get('ner', False):
            doc = nlp(element.text)
            entities['named_entities'] = [
                {'text': ent.text, 'label': ent.label_} for ent in doc.ents
            ]
        if getattr(element, 'metadata', None) is None:
            element.metadata = {'extracted_entities': entities}
        elif isinstance(element.metadata, dict):
            element.metadata['extracted_entities'] = entities
        else:
            # ElementMetadata is not subscriptable; keep results as a plain attribute
            element.extracted_entities = entities
        processed_elements.append(element)
    return processed_elements

def categorize_elements(elements):
    """
    Categorize elements by type and provide statistics

    Args:
        elements: List of elements

    Returns:
        Dictionary with element statistics
    """
    if not elements:
        return {}
    element_types = {}
    for element in elements:
        element_type = type(element).__name__
        if element_type not in element_types:
            element_types[element_type] = 0
        element_types[element_type] += 1
    total_elements = len(elements)
    element_stats = {
        'total': total_elements,
        'by_type': element_types,
        'type_percentages': {k: round(v / total_elements * 100, 2) for k, v in element_types.items()}
    }
    return element_stats

from dataclasses import dataclass, field

@dataclass
class SimpleChunk:
    """Lightweight chunk container for the manual chunking paths below.

    (Recent unstructured releases return CompositeElement objects from
    chunk_by_title and do not expose a generic Chunk class.)
    """
    elements: list = field(default_factory=list)

def get_chunk_elements(chunk):
    """Return the elements inside a chunk, whether it is a SimpleChunk or a
    CompositeElement-style chunk produced by chunk_by_title."""
    if hasattr(chunk, 'elements'):
        return chunk.elements
    return [chunk]

def get_chunk_text(chunk):
    """Join the text of all text-bearing elements in a chunk."""
    return "\n".join(e.text for e in get_chunk_elements(chunk) if hasattr(e, 'text'))

def chunk_elements(elements, chunking_method, **kwargs):
    """
    Chunk elements using specified method

    Args:
        elements: List of elements to chunk
        chunking_method: Method to use for chunking
        **kwargs: Additional arguments for chunking method

    Returns:
        List of chunks
    """
    if not elements:
        return []
    try:
        if chunking_method == 'by_title':
            return chunk_by_title(elements, **kwargs)
        elif chunking_method == 'by_token':
            # Implement a simple character-budget version of token-based chunking
            max_chars = int(kwargs.get('max_characters', 2000))
            chunks = []
            current_chunk = []
            current_char_count = 0
            for element in elements:
                if not hasattr(element, 'text'):
                    # If the element has no text, just add it to the current chunk
                    current_chunk.append(element)
                    continue
                element_text_len = len(element.text)
                # If adding this element would exceed the max chars, start a new chunk
                if current_char_count + element_text_len > max_chars and current_chunk:
                    chunks.append(SimpleChunk(elements=current_chunk))
                    current_chunk = [element]
                    current_char_count = element_text_len
                else:
                    current_chunk.append(element)
                    current_char_count += element_text_len
            # Add the last chunk if it's not empty
            if current_chunk:
                chunks.append(SimpleChunk(elements=current_chunk))
            return chunks
        else:
            # Default to title chunking
            return chunk_by_title(elements, **kwargs)
    except Exception as e:
        logger.error(f"Error chunking elements: {str(e)}")
        # If chunking fails, return a single chunk with all elements
        return [SimpleChunk(elements=elements)]
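
# Illustrative: character-budget chunking of a few synthetic NarrativeText
# elements via the fallback "by_token" path (not part of the pipeline).
_demo_elements = [NarrativeText(text="lorem ipsum " * 40) for _ in range(4)]
_demo_chunks = chunk_elements(_demo_elements, "by_token", max_characters=600)
print(f"{len(_demo_chunks)} chunks,", [len(get_chunk_text(c)) for c in _demo_chunks], "characters each")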

# Cell 5: Define functions for visualization and analysis
def visualize_element_distribution(element_stats):
    """
    Create a bar chart of element type distribution

    Args:
        element_stats: Dictionary with element statistics

    Returns:
        Plotly figure
    """
    if not element_stats or 'by_type' not in element_stats:
        return None
    element_types = list(element_stats['by_type'].keys())
    element_counts = list(element_stats['by_type'].values())
    fig = px.bar(
        x=element_types,
        y=element_counts,
        labels={'x': 'Element Type', 'y': 'Count'},
        title='Distribution of Element Types',
        color=element_types,
        text=element_counts
    )
    fig.update_layout(
        xaxis_title='Element Type',
        yaxis_title='Count',
        showlegend=False
    )
    return fig

def generate_embeddings(chunks, model_name):
    """
    Generate embeddings for chunks

    Args:
        chunks: List of chunks
        model_name: Name of the embedding model to use

    Returns:
        Dictionary with chunk texts and embeddings
    """
    if not chunks:
        return {}
    # Load model
    try:
        model = SentenceTransformer(model_name)
    except Exception as e:
        logger.error(f"Error loading embedding model: {str(e)}")
        raise Exception(f"Error loading embedding model {model_name}: {str(e)}") from e
    # Generate text for embedding
    chunk_texts = [get_chunk_text(chunk) for chunk in chunks]
    # Generate embeddings
    embeddings = model.encode(chunk_texts, show_progress_bar=True)
    return {
        'texts': chunk_texts,
        'embeddings': embeddings,
        'model': model_name,
        'dimension': embeddings.shape[1]
    }
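
# Illustrative usage of generate_embeddings on two synthetic chunks. Left
# commented out because it downloads the sentence-transformers model on first run.
# _demo_embeddings = generate_embeddings(
#     [SimpleChunk(elements=[NarrativeText(text="The quick brown fox.")]),
#      SimpleChunk(elements=[NarrativeText(text="A slow green turtle.")])],
#     "all-MiniLM-L6-v2",
# )
# print(_demo_embeddings["dimension"])  # 384 for all-MiniLM-L6-v2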

def visualize_embeddings_tsne(embedding_data):
    """
    Visualize embeddings using t-SNE

    Args:
        embedding_data: Dictionary with embeddings

    Returns:
        Plotly figure
    """
    if not embedding_data or 'embeddings' not in embedding_data:
        return None
    from sklearn.manifold import TSNE
    embeddings = embedding_data['embeddings']
    # t-SNE requires perplexity < n_samples, so scale it down for small documents
    perplexity = min(30, max(1, len(embeddings) - 1))
    tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity)
    reduced_embeddings = tsne.fit_transform(embeddings)
    # Create DataFrame for plotting
    df = pd.DataFrame({
        'x': reduced_embeddings[:, 0],
        'y': reduced_embeddings[:, 1],
        'chunk_id': [f"Chunk {i+1}" for i in range(len(reduced_embeddings))]
    })
    # Add text length as size
    df['text_length'] = [len(text) for text in embedding_data['texts']]
    # Normalize text length for sizing
    max_length = df['text_length'].max()
    df['size'] = df['text_length'].apply(lambda x: max(10, min(40, x / max_length * 40)))
    # Create plot
    fig = px.scatter(
        df, x='x', y='y',
        text='chunk_id',
        size='size',
        title=f"t-SNE Visualization of Document Embeddings ({embedding_data['model']})",
        hover_data=['text_length']
    )
    fig.update_traces(
        textposition='top center',
        marker=dict(sizemode='diameter')
    )
    fig.update_layout(
        xaxis_title='t-SNE Dimension 1',
        yaxis_title='t-SNE Dimension 2',
        showlegend=False
    )
    return fig

def generate_similarity_matrix(embedding_data):
    """
    Generate similarity matrix for chunks

    Args:
        embedding_data: Dictionary with embeddings

    Returns:
        Plotly figure with similarity matrix
    """
    if not embedding_data or 'embeddings' not in embedding_data:
        return None
    # Calculate cosine similarity
    embeddings = embedding_data['embeddings']
    similarity_matrix = util.cos_sim(embeddings, embeddings).numpy()
    # Create labels for each chunk
    labels = [f"Chunk {i+1}" for i in range(similarity_matrix.shape[0])]
    # Create heatmap
    fig = go.Figure(data=go.Heatmap(
        z=similarity_matrix,
        x=labels,
        y=labels,
        colorscale='Viridis',
        zmin=0, zmax=1
    ))
    fig.update_layout(
        title='Semantic Similarity Between Chunks',
        xaxis_title='Chunk ID',
        yaxis_title='Chunk ID',
    )
    return fig

def extract_top_keywords(chunks, top_n=10):
    """
    Extract top keywords from chunks using TF-IDF

    Args:
        chunks: List of chunks
        top_n: Number of top keywords to extract

    Returns:
        Dictionary with top keywords for each chunk
    """
    if not chunks:
        return {}
    from sklearn.feature_extraction.text import TfidfVectorizer
    from nltk.corpus import stopwords
    # Get text from each chunk
    chunk_texts = [get_chunk_text(chunk) for chunk in chunks]
    # Get English stopwords (TfidfVectorizer expects a list, not a set)
    stop_words = list(stopwords.words('english'))
    # Initialize vectorizer
    vectorizer = TfidfVectorizer(
        max_features=1000,
        stop_words=stop_words,
        ngram_range=(1, 2)
    )
    # Fit vectorizer
    try:
        tfidf_matrix = vectorizer.fit_transform(chunk_texts)
    except Exception as e:
        logger.error(f"Error extracting keywords: {str(e)}")
        return {}
    # Get feature names
    feature_names = vectorizer.get_feature_names_out()
    # Extract top keywords for each chunk
    top_keywords = {}
    for i, chunk_vec in enumerate(tfidf_matrix):
        # Convert sparse row to dense and get top indices
        dense_list = chunk_vec.todense().tolist()[0]
        sorted_indices = np.argsort(dense_list)[::-1][:top_n]
        # Get keywords and scores
        keywords = [(feature_names[idx], dense_list[idx]) for idx in sorted_indices]
        top_keywords[f"Chunk {i+1}"] = keywords
    return top_keywords
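
# Illustrative: TF-IDF keywords from two tiny synthetic chunks (not part of the pipeline).
_kw_demo = extract_top_keywords(
    [SimpleChunk(elements=[NarrativeText(text="Machine learning models need training data.")]),
     SimpleChunk(elements=[NarrativeText(text="Training data quality drives model accuracy.")])],
    top_n=3,
)
print(_kw_demo)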

def visualize_keywords(keywords_data):
    """
    Visualize top keywords across chunks

    Args:
        keywords_data: Dictionary with keywords for each chunk

    Returns:
        Plotly figure
    """
    if not keywords_data:
        return None
    # Prepare data for visualization
    data = []
    for chunk_id, keywords in keywords_data.items():
        for keyword, score in keywords:
            data.append({
                'chunk': chunk_id,
                'keyword': keyword,
                'score': score
            })
    # Create DataFrame
    df = pd.DataFrame(data)
    # Create heatmap
    pivot_df = df.pivot(index='keyword', columns='chunk', values='score')
    # Sort by average score
    pivot_df['avg'] = pivot_df.mean(axis=1)
    pivot_df = pivot_df.sort_values('avg', ascending=False).drop('avg', axis=1)
    # Create figure
    fig = px.imshow(
        pivot_df,
        labels=dict(x="Chunk", y="Keyword", color="TF-IDF Score"),
        x=pivot_df.columns,
        y=pivot_df.index,
        color_continuous_scale="Viridis",
        aspect="auto"
    )
    fig.update_layout(
        title='Top Keywords Across Chunks',
        height=600
    )
    return fig

# Cell 6: Define functions for the final output formats
def generate_final_output(chunks, embedding_data=None, processing_stats=None):
    """
    Generate final structured output

    Args:
        chunks: List of chunks
        embedding_data: Dictionary with embeddings
        processing_stats: Dictionary with processing statistics

    Returns:
        Dictionary with final structured data
    """
    if not chunks:
        return {}
    # Initialize final data structure
    final_data = {
        'metadata': {
            'timestamp': datetime.now().isoformat(),
            'num_chunks': len(chunks),
            'processing_stats': processing_stats or {}
        },
        'chunks': []
    }
    # Get embeddings if available
    embeddings = embedding_data.get('embeddings', []) if embedding_data else []
    # Process each chunk
    for i, chunk in enumerate(chunks):
        # Get text and elements from chunk
        chunk_elements_list = get_chunk_elements(chunk)
        chunk_text = get_chunk_text(chunk)
        # Get element types in chunk
        element_types = {}
        for e in chunk_elements_list:
            element_type = type(e).__name__
            if element_type not in element_types:
                element_types[element_type] = 0
            element_types[element_type] += 1
        # Add chunk data (convert_to_dict expects a list of elements)
        chunk_data = {
            'chunk_id': f"chunk_{i+1}",
            'metadata': {
                'element_types': element_types,
                'num_elements': len(chunk_elements_list),
                'text_length': len(chunk_text)
            },
            'text': chunk_text,
            'elements': convert_to_dict(chunk_elements_list)
        }
        # Add embedding if available
        if i < len(embeddings):
            chunk_data['embedding'] = embeddings[i].tolist()
        final_data['chunks'].append(chunk_data)
    return final_data

def format_for_qa(chunks):
    """
    Format chunks for question answering

    Args:
        chunks: List of chunks

    Returns:
        List of documents in a format suitable for QA systems
    """
    if not chunks:
        return []
    qa_docs = []
    for i, chunk in enumerate(chunks):
        chunk_elements_list = get_chunk_elements(chunk)
        # Create document
        doc = {
            'id': f"chunk_{i+1}",
            'content': get_chunk_text(chunk),
            'metadata': {
                'num_elements': len(chunk_elements_list),
                'element_types': [type(e).__name__ for e in chunk_elements_list]
            }
        }
        qa_docs.append(doc)
    return qa_docs
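
# Illustrative: the QA document shape for one synthetic chunk (not part of the pipeline).
_qa_demo = format_for_qa([SimpleChunk(elements=[NarrativeText(text="Example paragraph about invoices.")])])
print(json.dumps(_qa_demo, indent=2))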

def format_for_transformers(chunks):
    """
    Format chunks for HuggingFace transformers

    Args:
        chunks: List of chunks

    Returns:
        Dictionary with data formatted for transformers
    """
    if not chunks:
        return {}
    # Create a simple format for transformers
    try:
        # Extract text from chunks
        texts = [get_chunk_text(chunk) for chunk in chunks]
        # Create dataset structure
        transformer_data = {
            "text": texts,
            "metadata": [{"chunk_id": f"chunk_{i}"} for i in range(len(texts))]
        }
        return transformer_data
    except Exception as e:
        logger.error(f"Error formatting for transformers: {str(e)}")
        return {}

def format_for_label_studio(elements):
    """
    Format elements for Label Studio

    Args:
        elements: List of elements

    Returns:
        List of records formatted for Label Studio
    """
    if not elements:
        return []
    try:
        # Create a basic format for Label Studio
        label_studio_data = []
        for i, element in enumerate(elements):
            if hasattr(element, 'text'):
                metadata = getattr(element, 'metadata', {})
                # ElementMetadata objects are not JSON-serializable as-is
                if hasattr(metadata, 'to_dict'):
                    metadata = metadata.to_dict()
                label_studio_data.append({
                    "id": i,
                    "text": element.text,
                    "element_type": type(element).__name__,
                    "metadata": metadata
                })
        return label_studio_data
    except Exception as e:
        logger.error(f"Error formatting for Label Studio: {str(e)}")
        return []

# Cell 7: Build the Gradio interface components
def process_files(
    files,
    partition_options,
    cleaning_options,
    extraction_options,
    chunking_method,
    chunking_options,
    embedding_model,
    output_format
):
    """
    Main processing function for the Gradio interface

    Args:
        files: List of uploaded files
        partition_options: Dictionary of partitioning options
        cleaning_options: Dictionary of cleaning options
        extraction_options: Dictionary of extraction options
        chunking_method: Method to use for chunking
        chunking_options: Dictionary of chunking options
        embedding_model: Model to use for embeddings
        output_format: Format for final output

    Returns:
        Tuple of (
            status_html,
            log_html,
            element_stats,
            element_chart,
            similarity_matrix,
            embedding_viz,
            keyword_viz,
            output_data
        )
    """
    # Create temp directory for uploads
    temp_dir = create_temp_dir()
    # Initialize status and logs
    status_html = "<div style='color: blue;'>Initializing processing pipeline...</div>"
    log_html = "<div style='font-family: monospace; height: 200px; overflow-y: auto;'>"
    log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Starting document processing pipeline\n"
    try:
        # Save uploaded files
        file_paths = []
        for file in files or []:
            if file is None:
                continue
            file_path = save_uploaded_file(file, temp_dir)
            file_paths.append(file_path)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Saved {os.path.basename(file_path)} to temporary directory\n"
        if not file_paths:
            status_html = "<div style='color: red;'>No files were uploaded. Please upload at least one file.</div>"
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Error: No files were uploaded\n"
            log_html += "</div>"
            return status_html, log_html, None, None, None, None, None, None
        # Process each file
        all_elements = []
        for file_path in file_paths:
            file_name = os.path.basename(file_path)
            file_type = identify_file_type(file_path)
            status_html = f"<div style='color: blue;'>Processing {file_name} ({file_type})...</div>"
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Processing {file_name} ({file_type})\n"
            # Partition file (keep only the options that were switched on)
            partition_kwargs = {k: v for k, v in partition_options.items() if v}
            elements = partition_file(file_path, partition_kwargs)
            # Add source information to elements; ElementMetadata exposes
            # filename/filetype fields, while plain dicts are updated directly
            for element in elements:
                if getattr(element, 'metadata', None) is None:
                    element.metadata = {'source_filename': file_name, 'source_filetype': file_type}
                elif isinstance(element.metadata, dict):
                    element.metadata.update({
                        'source_filename': file_name,
                        'source_filetype': file_type
                    })
                else:
                    element.metadata.filename = file_name
                    element.metadata.filetype = file_type
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Extracted {len(elements)} elements from {file_name}\n"
            all_elements.extend(elements)
        # Process all elements
        status_html = "<div style='color: blue;'>Cleaning and processing elements...</div>"
        log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Processing {len(all_elements)} elements\n"
        # Clean elements
        cleaning_kwargs = {k: v for k, v in cleaning_options.items() if v}
        if cleaning_kwargs:
            cleaned_elements = clean_elements(all_elements, cleaning_kwargs)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Applied {len(cleaning_kwargs)} cleaning operations\n"
        else:
            cleaned_elements = all_elements
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] No cleaning operations selected\n"
        # Extract entities
        extraction_kwargs = {k: v for k, v in extraction_options.items() if v}
        if extraction_kwargs:
            processed_elements = extract_entities(cleaned_elements, extraction_kwargs)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Applied {len(extraction_kwargs)} extraction operations\n"
        else:
            processed_elements = cleaned_elements
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] No extraction operations selected\n"
        # Categorize elements
        element_stats = categorize_elements(processed_elements)
        log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Categorized {element_stats['total']} elements into {len(element_stats['by_type'])} types\n"
        # Create element distribution chart
        element_chart = visualize_element_distribution(element_stats)
        # Chunk elements (gr.Number returns floats, so cast the values to int)
        status_html = "<div style='color: blue;'>Chunking elements...</div>"
        chunking_kwargs = {k: int(v) for k, v in chunking_options.items() if v}
        chunks = chunk_elements(processed_elements, chunking_method, **chunking_kwargs)
        log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Created {len(chunks)} chunks using {chunking_method} method\n"
        # Extract keywords
        status_html = "<div style='color: blue;'>Extracting keywords...</div>"
        keywords_data = extract_top_keywords(chunks)
        keyword_viz = visualize_keywords(keywords_data)
        log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Extracted keywords from {len(keywords_data)} chunks\n"
        # Generate embeddings
        if embedding_model:
            status_html = f"<div style='color: blue;'>Generating embeddings using {embedding_model}...</div>"
            embedding_data = generate_embeddings(chunks, embedding_model)
            # Create embedding visualizations
            embedding_viz = visualize_embeddings_tsne(embedding_data)
            similarity_matrix = generate_similarity_matrix(embedding_data)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Generated {embedding_data['dimension']}-dimensional embeddings\n"
        else:
            embedding_data = None
            embedding_viz = None
            similarity_matrix = None
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Skipped embedding generation (no model selected)\n"
        # Generate final output
        status_html = "<div style='color: blue;'>Generating final output...</div>"
        processing_stats = {
            'num_files': len(file_paths),
            'file_types': [identify_file_type(fp) for fp in file_paths],
            'total_elements': element_stats['total'],
            'element_types': element_stats['by_type'],
            'num_chunks': len(chunks)
        }
        if output_format == 'json':
            output_data = generate_final_output(chunks, embedding_data, processing_stats)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Generated JSON output with {len(output_data['chunks'])} chunks\n"
        elif output_format == 'qa':
            output_data = format_for_qa(chunks)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Generated Q&A format with {len(output_data)} documents\n"
        elif output_format == 'transformers':
            output_data = format_for_transformers(chunks)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Generated Transformer format\n"
        elif output_format == 'label_studio':
            output_data = format_for_label_studio(processed_elements)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Generated Label Studio format\n"
        else:
            # Default to JSON
            output_data = generate_final_output(chunks, embedding_data, processing_stats)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Generated default JSON output\n"
        status_html = "<div style='color: green;'>Processing complete! ✅</div>"
        log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Successfully completed document processing pipeline\n"
    except Exception as e:
        status_html = f"<div style='color: red;'>Error in processing: {str(e)}</div>"
        log_html += f"[{datetime.now().strftime('%H:%M:%S')}] ERROR: {str(e)}\n"
        element_stats = None
        element_chart = None
        embedding_viz = None
        similarity_matrix = None
        keyword_viz = None
        output_data = None
    finally:
        # Clean up temp directory
        try:
            shutil.rmtree(temp_dir)
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Cleaned up temporary files\n"
        except Exception as e:
            log_html += f"[{datetime.now().strftime('%H:%M:%S')}] Warning: Failed to clean temporary files: {str(e)}\n"
    log_html += "</div>"
    return status_html, log_html, element_stats, element_chart, similarity_matrix, embedding_viz, keyword_viz, output_data

# Cell 8: Define the Gradio interface
def build_gradio_interface():
    """
    Build and launch the Gradio interface
    """
    # Define theme
    custom_theme = gr.themes.Default(
        primary_hue="indigo",
        secondary_hue="purple",
    )
    # Create interface
    with gr.Blocks(theme=custom_theme, title="Unstructured Document Processing") as app:
        gr.Markdown("""
        # 📄 Unstructured Document Processing Pipeline

        This application demonstrates a comprehensive document processing pipeline using the [Unstructured](https://unstructured.io/) library.
        Upload one or more documents to process them through partitioning, cleaning, extraction, chunking, and embedding.

        **Supported file formats**: PDF, DOCX, PPTX, XLSX, HTML, CSV, JSON, XML, Email, Images (JPG, PNG)
        """)
        # File upload section
        with gr.Row():
            with gr.Column(scale=3):
                files = gr.File(
                    file_count="multiple",
                    label="Upload Documents",
                    # "filepath" hands the callback local paths, which is what
                    # save_uploaded_file expects
                    type="filepath",
                    file_types=[
                        ".pdf", ".docx", ".pptx", ".xlsx", ".html", ".htm",
                        ".csv", ".json", ".xml", ".eml", ".msg",
                        ".jpg", ".jpeg", ".png", ".txt"
                    ]
                )
            with gr.Column(scale=2):
                with gr.Accordion("Status", open=True):
                    status = gr.HTML(value="<div style='color: gray;'>Waiting for files...</div>")
                with gr.Accordion("Processing Log", open=True):
                    log = gr.HTML(value="<div style='font-family: monospace; height: 200px; overflow-y: auto;'>Processing log will appear here...</div>")
        # Processing options
        with gr.Tabs():
            # Partitioning options
            with gr.TabItem("Partitioning"):
                gr.Markdown("### Document Partitioning Options")
                with gr.Row():
                    with gr.Column():
                        partition_options = {
                            "extract_images": gr.Checkbox(value=True, label="Extract Images", info="Extract images from documents"),
                            "infer_table_structure": gr.Checkbox(value=True, label="Infer Table Structure", info="Extract tables with structure"),
                            "include_page_breaks": gr.Checkbox(value=True, label="Include Page Breaks", info="Include page break elements"),
                            "include_metadata": gr.Checkbox(value=True, label="Include Metadata", info="Extract document metadata"),
                            "strategy": gr.Radio(choices=["fast", "hi_res", "ocr_only"], value="hi_res", label="OCR Strategy (for images/scanned docs)", info="Fast is quicker but less accurate")
                        }
            # Cleaning options
            with gr.TabItem("Cleaning"):
                gr.Markdown("### Text Cleaning Options")
                with gr.Row():
                    with gr.Column():
                        cleaning_options = {
                            "extra_whitespace": gr.Checkbox(value=True, label="Clean Extra Whitespace", info="Remove redundant whitespace"),
                            "unicode_quotes": gr.Checkbox(value=True, label="Replace Unicode Quotes", info="Normalize quotes to ASCII"),
                            "bullets": gr.Checkbox(value=True, label="Clean Bullets", info="Standardize bullet points"),
                            "dashes": gr.Checkbox(value=True, label="Clean Dashes", info="Standardize dashes"),
                            "group_paragraphs": gr.Checkbox(value=False, label="Group Broken Paragraphs", info="Combine paragraphs split across pages"),
                        }
                    with gr.Column():
                        cleaning_options.update({
                            "remove_punctuation": gr.Checkbox(value=False, label="Remove Punctuation", info="Remove all punctuation")
                        })
            # Extraction options
            with gr.TabItem("Extraction"):
                gr.Markdown("### Entity Extraction Options")
                with gr.Row():
                    with gr.Column():
                        extraction_options = {
                            "emails": gr.Checkbox(value=True, label="Extract Emails", info="Extract email addresses"),
                            "urls": gr.Checkbox(value=True, label="Extract URLs", info="Extract URLs"),
                            "phone_numbers": gr.Checkbox(value=True, label="Extract Phone Numbers", info="Extract phone numbers"),
                            "ip_addresses": gr.Checkbox(value=False, label="Extract IP Addresses", info="Extract IP addresses"),
                            "ner": gr.Checkbox(value=True, label="Named Entity Recognition", info="Extract named entities (people, orgs, locations)")
                        }
            # Chunking options
            with gr.TabItem("Chunking"):
                gr.Markdown("### Text Chunking Options")
                with gr.Row():
                    with gr.Column():
                        chunking_method = gr.Radio(
                            choices=["by_title", "by_token"],
                            value="by_title",
                            label="Chunking Method",
                            info="How to divide the document into chunks"
                        )
                    with gr.Column():
                        chunking_options = {
                            "max_characters": gr.Number(value=2000, label="Max Characters", info="Maximum characters per chunk"),
                            "combine_text_under_n_chars": gr.Number(value=300, label="Combine Small Text (by_title)", info="Combine sections smaller than this")
                        }
            # Embedding options
            with gr.TabItem("Embedding"):
                gr.Markdown("### Embedding Generation Options")
                with gr.Row():
                    embedding_model = gr.Dropdown(
                        choices=[
                            "all-MiniLM-L6-v2",
                            "paraphrase-multilingual-MiniLM-L12-v2",
                            "all-mpnet-base-v2",
                            "sentence-t5-base",
                            ""  # Empty option to skip embedding
                        ],
                        value="all-MiniLM-L6-v2",
                        label="Embedding Model",
                        info="Select a model for generating embeddings (or empty to skip)"
                    )
            # Output format options
            with gr.TabItem("Output Format"):
                gr.Markdown("### Output Format Options")
                with gr.Row():
                    output_format = gr.Radio(
                        choices=["json", "qa", "transformers", "label_studio"],
                        value="json",
                        label="Output Format",
                        info="Format for the final processed output"
                    )
        # Process button
        process_btn = gr.Button("Process Documents", variant="primary")
        # Results section
        with gr.Tabs():
            with gr.TabItem("Element Analysis"):
                with gr.Row():
                    element_stats_json = gr.JSON(label="Element Statistics")
                    element_dist_chart = gr.Plot(label="Element Distribution")
            with gr.TabItem("Semantic Analysis"):
                with gr.Row():
                    keyword_viz_plot = gr.Plot(label="Keyword Analysis")
                with gr.Row():
                    embedding_viz_plot = gr.Plot(label="Embedding Visualization")
                    similarity_matrix_plot = gr.Plot(label="Semantic Similarity Matrix")
            with gr.TabItem("Processed Output"):
                output_data_json = gr.JSON(label="Processed Data")
        # Set up event handlers. Gradio passes component values positionally,
        # so flatten the option dictionaries into an ordered component list and
        # rebuild them inside a small wrapper before calling process_files.
        partition_keys = list(partition_options.keys())
        cleaning_keys = list(cleaning_options.keys())
        extraction_keys = list(extraction_options.keys())
        chunking_keys = list(chunking_options.keys())

        def process_files_wrapper(files_value, chunking_method_value, embedding_model_value, output_format_value, *option_values):
            n_p, n_c, n_e = len(partition_keys), len(cleaning_keys), len(extraction_keys)
            partition_vals = dict(zip(partition_keys, option_values[:n_p]))
            cleaning_vals = dict(zip(cleaning_keys, option_values[n_p:n_p + n_c]))
            extraction_vals = dict(zip(extraction_keys, option_values[n_p + n_c:n_p + n_c + n_e]))
            chunking_vals = dict(zip(chunking_keys, option_values[n_p + n_c + n_e:]))
            return process_files(
                files_value, partition_vals, cleaning_vals, extraction_vals,
                chunking_method_value, chunking_vals, embedding_model_value, output_format_value
            )

        process_btn.click(
            fn=process_files_wrapper,
            inputs=[
                files, chunking_method, embedding_model, output_format,
                *partition_options.values(),
                *cleaning_options.values(),
                *extraction_options.values(),
                *chunking_options.values()
            ],
            outputs=[
                status,
                log,
                element_stats_json,
                element_dist_chart,
                similarity_matrix_plot,
                embedding_viz_plot,
                keyword_viz_plot,
                output_data_json
            ]
        )
        # Examples
        gr.Examples(
            examples=[
                [
                    # Example with default settings - user would upload their own files
                    None
                ]
            ],
            inputs=[files],
        )
        # Add markdown with instructions
        with gr.Accordion("Instructions", open=False):
            gr.Markdown("""
            ## How to Use This App

            1. **Upload Documents**: Start by uploading one or more documents in the supported formats.
            2. **Configure Processing Options**:
               - **Partitioning**: Control how documents are broken into elements
               - **Cleaning**: Select text cleaning operations to apply
               - **Extraction**: Choose entities to extract from the text
               - **Chunking**: Set how elements are grouped into chunks
               - **Embedding**: Select a model for generating vector embeddings
               - **Output Format**: Choose the format of the final processed data
            3. **Process Documents**: Click the "Process Documents" button to start the pipeline.
            4. **Analyze Results**:
               - **Element Analysis**: View statistics and distribution of document elements
               - **Semantic Analysis**: Explore keyword distribution and semantic relationships
               - **Processed Output**: View the final structured data ready for use with LLMs

            ## Typical Use Cases

            - **Content Extraction**: Extract structured content from unstructured documents
            - **Document Understanding**: Analyze and categorize document components
            - **Text Preprocessing**: Prepare text for further NLP or machine learning
            - **Knowledge Base Creation**: Convert documents into semantic chunks for retrieval
            - **LLM Integration**: Structure documents for use with large language models
            """)
    return app

# Cell 9: Launch the application
# Create and launch the app
app = build_gradio_interface()
app.launch(debug=True)