Skip to content

Advanced Patterns

Advanced techniques for production-ready dashboards.

Advanced Filtering

Hierarchical Filtering

# Level 1: options for the main-category dropdown
@app.callback(
    Output('main-category-dropdown', 'options'),
    Input('page-load', 'id'),
)
def populate_main_categories(_):
    """Build the option list for the main-category dropdown.

    The callback argument is ignored; one ``{'label', 'value'}`` option is
    produced for each key of ``CATEGORIES``.
    """
    options = [{'label': name, 'value': name} for name in CATEGORIES]
    return options

# Level 2: sub-category options follow the selected main category
@app.callback(
    Output('sub-category-dropdown', 'options'),
    Input('main-category-dropdown', 'value'),
)
def populate_sub_categories(main_cat):
    """Return dropdown options for the entries nested under ``main_cat``.

    A falsy selection yields an empty option list.
    """
    if main_cat:
        children = CATEGORIES[main_cat]
        return [{'label': name, 'value': name} for name in children]
    return []

Server-Side Filtering

@app.callback(
    Output('table', 'data'),
    [Input('category-filter', 'value'),
     Input('year-filter', 'value')]
)
def apply_filters(category, year):
    """Fetch the 'publications' dataset restricted by category and year.

    Both selections are passed to ``dataset_manager.get_dataset`` as the
    ``filters`` mapping, and the result is returned as a list of records.
    """
    # The filtering itself is delegated to the data layer (SQL/BigQuery)
    rows = dataset_manager.get_dataset(
        'publications',
        filters=dict(category=category, year=year),
    )
    return rows.to_dict('records')

Caching Strategies

Function-Level Caching

from functools import lru_cache

@lru_cache(maxsize=128)
def expensive_calculation(category, year):
    """Return ``complex_math(category, year)``, memoised by ``lru_cache``.

    Up to 128 distinct argument pairs are retained; a call whose arguments
    are already cached returns the stored result without running the body.
    """
    message = f"Computing {category} {year}..."
    print(message)
    outcome = complex_math(category, year)
    return outcome

# First call with these arguments - the function body runs and the result
# is stored in the cache
result1 = expensive_calculation('A', 2020)

# Second call with the same arguments - served from the cache, so the body
# (including its print) does not run again
result2 = expensive_calculation('A', 2020)

Cache with TTL

from datetime import datetime, timedelta, timezone

class TTLCache:
    """In-memory key/value cache whose entries expire after a fixed time.

    Entries are kept in ``self.cache`` as ``(value, stored_at)`` tuples,
    where ``stored_at`` is a timezone-aware UTC ``datetime``. UTC is used
    instead of naive local time so that an entry's age is not distorted
    when the local clock shifts (for example at a daylight-saving change).
    """

    def __init__(self, ttl_seconds=3600):
        """Create an empty cache.

        Args:
            ttl_seconds: Maximum age of an entry, in seconds, before
                ``get`` treats it as expired.
        """
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, key):
        """Return the cached value for ``key``, or ``None``.

        ``None`` is returned when the key is absent or when its entry is
        older than the TTL. An expired entry is removed from the cache.
        """
        try:
            value, stored_at = self.cache[key]
        except KeyError:
            return None

        age = (datetime.now(timezone.utc) - stored_at).total_seconds()
        if age > self.ttl:
            # Expired entries are only evicted when they are looked up
            del self.cache[key]
            return None

        return value

    def set(self, key, value):
        """Store ``value`` under ``key`` stamped with the current UTC time."""
        self.cache[key] = (value, datetime.now(timezone.utc))

Redis Distributed Cache

import redis
import pickle

# Module-level Redis client configured for localhost:6379, database 0
redis_client = redis.Redis(host='localhost', port=6379, db=0)

class RedisCache:
    """Pickle-backed cache stored in Redis, with an expiry on every entry."""

    def __init__(self, client, ttl_seconds=3600):
        """Keep the Redis client and the expiry (in seconds) for new entries."""
        self.ttl = ttl_seconds
        self.client = client

    def get(self, key):
        """Fetch ``key`` and unpickle it; ``None`` if nothing usable is stored."""
        raw = self.client.get(key)
        if not raw:
            return None
        # NOTE(review): pickle.loads can execute arbitrary code from the
        # payload - only safe if every writer to this Redis is trusted
        return pickle.loads(raw)

    def set(self, key, value):
        """Pickle ``value`` and write it with ``setex`` using the TTL."""
        self.client.setex(key, self.ttl, pickle.dumps(value))

Complex Callbacks

Pattern Matching

Use MATCH so that a single callback pairs each dynamically created input with the output that shares its index:

from dash import callback, Input, Output, ALL, MATCH

