#!/usr/bin/env python3 """ User Interaction Analysis Dashboard A comprehensive UI for viewing and analyzing user interactions across Intercom chats and JustCall meetings with priority-based filtering. """ import gradio as gr from pymongo import MongoClient from typing import List, Dict, Any, Tuple, Optional import pandas as pd from loguru import logger # MongoDB Configuration MONGODB_URI = "mongodb+srv://contextdb:HOqIgSH01CoEiMb1@cluster0.d9cmff.mongodb.net/" DATABASE_NAME = "second_brain_course" COLLECTION_NAME = "user_interaction_analyses" class UserInteractionDashboard: """Dashboard for user interaction analyses.""" def __init__(self): """Initialize dashboard with MongoDB connection.""" self.client = MongoClient(MONGODB_URI) self.db = self.client[DATABASE_NAME] self.collection = self.db[COLLECTION_NAME] logger.info(f"Connected to MongoDB: {DATABASE_NAME}.{COLLECTION_NAME}") def get_summary_stats(self) -> Tuple[int, int, int, int, int, int]: """Get summary statistics for the dashboard.""" total_users = self.collection.count_documents({}) # Count by priority high_priority = self.collection.count_documents({"priority_level": "high"}) medium_priority = self.collection.count_documents({"priority_level": "medium"}) low_priority = self.collection.count_documents({"priority_level": "low"}) # Aggregate total conversations and meetings pipeline = [ { "$group": { "_id": None, "total_conversations": {"$sum": "$total_conversations"}, "total_meetings": {"$sum": "$total_meetings"} } } ] agg_result = list(self.collection.aggregate(pipeline)) total_conversations = agg_result[0]["total_conversations"] if agg_result else 0 total_meetings = agg_result[0]["total_meetings"] if agg_result else 0 return ( total_users, total_conversations, total_meetings, high_priority, medium_priority, low_priority ) def get_users_data(self, priority_filter: Optional[str] = None) -> pd.DataFrame: """Get user data for table display with optional priority filter.""" # Build query query = {} if priority_filter and priority_filter != "All": query["priority_level"] = priority_filter.lower() # Fetch documents users = list(self.collection.find(query)) if not users: return pd.DataFrame(columns=[ "User ID", "Conversations", "Meetings", "Conv Key Findings", "Meeting Key Findings", "Priority" ]) # Transform to table format table_data = [] for user in users: user_id = user.get("user_id", "") # Get conversation IDs conv_ids = user.get("conversation_ids", []) conv_ids_str = ", ".join(conv_ids[:3]) # Show first 3 if len(conv_ids) > 3: conv_ids_str += f" (+{len(conv_ids) - 3} more)" # Get meeting IDs meeting_ids = user.get("meeting_ids", []) meeting_ids_str = ", ".join(meeting_ids[:3]) # Show first 3 if len(meeting_ids) > 3: meeting_ids_str += f" (+{len(meeting_ids) - 3} more)" # Get key findings from conversation level conv_insights = user.get("conversation_level_insights", {}) conv_findings = conv_insights.get("aggregated_marketing_insights", {}).get("key_findings", []) conv_findings_str = f"{len(conv_findings)} findings" # Get key findings from meeting level meeting_insights = user.get("meeting_level_insights", {}) meeting_findings = meeting_insights.get("aggregated_marketing_insights", {}).get("key_findings", []) meeting_findings_str = f"{len(meeting_findings)} findings" priority = user.get("priority_level", "unknown").upper() table_data.append({ "User ID": user_id, "Conversations": conv_ids_str, "Meetings": meeting_ids_str, "Conv Key Findings": conv_findings_str, "Meeting Key Findings": meeting_findings_str, "Priority": priority, "_raw": user # Store raw data for detail view }) df = pd.DataFrame(table_data) return df def get_user_detail(self, df: pd.DataFrame, evt: gr.SelectData) -> str: """Get detailed view of selected user.""" if df is None or len(df) == 0: return "No user selected" try: selected_row = evt.index[0] if isinstance(evt.index, list) else evt.index user_data = df.iloc[selected_row]["_raw"] # Build detailed HTML view html = f"""

User Profile: {user_data.get('user_id', 'N/A')}

Priority Level: {user_data.get('priority_level', 'unknown').upper()}

