import dash
from dash import dcc, html, Input, Output, State, ctx, callback_context
from dash.exceptions import PreventUpdate
import dash_bootstrap_components as dbc
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import numpy as np
import umap
import hdbscan
import sklearn.feature_extraction.text as text
from dash.exceptions import PreventUpdate
import json
from dotenv import load_dotenv
import helpers
from omeka_s_api_client import OmekaSClient, OmekaSClientError
from lancedb_client import LanceDBManager
import torch
import torch.nn.functional as F
# Load .env for credentials
load_dotenv()
_DEFAULT_PARSE_METADATA = (
'dcterms:identifier','dcterms:type','dcterms:title', 'dcterms:description',
'dcterms:creator','dcterms:publisher','dcterms:date','dcterms:spatial',
'dcterms:format','dcterms:provenance','dcterms:subject','dcterms:medium',
'bibo:annotates','bibo:content', 'bibo:locator', 'bibo:owner'
)
app = dash.Dash(__name__, suppress_callback_exceptions=True, external_stylesheets=[dbc.themes.BOOTSTRAP])
app.config.suppress_callback_exceptions = True
server = app.server
manager = LanceDBManager()
french_stopwords = text.ENGLISH_STOP_WORDS.union([
"alors", "au", "aucuns", "aussi", "autre", "avant", "avec", "avoir", "bon",
"car", "ce", "cela", "ces", "ceux", "chaque", "ci", "comme", "comment", "dans",
"des", "du", "dedans", "dehors", "depuis", "devrait", "doit", "donc", "dos",
"début", "elle", "elles", "en", "encore", "essai", "est", "et", "eu", "fait",
"faites", "fois", "font", "hors", "ici", "il", "ils", "je", "juste", "la", "le",
"les", "leur", "là", "ma", "maintenant", "mais", "mes", "mine", "moins", "mon",
"mot", "même", "ni", "nommés", "notre", "nous", "nouveaux", "ou", "où", "par",
"parce", "parole", "pas", "personnes", "peut", "peu", "pièce", "plupart", "pour",
"pourquoi", "quand", "que", "quel", "quelle", "quelles", "quels", "qui", "sa",
"sans", "ses", "seulement", "si", "sien", "son", "sont", "sous", "soyez", "sujet",
"sur", "ta", "tandis", "tellement", "tels", "tes", "ton", "tous", "tout", "trop",
"très", "tu", "valeur", "voie", "voient", "vont", "votre", "vous", "vu", "ça",
"étaient", "état", "étions", "été", "être"
])
# -------------------- Layout --------------------
app.layout = html.Div([
# Header
dbc.NavbarSimple(
children=[],
brand="Omeka S Computer Vision Assistant",
brand_href="/",
color="light",
dark=False,
className="mb-4 shadow-sm border-bottom"
),
# Main Container
dbc.Container(fluid=True, children=[
dbc.Row([
# Left column - Controls
dbc.Col(width=6, children=[
dbc.Card([
dbc.CardHeader(html.H4("Data Loading and ploting", className="text-center")),
dbc.CardBody([
# Tabs
dcc.Tabs(id="data-tabs", value="api", children=[
dcc.Tab(label="Harvest data from Omeka S", value="omeka"),
dcc.Tab(label="Visualize existing collections", value="lance")
]),
html.Div(id="data-tab-content"),
html.Br(),
])
], className="mb-4 shadow-sm")
]),
# Right column - Explanations
dbc.Col(width=6, children=[
dbc.Card([
dbc.CardHeader(
html.H4(
dbc.Button("Explanations", color="primary", id="explanation-toggle", n_clicks=0),
className="text-center"
)
),
dbc.Collapse(
dbc.CardBody([
html.P("This application allows you to explore Omeka S collections through interactive visualization."),
html.P("You can load data in two ways:"),
html.P("1. From Omeka S: Connect to your Omeka S instance and select a collection to visualize."),
html.P("2. From LanceDB: Load previously processed collections from the local database."),
html.P("The visualization uses UMAP projection and topic clustering to create an interactive map of your collection."),
html.P("You can explore items by hovering over points and search using semantic queries."),
]),
id="explanation-collapse",
is_open=False
)
], className="mb-4 shadow-sm")
])
]),
html.Br(),
dbc.Row([
dbc.Col([
dbc.InputGroup([
dbc.Input(
id="search-input",
type="text",
placeholder="Search...",
),
dbc.Button(
"Search",
id="search-button",
color="primary",
size="sm",
),
dbc.Button(
"Clear",
id="clear-button",
color="secondary",
size="sm",
),
], className="d-flex align-items-center")
], width={"size": 6, "offset": 3}), # Center the input group and make it half width
], className="mb-3"),
dbc.Row([
dbc.Col([
html.Label("Number of results:", className="mb-0"),
dcc.Slider(
id="search-limit-slider",
min=1,
max=50,
step=1,
value=5,
marks={i: str(i) for i in range(1, 51, 1)},
className="mt-1"
),
], width={"size": 6, "offset": 3}),
], className="mb-3"),
html.Br(),
# Central Visualization (like scatter plot, map etc.)
dbc.Row([
html.Div([
dbc.Spinner(
id="loading-spinner",
type="grow",
color="primary",
fullscreen=False,
children=[
# Add a placeholder div
html.Div(
id="graph-placeholder",
children="Select a data source and load data to visualize",
style={
"height": "700px",
"display": "flex",
"alignItems": "center",
"justifyContent": "center",
"color": "#666",
"fontSize": "1.2rem",
"fontStyle": "italic",
"width": "900px" # Set width to 70%
}
),
dcc.Graph(
id="umap-graph",
style={
"width": "900px", # Set width to 70%
"height": "700px",
"display": "none"
},
config={
'scrollZoom': True,
'displayModeBar': True,
'modeBarButtonsToAdd': ['drawline']
}
)],
),
html.Div(id="point-details",
style={
"width": "30%", # Set width to 30%
"padding": "15px",
"borderLeft": "1px solid #ccc",
"overflowY": "auto",
"height": "700px",
"minWidth": "250px",
"maxWidth": "30%" # Match the width
}),
],
style={
"display": "flex",
"flexDirection": "row",
"width": "100%",
"gap": "10px",
"justifyContent": "space-between"
}),
]),
html.Div(id="status"),
dcc.Store(id="omeka-client-config", storage_type="session"),
]),
# Footer
html.Footer([
html.Hr(),
dbc.Container([
dbc.Row([
dbc.Col([
html.Img(src="./SmartBibl.IA_Solutions.png", height="50"),
html.Small([
html.Br(),
html.A("Géraldine Geoffroy", href="mailto:grldn.geoffroy@gmail.com", className="text-muted")
])
]),
dbc.Col([
html.H5("Code source"),
html.Ul([
html.Li(html.A("Github", href="https://github.com/gegedenice/openalex-explorer", className="text-muted", target="_blank"))
])
]),
dbc.Col([
html.H5("Ressources"),
html.Ul([
html.Li(html.A("Nomic Atlas", href="https://atlas.nomic.ai/", target="_blank", className="text-muted")),
html.Li(html.A("Model nomic-embed-text-v1.5", href="https://huggingface.co./nomic-ai/nomic-embed-text-v1.5", target="_blank", className="text-muted")),
html.Li(html.A("Model nomic-embed-vision-v1.5", href="https://huggingface.co./nomic-ai/nomic-embed-vision-v1.5", target="_blank", className="text-muted"))
])
])
])
])
], className="mt-5 p-3 bg-light border-top")
])
# -------------------- UI Callbacks --------------------
# ------------------------------------------------------
##-------------------- Tabs Callbacks --------------------
@app.callback(
Output("data-tab-content", "children"),
Input("data-tabs", "value")
)
def render_tab_content(tab):
if tab == "omeka":
return html.Div([
html.Div([
html.H5("Harvest data from an Omeka S instance", className="mb-3"),
# API URL input with full width
dbc.InputGroup([
dbc.Input(
id="api-url",
value="https://your-omeka-instance.org",
type="url",
placeholder="Enter your Omeka S instance URL",
className="mb-2"
),
]),
# Buttons and dropdowns container
dbc.Container([
dbc.Row([
dbc.Col([
dbc.Button(
"Load Item Sets",
id="load-sets",
color="link",
size="sm",
className="w-100 mb-2"
),
]),
]),
dbc.Row([
dbc.Col([
dcc.Dropdown(
id="items-sets-dropdown",
placeholder="Select a collection",
className="mb-2"
),
]),
]),
dbc.Row([
dbc.Col([
dbc.Input(
id="table-name",
value="Enter a table name for data storage",
type="text",
placeholder="New table name",
className="mb-2"
),
]),
]),
dbc.Row([
dbc.Col([
dbc.Button(
"Process Omeka Collection",
id="process-omeka",
color="success",
size="sm",
className="mt-2"
),
]),
]),
], fluid=True, className="p-0"),
], className="p-3"),
], className="border rounded bg-white shadow-sm")
elif tab == "lance":
# Get tables at runtime
tables = manager.list_tables()
return html.Div([
html.H5("From LanceDB", className="mb-3"),
html.Div([
dbc.RadioItems(
id="db-tables-radio",
options=[{"label": t, "value": t} for t in tables],
value=tables[0] if tables else None,
className="mb-3"
),
dbc.Button("Display Table", id="load-data-db", color="success", size="sm", className="me-2"),
dbc.Button("Drop Table", id="drop-data-db", color="danger", size="sm"),
]) if tables else html.P("No tables available in LanceDB", className="text-muted"),
], className="border rounded bg-white shadow-sm p-3")
return html.Div("Invalid tab selected.")
# -------------------- Collpase callback --------------------
@app.callback(
Output("explanation-collapse", "is_open"),
Input("explanation-toggle", "n_clicks"),
prevent_initial_call=True
)
def toggle_collapse(n):
return n % 2 == 1
# -------------------- Graph placeholder Toggle callback --------------------
@app.callback(
Output("graph-placeholder", "style"),
Output("umap-graph", "style"),
[Input("umap-graph", "figure")],
prevent_initial_call=True
)
def toggle_graph_visibility(figure):
if figure is None:
return {"display": "flex"}, {"display": "none"}
return {"display": "none"}, {
"flex": 3,
"width": "100%",
"display": "block"
}
# -------------------- Features Callbacks --------------------
# ------------------------------------------------------------
## -------------------- Load Omeka collections callback--------------------
@app.callback(
Output("items-sets-dropdown", "options"),
Output("omeka-client-config", "data"),
Input("load-sets", "n_clicks"),
State("api-url", "value"),
prevent_initial_call=True
)
def load_item_sets(n_clicks, base_url):
if n_clicks is None: # Add this check
raise PreventUpdate
client = OmekaSClient(base_url, "...", "...", 50)
try:
item_sets = client.list_all_item_sets()
options = [{"label": s.get('dcterms:title', [{}])[0].get('@value', 'N/A'), "value": s["o:id"]} for s in item_sets]
return options, {
"base_url": base_url,
"key_identity": "...",
"key_credential": "...",
"default_per_page": 50
}
except Exception as e:
return dash.no_update, dash.no_update
## -------------------- Load & Process Omeka items callback--------------------
@app.callback(
Output("umap-graph", "figure"),
Output("status", "children"),
Input("process-omeka", "n_clicks"), # Changed ID to match new button
State("items-sets-dropdown", "value"),
State("omeka-client-config", "data"),
State("table-name", "value"),
prevent_initial_call=True
)
def handle_omeka_data(n_clicks, item_set_id, client_config, table_name):
if not n_clicks or not client_config:
raise PreventUpdate
client = OmekaSClient(
base_url=client_config["base_url"],
key_identity=client_config["key_identity"],
key_credential=client_config["key_credential"]
)
df_omeka = harvest_omeka_items(client, item_set_id=item_set_id)
items = df_omeka.to_dict(orient="records")
records_with_text = [helpers.add_concatenated_text_field_exclude_keys(item, keys_to_exclude=['id','images_urls'], text_field_key='text', pair_separator=' - ') for item in items]
df = helpers.prepare_df_atlas(pd.DataFrame(records_with_text), id_col='id', images_col='images_urls')
text_embed = helpers.generate_text_embed(df['text'].tolist())
img_embed = helpers.generate_img_embed(df['images_urls'].tolist())
# Convert to tensors if needed
text_tensor = torch.tensor(text_embed)
img_tensor = torch.tensor(img_embed)
# Average then normalize
combined = (0.7 * text_tensor + 0.3 * img_tensor)
normalized_embeddings = F.normalize(combined, p=2, dim=1)
embeddings = normalized_embeddings.numpy()
df["embeddings"] = embeddings.tolist()
reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, metric="cosine")
umap_embeddings = reducer.fit_transform(embeddings)
df["umap_embeddings"] = umap_embeddings.tolist()
clusterer = hdbscan.HDBSCAN(min_cluster_size=10, metric="euclidean")
cluster_labels = clusterer.fit_predict(umap_embeddings)
df["Cluster"] = cluster_labels
vectorizer = text.TfidfVectorizer(max_features=1000, stop_words=list(french_stopwords), lowercase=True)
tfidf_matrix = vectorizer.fit_transform(df["text"].astype(str).tolist())
top_words = []
for label in sorted(df["Cluster"].unique()):
if label == -1:
top_words.append("Noise")
continue
mask = (df["Cluster"] == label).to_numpy().nonzero()[0]
cluster_docs = tfidf_matrix[mask]
mean_tfidf = cluster_docs.mean(axis=0)
mean_tfidf = np.asarray(mean_tfidf).flatten()
top_indices = mean_tfidf.argsort()[::-1][:5]
terms = [vectorizer.get_feature_names_out()[i] for i in top_indices]
top_words.append(", ".join(terms))
cluster_name_map = {label: name for label, name in zip(sorted(df["Cluster"].unique()), top_words)}
df["Topic"] = df["Cluster"].map(cluster_name_map)
manager.initialize_table(table_name)
manager.add_entry(table_name, df.to_dict(orient="records"))
return create_umap_plot(df)
## -------------------- Load LanceDB data callback--------------------
@app.callback(
Output("umap-graph", "figure", allow_duplicate=True),
Output("status", "children", allow_duplicate=True),
Input("load-data-db", "n_clicks"),
State("db-tables-radio", "value"),
prevent_initial_call=True
)
def handle_db_data(n_clicks, db_table):
if not n_clicks or not db_table:
raise PreventUpdate
items = manager.get_content_table(db_table)
df = pd.DataFrame(items)
df = df.dropna(axis=1, how='all')
df = df.fillna('')
#umap_embeddings = np.array(df["umap_embeddings"].tolist())
return create_umap_plot(df)
## -------------------- plotly Hover datapoint callback--------------------
@app.callback(
Output("point-details", "children"),
Input("umap-graph", "hoverData")
)
def show_point_details(hoverData):
if not hoverData:
return html.Div("🖱️ Hover a point to see more details.", style={"color": "#888"})
id,item_id, img_url, title, desc = hoverData["points"][0]["customdata"]
return html.Div([
html.H4(title, style={"fontSize": "1.2rem"}), # Reduced header size
html.P(f"Item ID: {item_id}", style={"fontSize": "0.9rem", "color": "#666"}), # Smaller text
html.Img(src=img_url, style={
"maxWidth": "300px", # Fixed max width instead of 100%
"height": "auto", # Maintain aspect ratio
"marginBottom": "10px",
"borderRadius": "5px",
"boxShadow": "0 2px 4px rgba(0,0,0,0.1)"
}),
html.P(desc or "No description available.",
style={"lineHeight": "1.6", "color": "#444", "fontSize": "0.9rem"}) # Smaller text
])
## -------------------- Search & filter datapoint callback--------------------
@app.callback(
Output("umap-graph", "figure", allow_duplicate=True),
Input("search-button", "n_clicks"),
Input("search-limit-slider", "value"), # Add slider input
State("search-input", "value"),
State("db-tables-radio", "value"),
State("umap-graph", "figure"),
prevent_initial_call=True
)
def filter_points(n_clicks, limit, search_query, table, current_fig):
# Get the trigger that caused the callback
trigger = ctx.triggered_id
# If slider changed but no search query exists, don't update
if trigger == "search-limit-slider" and not search_query:
return dash.no_update
if not search_query:
# Reset visibility of all points
for trace in current_fig['data']:
trace['visible'] = True
return current_fig
# Generate text embedding
query_embed = helpers.generate_text_embed([f"search_query: {search_query}"]).tolist()
# Perform semantic search using the slider value
matching = manager.semantic_search(
table_name=table,
query_embed=query_embed,
limit=limit # Use the slider value
)
matching_ids = [item['id'] for item in json.loads(matching)]
print(f"Searching for '{search_query}' with limit {limit}")
print(f"Found {len(matching_ids)} matches")
# Update visibility of points
fig = go.Figure(current_fig)
for trace in fig.data:
point_ids = [point[0] for point in trace['customdata']]
selected_indices = [i for i, id in enumerate(point_ids) if id in matching_ids]
trace.update(
selectedpoints=selected_indices,
unselected=dict(marker=dict(opacity=0.1))
)
return fig
## -------------------- Clear search callback--------------------
@app.callback(
Output("umap-graph", "figure", allow_duplicate=True),
Output("search-input", "value"), # Clear the search input
Input("clear-button", "n_clicks"),
State("umap-graph", "figure"),
prevent_initial_call=True
)
def clear_search(n_clicks, current_fig):
if not n_clicks:
raise PreventUpdate
fig = go.Figure(current_fig)
# Reset all points to visible and full opacity
for trace in fig.data:
trace.update(
selectedpoints=None,
unselected=None,
opacity=0.8
)
return fig, "" # Return cleared figure and empty search input
## -------------------- Drop table callback--------------------
@app.callback(
Output("db-tables-dropdown", "options",allow_duplicate=True), # Update dropdown options
Output("status", "children",allow_duplicate=True), # Show status message
Input("drop-data-db", "n_clicks"),
State("db-tables-radio", "value"),
State("data-tabs", "value"),
prevent_initial_call=True
)
def drop_db_data(n_clicks, db_table, current_tab):
if not n_clicks or not db_table:
raise PreventUpdate
try:
success = manager.drop_table(db_table)
if success:
# Re-render the entire tab content to show updated radio buttons
return render_tab_content("lance"), f"Table '{db_table}' successfully deleted"
else:
return dash.no_update, f"Failed to delete table '{db_table}'"
except Exception as e:
print(f"Error dropping table: {str(e)}")
return dash.no_update, f"Error: {str(e)}", dash.no_update
# -------------------- Utility --------------------
# -------------------------------------------------
def harvest_omeka_items(client, item_set_id=None, per_page=50):
"""
Fetch and parse items from Omeka S.
Args:
client: OmekaSClient instance
item_set_id: ID of the item set to fetch items from (optional)
per_page: Number of items to fetch per page (default: 50)
Returns:
DataFrame containing parsed item data
"""
print("\n--- Fetching and Parsing Multiple Items by colection---")
try:
# Fetch items
items_list = client.list_all_items(item_set_id=item_set_id, per_page=per_page)
print(f"Initial fetch: {len(items_list)} items")
parsed_items_list = []
for idx, item_raw in enumerate(items_list):
try:
print(f"\nProcessing item {idx + 1}/{len(items_list)}")
if 'o:media' not in item_raw:
print(f"Skipping item {idx + 1}: No media found")
continue
parsed = client.digest_item_data(item_raw, prefixes=_DEFAULT_PARSE_METADATA)
if not parsed:
print(f"Skipping item {idx + 1}: Parsing failed")
continue
# Debug media processing
medias_id = [x["o:id"] for x in item_raw["o:media"]]
print(f"Found {len(medias_id)} media items")
medias_list = []
for media_id in medias_id:
try:
media = client.get_media(media_id)
print(f"Media type: {media.get('o:media_type', 'unknown')}")
if "image" in media.get("o:media_type", ""):
url = media.get('o:original_url')
if url:
medias_list.append(url)
else:
print(f"No URL found for media {media_id}")
except Exception as e:
print(f"Error processing media {media_id}: {str(e)}")
if medias_list:
parsed["images_urls"] = medias_list
parsed_items_list.append(parsed)
print(f"Added item with {len(medias_list)} images")
else:
print(f"Skipping item {idx + 1}: No valid image URLs found")
except Exception as e:
print(f"Error processing item {idx + 1}: {str(e)}")
print(f"Item raw data: {item_raw}")
continue
if not parsed_items_list:
print("No valid items were parsed!")
return None
print(f"\nFinal results:")
print(f"Total items processed: {len(items_list)}")
print(f"Successfully parsed items: {len(parsed_items_list)}")
df = pd.DataFrame(parsed_items_list)
print(f"DataFrame columns: {df.columns.tolist()}")
print(f"DataFrame shape: {df.shape}")
return df
except OmekaSClientError as e:
print(f"Omeka client error: {str(e)}")
return None
except Exception as e:
print(f"Unexpected error: {str(e)}")
print(f"Error type: {type(e)}")
import traceback
print(f"Traceback:\n{traceback.format_exc()}")
return None
def create_umap_plot(df):
coords = np.array(df["umap_embeddings"].tolist())
fig = px.scatter(
df,
x=coords[:, 0],
y=coords[:, 1],
color="Topic", # Start with top-level topics
custom_data=[df["id"], df["item_id"], df["images_urls"], df["Title"], df["Description"]],
hover_data=None,
title="UMAP Projection with HDBSCAN Topics",
color_discrete_sequence=px.colors.qualitative.D3,
width=900,
height=700,
)
# Update marker style
fig.update_traces(
marker=dict(
size=12, # Larger points
opacity=0.8, # Slight transparency
line=dict(width=0), # Remove borders
symbol='circle'
),
hoverinfo='none', # Disable native hover
hovertemplate=None
#hovertemplate="%{customdata[1]}
"
)
# Convert to a go.Figure object to access additional configuration
fig = go.Figure(fig)
# Update layout including scroll zoom
fig.update_layout(
plot_bgcolor='white',
paper_bgcolor='white',
height=700,
margin=dict(t=30, b=30, l=30, r=30),
showlegend=True,
legend=dict(
yanchor="top",
y=0.99,
xanchor="right",
x=0.99,
bgcolor='rgba(255,255,255,0.8)',
bordercolor='rgba(0,0,0,0)'
),
xaxis=dict(
showgrid=False,
zeroline=False,
showline=False,
showticklabels=False,
fixedrange=False
),
yaxis=dict(
showgrid=False,
zeroline=False,
showline=False,
showticklabels=False,
fixedrange=False
),
dragmode='pan',
modebar_add=[
'zoom',
'pan',
'zoomIn',
'zoomOut',
'resetScale'
],
)
return fig, f"Loaded {len(df)} items and projected into 2D."
if __name__ == "__main__":
app.run(debug=True,port=7860)