"""Sessions Observatory: a Dash app that visualizes chatbot dialog topics.

The user uploads a CSV/Excel export of dialogs. The app aggregates the rows
per topic, lays the topics out as rows of bubbles binned by dialog count, and
shows per-topic details (metrics, tags, sample dialogs) in a sidebar.
"""

import base64
import functools
import io
import itertools
import logging
import random
from collections import Counter

import dash
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from dash import Input, Output, State, callback, dcc, html

logger = logging.getLogger(__name__)

# Columns the per-topic aggregation in `analyze_topics` reads.
REQUIRED_COLUMNS = (
    "deduplicated_topic_name",
    "id",
    "Sentiment",
    "Resolution",
    "Urgency",
)

# File extensions (lower-case, without the dot) routed to `pd.read_excel`.
EXCEL_EXTENSIONS = ("xls", "xlsx", "xlsm")
XLSX_MIME_TYPE = (
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)

# (lower bound, upper bound or None, label) per row group, largest first.
# 8 rows x 6 bubbles is usually good.
DEFAULT_BIN_CONFIG = [
    (100, None, "100+ dialogs"),
    (50, 99, "50-99 dialogs"),
    (25, 49, "25-49 dialogs"),
    (9, 24, "9-24 dialogs"),
    (7, 8, "7-8 dialogs"),
    (5, 6, "5-6 dialogs"),
    (4, 4, "4 dialogs"),
    (0, 3, "0-3 dialogs"),
]

# Horizontal layout of bubbles within a row (chart x axis runs 0-100).
ROW_CENTER_X = 50
SPARSE_ROW_SPACING = 17.5  # rows holding fewer than max_items_per_row bubbles
FULL_ROW_SPACING = 15  # rows holding exactly max_items_per_row bubbles

# Bubble sizing. Raise MIN_BUBBLE_SIZE to enlarge the smallest bubble; lower
# BUBBLE_SIZE_SPAN to shrink the largest one.
MIN_BUBBLE_SIZE = 1
BUBBLE_SIZE_SPAN = 50
UNIFORM_BUBBLE_SIZE = 12.5  # used when every topic has the same count

# Colorbar title per selectable metric; all metrics share one color scale.
COLOR_METRIC_TITLES = {
    "negative_rate": "Negativity (%)",
    "unresolved_rate": "Unresolved (%)",
    "urgent_rate": "Urgency (%)",
}
COLOR_SCALE = "Teal"

SUCCESS_COLOR = "hsl(142.1, 76.2%, 36.3%)"
ERROR_COLOR = "var(--destructive)"

LABEL_WORDS_PER_LINE = 4
TOP_K_TAGS = 15
MAX_SAMPLE_DIALOGS = 5


class UnsupportedFileTypeError(ValueError):
    """Raised when an upload is neither a CSV nor an Excel file."""


# Initialize the Dash app
app = dash.Dash(__name__, suppress_callback_exceptions=True)
server = app.server


def _build_header():
    """Return the page header (title and one-line description)."""
    return html.Div(
        [
            html.H1(
                "Sessions Observatory by helvia.ai 🔭📊",
                className="app-header",
            ),
            html.P(
                "Upload a CSV/Excel file to visualize the chatbot's dialog topics.",
                className="app-description",
            ),
        ],
        className="header-container",
    )


def _build_upload_area():
    """Return the drag-and-drop upload widget plus its status message slot."""
    drop_zone = dcc.Upload(
        id="upload-data",
        children=html.Div(
            [
                html.Div("Drag and Drop", className="upload-text"),
                html.Div("or", className="upload-divider"),
                html.Div(
                    html.Button("Select a File", className="upload-button")
                ),
            ],
            className="upload-content",
        ),
        style={
            "width": "100%",
            "height": "120px",
            "lineHeight": "60px",
            "borderWidth": "1px",
            "borderStyle": "dashed",
            "borderRadius": "0.5rem",
            "textAlign": "center",
            "margin": "10px 0",
            "backgroundColor": "hsl(210, 40%, 98%)",
            "borderColor": "hsl(214.3, 31.8%, 91.4%)",
            "cursor": "pointer",
        },
        multiple=False,
    )
    # Hidden until `process_upload` has something to report.
    status = html.Div(
        id="upload-status",
        className="upload-status-message",
        style={"display": "none"},
    )
    return html.Div([drop_zone, status], className="upload-container")