# A single callback covers every filter component of this type
@app.callback(
    Output({'type': 'dynamic-output', 'index': MATCH}, 'children'),
    Input({'type': 'dynamic-filter', 'index': MATCH}, 'value'),
)
def update_output(filter_value):
    """Render the filter's current value as a ``'Selected: ...'`` string."""
    text = f'Selected: {filter_value}'
    return text

Chained Callbacks

# Step 1: copy the selected category into the store
@app.callback(
    Output('category-store', 'data'),
    Input('category-filter', 'value'),
)
def store_category(category):
    """Pass the selected category through unchanged as the store's data."""
    selected = category
    return selected

# Step 2: subcategory options are derived from the stored category
@app.callback(
    Output('subcategory-filter', 'options'),
    Input('category-store', 'data'),
)
def update_subcategories(category):
    """Look up the subcategory options for ``category``.

    Returns an empty list when no category is set.
    """
    return get_subcategories_for_category(category) if category else []

# Step 3: the chart is redrawn from the category and subcategory together
@app.callback(
    Output('chart', 'figure'),
    [Input('category-filter', 'value'),
     Input('subcategory-filter', 'value')]
)
def update_chart(category, subcategory):
    """Build a bar chart of column 'Y' against 'X' for the current selection."""
    frame = get_data(category, subcategory)
    figure = px.bar(frame, x='X', y='Y')
    return figure

Preventing Circular Dependencies

# ✅ Route the value through a dcc.Store to break circular dependencies
@app.callback(
    Output('data-store', 'data'),
    Input('filter-a', 'value'),
)
def store_filter_a(value):
    """Wrap the filter value in a dict under the ``'filter_a'`` key."""
    payload = {'filter_a': value}
    return payload

@app.callback(
    Output('chart', 'figure'),
    Input('data-store', 'data'),
)
def update_chart(data):
    """Redraw the chart by passing the store's data to ``create_chart``."""
    figure = create_chart(data)
    return figure

Performance Optimization

Identifying Bottlenecks

import time

@app.callback(
    Output('chart', 'figure'),
    Input('filter', 'value')
)
def update_chart(filter_value):
    """Build the bar chart for ``filter_value`` and print per-step timings.

    The load, filter and plot steps are timed separately, followed by the
    total, so the slowest step can be read off the output.

    Intervals are measured with ``time.perf_counter`` rather than
    ``time.time``: it is monotonic and high-resolution, so the figures are
    not skewed by system clock adjustments.

    Returns:
        The figure produced by ``px.bar`` for the filtered rows.
    """
    start = time.perf_counter()

    # Step 1: Get data
    t1 = time.perf_counter()
    data = dataset_manager.get_dataset('publications')
    print(f"[Data] {time.perf_counter() - t1:.2f}s")

    # Step 2: Filter data
    t1 = time.perf_counter()
    filtered = data[data['category'] == filter_value]
    print(f"[Filter] {time.perf_counter() - t1:.2f}s")

    # Step 3: Create chart
    t1 = time.perf_counter()
    fig = px.bar(filtered, x='X', y='Y')
    print(f"[Chart] {time.perf_counter() - t1:.2f}s")

    print(f"[Total] {time.perf_counter() - start:.2f}s")
    return fig

Server-Side Filtering

# ❌ Bad: Load all data, filter in Python
@app.callback(
    Output('table', 'data'),
    Input('category', 'value'),
)
def update_table(category):
    """Anti-pattern: fetch the whole dataset, then keep matching rows in Python."""
    publications = dataset_manager.get_dataset('publications')
    matches = publications['category'] == category
    return publications[matches].to_dict('records')

# ✅ Good: Filter in SQL
@app.callback(
    Output('table', 'data'),
    Input('category', 'value'),
)
def update_table(category):
    """Preferred form: hand the category to ``get_dataset`` as a filter."""
    category_filter = {'category': category}
    rows = dataset_manager.get_dataset('publications', filters=category_filter)
    return rows.to_dict('records')

Pagination

@app.callback(
    Output('table', 'data'),
    [Input('page', 'value'),
     Input('page-size', 'value')]
)
def paginate_results(page, page_size):
    """Load a single page of 'publications' for the table.

    Args:
        page: 1-based page number. Values below 1 are treated as page 1 so
            the computed offset is never negative.
        page_size: Number of rows per page, passed on as ``limit``.

    Returns:
        The requested page as a list of record dicts.

    Raises:
        PreventUpdate: If either input is ``None`` (e.g. a cleared input
            field), which leaves the table as it is instead of failing
            with a ``TypeError`` in the offset arithmetic.
    """
    # Imported locally so the snippet stays self-contained
    from dash.exceptions import PreventUpdate

    if page is None or page_size is None:
        raise PreventUpdate

    offset = (max(page, 1) - 1) * page_size

    data = dataset_manager.get_dataset(
        'publications',
        offset=offset,
        limit=page_size
    )

    return data.to_dict('records')