Analysis Date: {user_data.get('analysis_timestamp', 'N/A')}


📊 Overview


💬 Conversation Level Insights (Intercom)

""" # Conversation insights conv_insights = user_data.get("conversation_level_insights", {}) conv_summary = conv_insights.get("conversation_summary", "No summary available") html += f"

Summary: {conv_summary}

" # Conversation quotes conv_marketing = conv_insights.get("aggregated_marketing_insights", {}) conv_quotes = conv_marketing.get("quotes", []) if conv_quotes: html += "

Key Quotes:

" # Conversation findings conv_findings = conv_marketing.get("key_findings", []) if conv_findings: html += "

Key Findings:

" html += "
" # Meeting insights html += "

📞 Meeting Level Insights (JustCall)

" meeting_insights = user_data.get("meeting_level_insights", {}) meeting_summary = meeting_insights.get("meeting_summary", "No summary available") html += f"

Summary: {meeting_summary}

" # Meeting quotes meeting_marketing = meeting_insights.get("aggregated_marketing_insights", {}) meeting_quotes = meeting_marketing.get("quotes", []) if meeting_quotes: html += "

Key Quotes:

" # Meeting findings meeting_findings = meeting_marketing.get("key_findings", []) if meeting_findings: html += "

Key Findings:

" html += "
" # Unified insights html += "

🎯 Unified Insights

" unified_summary = user_data.get("unified_insights", {}).get("unified_summary", "No unified summary available") html += f"

Summary: {unified_summary}

" # User journey user_journey = user_data.get("user_journey_summary", "No journey summary available") html += f"

User Journey:

{user_journey}

" # Cross-interaction patterns patterns = user_data.get("cross_interaction_patterns", []) if patterns: html += "

Cross-Interaction Patterns:

" # Follow-up recommendations recommendations = user_data.get("unified_follow_up_recommendations", "No recommendations available") html += f"

Follow-up Recommendations:

{recommendations}

" html += "
" return html except Exception as e: logger.error(f"Error getting user detail: {e}") return f"Error loading user details: {str(e)}" def filter_by_priority(self, priority: str) -> Tuple[pd.DataFrame, str]: """Filter users by priority level.""" df = self.get_users_data(priority_filter=priority) # Remove the _raw column for display display_df = df.drop(columns=["_raw"]) if "_raw" in df.columns else df return display_df, f"Showing {len(df)} users with {priority} priority" def search_table(self, df: pd.DataFrame, search_term: str) -> pd.DataFrame: """Search across all columns in the table.""" if not search_term or df is None or len(df) == 0: return df # Search across all string columns mask = df.astype(str).apply( lambda row: row.str.contains(search_term, case=False, na=False).any(), axis=1 ) return df[mask] def create_dashboard(): """Create the Gradio dashboard.""" dashboard = UserInteractionDashboard() # Get initial stats total_users, total_convs, total_meetings, high_count, medium_count, low_count = dashboard.get_summary_stats() # Custom CSS for better styling custom_css = """ .priority-btn { font-size: 18px !important; font-weight: bold !important; padding: 15px 30px !important; border-radius: 8px !important; } .stats-box { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 10px; color: white; text-align: center; } """ with gr.Blocks(css=custom_css, title="User Interaction Analysis Dashboard") as demo: # Header gr.Markdown("# 🎯 User Interaction Analysis Dashboard") gr.Markdown("*Analyzing user interactions across Intercom chats and JustCall meetings*") # ============================================================ # SECTION 1: Summary Statistics and Priority Filters # ============================================================ with gr.Row(): with gr.Column(scale=1): gr.Markdown(f"""

{total_users}

Total Users Analyzed

""") with gr.Column(scale=1): gr.Markdown(f"""

{total_convs}

Intercom Conversations

""") with gr.Column(scale=1): gr.Markdown(f"""

{total_meetings}

JustCall Meetings