def _build_chart_panel():
    """Return the left panel: header, bubble chart and "Color by" control."""
    color_selector = dcc.RadioItems(
        id="color-metric",
        options=[
            {"label": "Sentiment", "value": "negative_rate"},
            {"label": "Resolution", "value": "unresolved_rate"},
            {"label": "Urgency", "value": "urgent_rate"},
        ],
        value="negative_rate",
        inline=True,
        className="radio-group",
        inputClassName="radio-input",
        labelClassName="radio-label",
    )
    controls = html.Div(
        [
            html.Div(
                [
                    html.Div(
                        html.Label("Color by:", className="control-label"),
                        className="control-label-container",
                    ),
                ],
                className="control-labels-row",
            ),
            html.Div(
                [html.Div(color_selector, className="radio-container")],
                className="control-options-row",
            ),
        ],
        className="chart-controls",
    )
    return html.Div(
        [
            html.H3(
                id="topic-distribution-header",
                children="Sessions Observatory",
                className="section-header",
            ),
            # TODO: the original author notes that this calc() height "does
            # not work for some reason"; a fixed "80vh" was tried previously.
            dcc.Graph(
                id="bubble-chart",
                style={"height": "calc(100% - 154px)"},
            ),
            controls,
        ],
        className="chart-container",
    )


def _section(title, body_id, body_class, section_class=None):
    """Return a titled sidebar section whose body is filled by a callback."""
    children = [
        html.H4(title, className="subsection-header"),
        html.Div(id=body_id, className=body_class),
    ]
    if section_class is None:
        return html.Div(children)
    return html.Div(children, className=section_class)


def _build_sidebar():
    """Return the right panel: topic details plus the "nothing selected" hint."""
    details = html.Div(
        [
            html.H3("Topic Details", className="section-header"),
            html.Div(id="topic-title", className="topic-title"),
            html.Div(
                [
                    _section(
                        "Metadata",
                        "topic-metadata",
                        "metadata-container",
                        "metadata-section",
                    ),
                    _section(
                        "Key Metrics",
                        "topic-metrics",
                        "metrics-container",
                        "metrics-section",
                    ),
                    _section("Tags", "important-tags", "tags-container"),
                ],
                className="details-section",
            ),
            _section(
                "Sample Dialogs (Summary)",
                "sample-dialogs",
                "sample-dialogs-container",
                "samples-section",
            ),
        ],
        className="topic-details-content",
    )
    placeholder = html.Div(
        id="no-topic-selected",
        children=[
            html.Div(
                [
                    html.I(className="fas fa-info-circle info-icon"),
                    html.H3("No topic selected"),
                    html.P("Click or hover on a bubble to view topic details."),
                ],
                className="no-selection-message",
            )
        ],
        className="no-selection-container",
    )
    return html.Div([details, placeholder], className="sidebar-container")


# Define app layout
app.layout = html.Div(
    [
        _build_header(),
        _build_upload_area(),
        # Main content area (hidden until a file is uploaded)
        html.Div(
            [
                html.Div(
                    [_build_chart_panel(), _build_sidebar()],
                    className="dashboard-container",
                )
            ],
            id="main-content",
            style={"display": "none"},
        ),
        # Store the processed per-topic statistics
        dcc.Store(id="stored-data"),
    ],
    className="app-container",
)

# Page template. Dash rejects an index string that lacks the {%app_entry%},
# {%config%} and {%scripts%} placeholders, so all of them are spelled out.
# NOTE(review): this template carries no inline stylesheet or icon-font link;
# the class names used in the layout (e.g. "fas fa-...") presumably rely on
# styles served from elsewhere (such as an assets/ folder) - confirm.
app.index_string = """<!DOCTYPE html>
<html>
    <head>
        {%metas%}
        <title>Sessions Observatory by helvia.ai 🔭📊</title>
        {%favicon%}
        {%css%}
    </head>
    <body>
        {%app_entry%}
        <footer>
            {%config%}
            {%scripts%}
            {%renderer%}
        </footer>
    </body>
</html>
"""