Memory Optimization

@app.callback(
    Output('result', 'children'),
    Input('process-button', 'n_clicks')
)
def process_data(n_clicks):
    """Load and transform the large dataset, then report the record count.

    Returns:
        A ``'Processed N records'`` message, where N is ``len`` of the
        transform result.

    Exceptions from loading or transforming propagate unchanged; the
    cleanup in ``finally`` runs either way.
    """
    import gc

    # Bind both names up front: if load_huge_dataset() raises, the ``del``
    # in ``finally`` would otherwise fail with UnboundLocalError and hide
    # the real error
    large_data = None
    result = None
    try:
        large_data = load_huge_dataset()
        result = expensive_transform(large_data)
        return f'Processed {len(result)} records'
    finally:
        # Drop the references before collecting so the memory can be freed
        del large_data, result
        gc.collect()

Testing

Unit Testing Callbacks

import pytest
from app import app

def test_callback_updates_output():
    """The 'output' callback echoes its input in a 'You entered: ...' message."""
    # NOTE(review): Dash normally keys callback_map by "<id>.<property>" -
    # confirm that 'output' is the right key for this app
    entry = app.callback_map['output']
    actual = entry['callback']('test input')
    assert actual == 'You entered: test input'

Testing with Mock Data

from unittest.mock import patch
import pandas as pd

def test_callback_with_mock():
    """The 'chart' callback plots two points for category 'A' from patched data."""
    frame = pd.DataFrame({
        'category': ['A', 'B', 'A'],
        'value': [10, 20, 30],
    })
    target = 'data.datamanager.DataManager.get_dataset'

    # Replace the dataset lookup so the callback sees only ``frame``
    with patch(target, return_value=frame):
        chart_callback = app.callback_map['chart']['callback']
        figure = chart_callback('A')
        assert len(figure['data'][0]['y']) == 2

Integration Testing

def test_callback_chain():
    """Feed the 'data-store' callback's result into the 'chart' callback."""
    frame = pd.DataFrame({'category': ['A'], 'value': [10]})
    target = 'data.datamanager.DataManager.get_dataset'

    with patch(target, return_value=frame):
        # Step 1: run the store callback to obtain the data
        load_callback = app.callback_map['data-store']['callback']
        stored = load_callback('load')

        # Step 2: build the chart from that data
        chart_callback = app.callback_map['chart']['callback']
        figure = chart_callback(stored)

        assert len(figure['data'][0]['y']) > 0

Error Handling

Try/Except in Callbacks

@app.callback(
    Output('result', 'children'),
    Input('filter', 'value')
)
def get_data(filter_value):
    """Report how many 'publications' rows match ``filter_value``.

    Every failure is turned into a user-facing message instead of being
    raised. Unexpected errors are also logged with their traceback.

    Returns:
        ``'Found N records'`` on success, otherwise a message describing
        the failure.
    """
    try:
        data = dataset_manager.get_dataset(
            'publications',
            filters={'category': filter_value}
        )
        return f'Found {len(data)} records'

    except ValueError as e:
        return f'Invalid filter value: {str(e)}'

    except TimeoutError:
        return 'Query timed out. Please try again.'

    except Exception as e:
        # Last-resort boundary: keep the traceback in the log so the
        # failure can be diagnosed, and show the user a generic message
        logger.error("Unexpected error: %s", e, exc_info=True)
        return 'An unexpected error occurred.'

Input Validation

@app.callback(
    Output('result', 'children'),
    [Input('start-date', 'value'),
     Input('end-date', 'value')]
)
def analyze_date_range(start, end):
    """Summarise how many records ``get_data`` returns for ``start``..``end``.

    Returns a prompt when either date is missing, and an error message
    when ``start`` compares greater than ``end``.
    """
    # Both dates are required
    if not (start and end):
        return 'Please select both dates'

    # The range must not be reversed
    if start > end:
        return 'Start date must be before end date'

    # Inputs are valid: fetch and summarise
    records = get_data(start, end)
    count = len(records)
    return f'Data from {start} to {end}: {count} records'

Logging

import logging

logger = logging.getLogger(__name__)

@app.callback(
    Output('result', 'children'),
    Input('button', 'n_clicks'),
)
def do_something(n_clicks):
    """Run ``expensive_operation`` and log the start, success and failure.

    Returns a ``'Result: ...'`` string on success, or a fixed error string
    if any exception is raised (the failure is logged with its traceback).
    """
    try:
        logger.info(f"Processing button click #{n_clicks}")
        outcome = expensive_operation()
        logger.info(f"Success: {outcome}")
        return f'Result: {outcome}'
    except Exception as exc:
        logger.error(f"Operation failed: {str(exc)}", exc_info=True)
        return 'Error: Operation failed'