""") gr.Markdown("---") # Priority Filter Buttons gr.Markdown("### 🎚️ Filter by Priority Level") with gr.Row(): high_btn = gr.Button( f"🔴 High Priority ({high_count})", elem_classes=["priority-btn"], variant="primary", scale=1 ) medium_btn = gr.Button( f"🟡 Medium Priority ({medium_count})", elem_classes=["priority-btn"], variant="secondary", scale=1 ) low_btn = gr.Button( f"🟢 Low Priority ({low_count})", elem_classes=["priority-btn"], variant="secondary", scale=1 ) all_btn = gr.Button( f"⚪ All Users ({total_users})", elem_classes=["priority-btn"], variant="secondary", scale=1 ) filter_status = gr.Textbox( label="Filter Status", value=f"Showing all {total_users} users", interactive=False ) gr.Markdown("---") # ============================================================ # SECTION 2: User Data Table with Search # ============================================================ gr.Markdown("### 📊 User Interaction Data") search_box = gr.Textbox( label="🔍 Search across all columns", placeholder="Search by User ID, Conversation ID, Meeting ID, findings...", scale=1 ) # Get initial data initial_df = dashboard.get_users_data() display_df = initial_df.drop(columns=["_raw"]) if "_raw" in initial_df.columns else initial_df user_table = gr.Dataframe( value=display_df, label="User Interactions", interactive=False, wrap=True ) # Hidden state to store full dataframe with _raw data full_data_state = gr.State(value=initial_df) filtered_data_state = gr.State(value=initial_df) gr.Markdown("---") # ============================================================ # SECTION 3: Detailed User View # ============================================================ gr.Markdown("### 👤 User Details") gr.Markdown("*Click on any row in the table above to see detailed analysis*") user_detail = gr.HTML( value="

Select a user from the table above to view detailed insights

" ) # ============================================================ # Event Handlers # ============================================================ def filter_high(): df = dashboard.get_users_data(priority_filter="High") display = df.drop(columns=["_raw"]) if "_raw" in df.columns else df return display, df, df, f"Showing {len(df)} HIGH priority users" def filter_medium(): df = dashboard.get_users_data(priority_filter="Medium") display = df.drop(columns=["_raw"]) if "_raw" in df.columns else df return display, df, df, f"Showing {len(df)} MEDIUM priority users" def filter_low(): df = dashboard.get_users_data(priority_filter="Low") display = df.drop(columns=["_raw"]) if "_raw" in df.columns else df return display, df, df, f"Showing {len(df)} LOW priority users" def filter_all(): df = dashboard.get_users_data(priority_filter=None) display = df.drop(columns=["_raw"]) if "_raw" in df.columns else df return display, df, df, f"Showing all {len(df)} users" def search_users(search_term: str, current_filtered_df: pd.DataFrame): """Search within currently filtered data.""" if not search_term: # Return the current filtered data display = current_filtered_df.drop(columns=["_raw"]) if "_raw" in current_filtered_df.columns else current_filtered_df return display # Search in the filtered data if current_filtered_df is None or len(current_filtered_df) == 0: return pd.DataFrame() # Create a copy for searching search_df = current_filtered_df.copy() # Search across all visible columns (excluding _raw) visible_cols = [col for col in search_df.columns if col != "_raw"] mask = search_df[visible_cols].astype(str).apply( lambda row: row.str.contains(search_term, case=False, na=False).any(), axis=1 ) result_df = search_df[mask] display = result_df.drop(columns=["_raw"]) if "_raw" in result_df.columns else result_df return display def show_detail(evt: gr.SelectData, full_data: pd.DataFrame): """Show detailed view when row is selected.""" return dashboard.get_user_detail(full_data, evt) # Wire up event handlers high_btn.click( fn=filter_high, outputs=[user_table, filtered_data_state, full_data_state, filter_status] ) medium_btn.click( fn=filter_medium, outputs=[user_table, filtered_data_state, full_data_state, filter_status] ) low_btn.click( fn=filter_low, outputs=[user_table, filtered_data_state, full_data_state, filter_status] ) all_btn.click( fn=filter_all, outputs=[user_table, filtered_data_state, full_data_state, filter_status] ) search_box.change( fn=search_users, inputs=[search_box, filtered_data_state], outputs=[user_table] ) user_table.select( fn=show_detail, inputs=[full_data_state], outputs=[user_detail] ) return demo if __name__ == "__main__": logger.info("Starting User Interaction Analysis Dashboard...") demo = create_dashboard() demo.launch( server_name="0.0.0.0", server_port=7861, share=False )