@callback(
    Output("topic-distribution-header", "children"),
    Input("stored-data", "data"),
)
def update_topic_distribution_header(data):
    """Return the chart header, including the dialog total once data exists."""
    if not data:
        return "Sessions Observatory"  # Default when no data is available

    total_dialogs = int(pd.DataFrame(data)["count"].sum())
    return f"Sessions Observatory ({total_dialogs} dialogs)"


@functools.lru_cache(maxsize=2)
def _read_uploaded_table(contents, filename):
    """Decode a `dcc.Upload` payload into a DataFrame.

    The result is cached because the details callback needs the full table
    on every hover event; callers must treat the returned frame as read-only.

    Args:
        contents: Data-URL string ("<header>,<base64 payload>").
        filename: Name of the uploaded file; may be None.

    Returns:
        The parsed table.

    Raises:
        UnsupportedFileTypeError: If the file is neither CSV nor Excel.
    """
    header, _, payload = contents.partition(",")
    raw = base64.b64decode(payload)

    # The extension is checked first; the MIME type in the data-URL header is
    # only a fallback for uploads whose extension is not recognised.
    name = (filename or "").lower()
    extension = name.rsplit(".", 1)[-1] if "." in name else ""

    if extension in EXCEL_EXTENSIONS or (
        not extension and XLSX_MIME_TYPE in header
    ):
        return pd.read_excel(io.BytesIO(raw))
    if extension == "csv" or (not extension and "text/csv" in header):
        # "utf-8-sig" also accepts plain UTF-8 and drops a leading BOM.
        return pd.read_csv(io.StringIO(raw.decode("utf-8-sig")))
    raise UnsupportedFileTypeError(f"Unsupported file type: {filename!r}")


def _status_message(icon_class, text, color):
    """Return an icon + text status line rendered in a single color."""
    return html.Div(
        [
            html.I(
                className=icon_class,
                style={"color": color, "marginRight": "8px"},
            ),
            text,
        ],
        style={"color": color},
    )


# Define callback to process uploaded file
@callback(
    [
        Output("stored-data", "data"),
        Output("upload-status", "children"),
        Output("upload-status", "style"),  # Controls status visibility
        Output("main-content", "style"),
    ],
    [Input("upload-data", "contents")],
    [State("upload-data", "filename")],
)
def process_upload(contents, filename):
    """Parse the uploaded file and publish per-topic statistics.

    Returns:
        A 4-tuple: (topic records or None, status message, status style,
        main-content style). The dashboard stays hidden unless parsing and
        aggregation both succeed.
    """
    hidden = {"display": "none"}
    visible = {"display": "block"}

    if contents is None:
        return None, "", hidden, hidden  # Keep everything hidden

    try:
        df = _read_uploaded_table(contents, filename)
        records = analyze_topics(df).to_dict("records")
    except UnsupportedFileTypeError:
        message = _status_message(
            "fas fa-exclamation-circle",
            "Please upload a CSV or Excel file.",
            ERROR_COLOR,
        )
        return None, message, visible, hidden
    except Exception as error:  # UI boundary: report instead of crashing
        logger.exception("Failed to process upload %r", filename)
        message = _status_message(
            "fas fa-exclamation-triangle",
            f"Error processing file: {error}",
            ERROR_COLOR,
        )
        return None, message, visible, hidden

    message = _status_message(
        "fas fa-check-circle",
        f'Successfully uploaded "{filename}"',
        SUCCESS_COLOR,
    )
    # TODO: maybe give the status box the same height rule as the dashboard.
    return (
        records,
        message,
        visible,
        {"display": "block", "height": "calc(100vh - 40px)"},
    )


def analyze_topics(df):
    """Aggregate dialogs per topic and attach layout positions.

    Args:
        df: One row per dialog; must contain the REQUIRED_COLUMNS.

    Returns:
        One row per topic with `count`, the three `*_count` / `*_rate`
        columns (rates in percent, one decimal) and the columns added by
        `apply_binned_layout`.

    Raises:
        ValueError: If a required column is missing.
    """
    missing = [column for column in REQUIRED_COLUMNS if column not in df.columns]
    if missing:
        raise ValueError(f"Missing required column(s): {', '.join(missing)}")

    # Group by topic name and calculate metrics
    topic_stats = (
        df.groupby("deduplicated_topic_name")
        .agg(
            count=("id", "count"),
            negative_count=("Sentiment", lambda x: (x == "negative").sum()),
            unresolved_count=("Resolution", lambda x: (x == "unresolved").sum()),
            urgent_count=("Urgency", lambda x: (x == "urgent").sum()),
        )
        .reset_index()
    )

    # Calculate rates
    for metric in ("negative", "unresolved", "urgent"):
        topic_stats[f"{metric}_rate"] = (
            topic_stats[f"{metric}_count"] / topic_stats["count"] * 100
        ).round(1)

    return apply_binned_layout(topic_stats)


def _split_sizes(total, max_per_row):
    """Split `total` items into the fewest near-equal rows of <= max_per_row.

    Earlier rows receive the remainder, e.g. 14 items at 6 per row -> [5, 5, 4].
    """
    parts = -(-total // max_per_row)  # ceiling division
    base, extra = divmod(total, parts)
    return [base + 1 if index < extra else base for index in range(parts)]


def apply_binned_layout(df, padding=0, bin_config=None, max_items_per_row=6):
    """
    Apply a binned layout where bubbles are grouped into rows based on dialog count.

    Bubbles in each row are centered horizontally. A bin holding more than
    `max_items_per_row` topics is split into evenly sized sub-rows named
    "<bin>_<k>" and described as "<description> (k/n)".

    Args:
        df: DataFrame containing the topic data; must have a numeric
            "count" column. It is not modified.
        padding: Padding from the top and bottom edges, as a percentage.
        bin_config: List of tuples defining bin ranges and descriptions,
            ordered from the first row to the last. Example:
            [(300, None, "300+ dialogs"), (250, 299, "250-299 dialogs"), ...]
            Counts matching no range fall into the last bin.
        max_items_per_row: Maximum number of items to display in a single row.

    Returns:
        A copy of `df`, sorted by row and then by descending count, with the
        columns "bin", "bin_description", "y", "x" and "size_rank" set.

    Raises:
        ValueError: If `max_items_per_row` < 1 or `bin_config` is empty.
    """
    if max_items_per_row < 1:
        raise ValueError("max_items_per_row must be at least 1")
    if bin_config is None:
        bin_config = DEFAULT_BIN_CONFIG
    if len(bin_config) == 0:
        raise ValueError("bin_config must contain at least one bin")

    layout = df.copy()
    if layout.empty:
        # Nothing to place; still expose the columns callers expect.
        return layout.assign(
            bin=[], bin_description=[], y=[], x=[], size_rank=[]
        )

    # Build one boolean condition per configured bin.
    counts = layout["count"]
    bin_names = []
    descriptions = {}
    conditions = []
    for number, (lower, upper, description) in enumerate(bin_config, start=1):
        name = f"Bin {number}"
        bin_names.append(name)
        descriptions[name] = description
        in_range = counts >= lower
        if upper is not None:
            in_range = in_range & (counts <= upper)
        conditions.append(in_range)

    # The first matching condition wins; unmatched counts go to the last bin.
    assigned = np.select(conditions, bin_names, default=bin_names[-1])

    # Order rows by bin *number* (a plain string sort would put "Bin 10"
    # before "Bin 2") and by descending count within each bin. np.lexsort
    # treats its last key as the primary one.
    bin_rank = {name: rank for rank, name in enumerate(bin_names)}
    primary = np.array([bin_rank[name] for name in assigned])
    order = np.lexsort((-counts.to_numpy(), primary))
    layout = layout.iloc[order].copy()
    sorted_bins = assigned[order]

    # Name the row of every topic, splitting over-full bins into sub-rows.
    # After sorting, each bin occupies one contiguous run of rows.
    row_labels = []
    row_descriptions = []
    for name, members in itertools.groupby(sorted_bins):
        size = sum(1 for _ in members)
        if size <= max_items_per_row:
            row_labels.extend([name] * size)
            row_descriptions.extend([descriptions[name]] * size)
            continue
        chunks = _split_sizes(size, max_items_per_row)
        for part, chunk in enumerate(chunks, start=1):
            row_labels.extend([f"{name}_{part}"] * chunk)
            row_descriptions.extend(
                [f"{descriptions[name]} ({part}/{len(chunks)})"] * chunk
            )
    layout["bin"] = row_labels
    layout["bin_description"] = row_descriptions

    # Place every row: y is the vertical centre of the row's band, x spreads
    # the row's bubbles symmetrically around the chart centre.
    row_sizes = [
        (label, sum(1 for _ in members))
        for label, members in itertools.groupby(row_labels)
    ]
    row_height = (100 - 2 * padding) / len(row_sizes)
    xs = []
    ys = []
    for row_index, (_label, size) in enumerate(row_sizes):
        row_y = padding + row_index * row_height + row_height / 2
        # Rows that are not full get a little extra spacing between bubbles.
        spacing = (
            SPARSE_ROW_SPACING if size < max_items_per_row else FULL_ROW_SPACING
        )
        first_x = ROW_CENTER_X - (size - 1) * spacing / 2
        xs.extend(first_x + spacing * slot for slot in range(size))
        ys.extend([row_y] * size)
    layout["y"] = ys
    layout["x"] = xs

    # Add original rank for reference
    layout["size_rank"] = range(1, len(layout) + 1)

    return layout


def update_bubble_positions(df: pd.DataFrame) -> pd.DataFrame:
    """Return `df` with bubble positions; the main chart always uses the binned layout."""
    return apply_binned_layout(df)


def _scale_bubble_sizes(counts):
    """Map dialog counts to marker sizes on a log scale.

    Returns a Series aligned with `counts`. When all counts are equal the
    log range is zero, so a fixed size is used instead.
    """
    if counts.max() > counts.min():
        log_counts = np.log1p(counts)
        spread = log_counts.max() - log_counts.min()
        return (
            MIN_BUBBLE_SIZE
            + (log_counts - log_counts.min()) / spread * BUBBLE_SIZE_SPAN
        )
    return pd.Series(UNIFORM_BUBBLE_SIZE, index=counts.index)


def _wrap_words(text, words_per_line=LABEL_WORDS_PER_LINE):
    """Break `text` every `words_per_line` words using Plotly's <br> markup."""
    words = str(text).split()
    lines = [
        " ".join(words[start : start + words_per_line])
        for start in range(0, len(words), words_per_line)
    ]
    return "<br>".join(lines)


# Callback to update the bubble chart
@callback(
    Output("bubble-chart", "figure"),
    [
        Input("stored-data", "data"),
        Input("color-metric", "value"),
    ],
)
def update_bubble_chart(data, color_metric):
    """Build the bubble chart from the stored topic statistics.

    Bubble area encodes the (log-scaled) dialog count, colour encodes the
    selected rate. Each bubble gets a text label below it and each row gets
    a dotted guide line with its bin description on the left.
    """
    if not data:
        return go.Figure()

    # A fresh 0..n-1 index keeps positional and label-based access in sync
    # after the layout step has re-sorted the rows.
    df = update_bubble_positions(pd.DataFrame(data)).reset_index(drop=True)

    size_title = "Dialog Count"
    size_values = _scale_bubble_sizes(df["count"])

    if logger.isEnabledFor(logging.DEBUG):
        # Report the displayed sizes of the first two rows.
        for bin_name in list(dict.fromkeys(df["bin"]))[:2]:
            in_bin = df["bin"] == bin_name
            logger.debug(
                "Bin %r bubble sizes (topic, raw, displayed): %s",
                bin_name,
                list(
                    zip(
                        df.loc[in_bin, "deduplicated_topic_name"],
                        df.loc[in_bin, "count"],
                        size_values[in_bin],
                    )
                ),
            )

    # Determine color based on selected metric; anything unrecognised falls
    # back to urgency.
    color_column = (
        color_metric if color_metric in COLOR_METRIC_TITLES else "urgent_rate"
    )
    color_title = COLOR_METRIC_TITLES[color_column]
    color_values = df[color_column]

    # Hover text including bin information (Plotly breaks lines on <br>).
    hover_text = [
        f"Topic: {topic}<br>{size_title}: {raw:.0f}<br>"
        f"{color_title}: {color:.1f}<br>Group: {bin_desc}"
        for topic, raw, color, bin_desc in zip(
            df["deduplicated_topic_name"],
            df["count"],
            color_values,
            df["bin_description"],
        )
    ]

    # Create bubble chart
    fig = px.scatter(
        df,
        x="x",
        y="y",
        size=size_values,
        color=color_values,
        hover_name="deduplicated_topic_name",
        hover_data={
            "x": False,
            "y": False,
            "bin_description": True,
        },
        size_max=42.5,  # Maximum size of the bubbles
        color_continuous_scale=COLOR_SCALE,
        custom_data=[
            "deduplicated_topic_name",
            "count",
            "negative_rate",
            "unresolved_rate",
            "urgent_rate",
            "bin_description",
        ],
    )

    # Markers only; topic names are drawn as annotations instead of text.
    fig.update_traces(
        mode="markers",
        marker=dict(sizemode="area", opacity=0.8, line=dict(width=1, color="white")),
        hovertemplate="%{hovertext}",
        hovertext=hover_text,
    )

    # One label per bubble, hanging below it.
    annotations = []
    for topic, x_pos, y_pos, size in zip(
        df["deduplicated_topic_name"], df["x"], df["y"], size_values
    ):
        # Bigger bubbles push their label further down (the y axis is
        # reversed). The extra 0.125 keeps the left-most label of a full row
        # clear of the bin label. Adjust the divisor to tune the spacing.
        label_offset = 0.125 + size / 20
        annotations.append(
            dict(
                x=x_pos,
                y=y_pos + label_offset,
                text=_wrap_words(topic),
                showarrow=False,
                textangle=0,
                font=dict(
                    size=10,
                    color="var(--foreground)",
                    family="Arial, sans-serif",
                    weight="bold",
                ),
                xanchor="center",
                yanchor="top",  # Text box hangs below the anchor point
                bgcolor="rgba(255,255,255,0.7)",  # Readability over guide lines
                bordercolor="rgba(0,0,0,0.1)",
                borderwidth=1,
                borderpad=1,
                # TODO: Radius for rounded corners
            )
        )

    # One dotted guide line and one label per row.
    rows = df.drop_duplicates("bin")
    for row_y, row_description in zip(rows["y"], rows["bin_description"]):
        fig.add_shape(
            type="line",
            x0=0,
            y0=row_y,
            x1=100,
            y1=row_y,
            line=dict(color="rgba(0,0,0,0.1)", width=1, dash="dot"),
            layer="below",
        )
        annotations.append(
            dict(
                x=0,  # Position the label on the left side
                y=row_y,
                xref="x",
                yref="y",
                text=row_description,
                showarrow=False,
                font=dict(size=8.25, color="var(--muted-foreground)"),
                align="left",
                xanchor="left",
                yanchor="middle",
                bgcolor="rgba(255,255,255,0.7)",
                borderpad=1,
            )
        )

    fig.update_layout(
        title=None,
        xaxis=dict(
            showgrid=False,
            zeroline=False,
            showticklabels=False,
            title=None,
            range=[0, 100],
        ),
        yaxis=dict(
            showgrid=False,
            zeroline=False,
            showticklabels=False,
            title=None,
            range=[0, 100],
            autorange="reversed",  # Keep largest at top
        ),
        hovermode="closest",
        margin=dict(l=0, r=0, t=10, b=10),
        coloraxis_colorbar=dict(
            title=color_title,
            title_font=dict(size=9),
            tickfont=dict(size=8),
            thickness=10,
            len=0.6,
            yanchor="middle",
            y=0.5,
            xpad=0,
        ),
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(0,0,0,0)",
        hoverlabel=dict(bgcolor="white", font_size=12, font_family="Inter"),
        annotations=annotations,  # Bubble labels and bin labels
    )

    return fig


def _metric_box(value, label, kind):
    """Return one "Key Metrics" tile showing `value` as a percentage."""
    return html.Div(
        [
            html.Div(f"{value}%", className="metric-value"),
            html.Div(label, className="metric-label"),
        ],
        className=f"metric-box {kind}",
    )


def _build_tags_output(topic_conversations):
    """Return the most frequent tags of a topic, or a "no tags" notice."""
    tag_counts = Counter()
    if "consolidated_tags" in topic_conversations.columns:
        for cell in topic_conversations["consolidated_tags"].dropna():
            tag_counts.update(
                tag.strip() for tag in str(cell).split(",") if tag.strip()
            )

    # Most common first, alphabetical for ties; keep only the top K.
    top_tags = sorted(tag_counts.items(), key=lambda item: (-item[1], item[0]))[
        :TOP_K_TAGS
    ]
    if not top_tags:
        return html.Div(
            [
                html.I(className="fas fa-info-circle", style={"marginRight": "5px"}),
                "No tags found for this topic",
            ],
            className="no-tags-message",
        )
    return html.Div(
        [
            html.Div(
                [
                    html.I(className="fas fa-tag topic-tag-icon"),
                    html.Span(f"{tag} ({count})"),
                ],
                className="topic-tag",
            )
            for tag, count in top_tags
        ],
        className="tags-container",
    )


def _build_sample_dialogs(topic_conversations):
    """Return up to MAX_SAMPLE_DIALOGS randomly chosen dialog cards."""
    sample_size = min(MAX_SAMPLE_DIALOGS, len(topic_conversations))
    if sample_size == 0:
        return [
            html.Div(
                "No sample dialogs available for this topic.",
                style={"color": "var(--muted-foreground)"},
            )
        ]

    chosen = random.sample(range(len(topic_conversations)), sample_size)
    samples = topic_conversations.iloc[chosen]
    has_chat_id = "id" in samples.columns

    dialog_items = []
    for _, row in samples.iterrows():
        tags = [
            html.Span(row["Sentiment"], className="dialog-tag tag-sentiment"),
            html.Span(row["Resolution"], className="dialog-tag tag-resolution"),
            html.Span(row["Urgency"], className="dialog-tag tag-urgency"),
        ]
        if has_chat_id:
            tags.append(
                html.Span(
                    f"Chat ID: {row['id']}", className="dialog-tag tag-chat-id"
                )
            )
        dialog_items.append(
            html.Div(
                [
                    html.Div(row["Summary"], className="dialog-summary"),
                    html.Div(tags, className="dialog-metadata"),
                ],
                className="dialog-item",
            )
        )
    return dialog_items


@callback(
    [
        Output("topic-title", "children"),
        Output("topic-metadata", "children"),
        Output("topic-metrics", "children"),
        Output("important-tags", "children"),
        Output("sample-dialogs", "children"),
        Output("no-topic-selected", "style"),
    ],
    [Input("bubble-chart", "hoverData"), Input("bubble-chart", "clickData")],
    [
        State("stored-data", "data"),
        State("upload-data", "contents"),
        State("upload-data", "filename"),
    ],
)
def update_topic_details(
    hover_data, click_data, stored_data, file_contents, filename=None
):
    """Fill the sidebar for the hovered (or, failing that, clicked) topic.

    Returns:
        A 6-tuple: (title, metadata items, metric boxes, tags, sample
        dialogs, style of the "no topic selected" placeholder). When there
        is nothing to show, the first five are empty and the placeholder is
        displayed.
    """
    nothing_selected = ("", [], [], "", [], {"display": "flex"})

    # The latest hover wins; the click is only used when no hover data exists.
    point_info = hover_data or click_data
    if not point_info or not stored_data or not file_contents:
        return nothing_selected

    # The topic name is the first entry of the chart's custom_data.
    topic_name = point_info["points"][0]["customdata"][0]

    # Get stored statistics for this topic. The hover data can be stale
    # (e.g. right after a new upload), so a missing topic is not an error.
    df_stored = pd.DataFrame(stored_data)
    matches = df_stored[df_stored["deduplicated_topic_name"] == topic_name]
    if matches.empty:
        return nothing_selected
    topic_data = matches.iloc[0]

    # Get the original rows of this topic to extract tags and samples.
    df_full = _read_uploaded_table(file_contents, filename)
    topic_conversations = df_full[df_full["deduplicated_topic_name"] == topic_name]

    title = html.Div([html.Span(topic_name)])

    metadata_items = [
        html.Div(
            [
                html.I(className="fas fa-comments metadata-icon"),
                html.Span(f"{int(topic_data['count'])} dialogs"),
            ],
            className="metadata-item",
        ),
    ]

    metrics_boxes = [
        _metric_box(topic_data["negative_rate"], "Negative Sentiment", "negative"),
        _metric_box(topic_data["unresolved_rate"], "Unresolved", "unresolved"),
        _metric_box(topic_data["urgent_rate"], "Urgent", "urgent"),
    ]

    return (
        title,
        metadata_items,
        metrics_boxes,
        _build_tags_output(topic_conversations),
        _build_sample_dialogs(topic_conversations),
        {"display": "none"},
    )


if __name__ == "__main__":
    app.run_server(debug=False)