Advanced Patterns
Advanced techniques for production-ready dashboards.
Advanced Filtering
Hierarchical Filtering
# Level 1: Main categories
@app.callback(
    Output('main-category-dropdown', 'options'),
    Input('page-load', 'id'),
)
def populate_main_categories(_):
    """Build the option list for the main-category dropdown.

    The callback input is ignored. One option is produced per top-level
    key of ``CATEGORIES``, with the key used as both label and value.
    """
    options = []
    for name in CATEGORIES.keys():
        options.append({'label': name, 'value': name})
    return options
# Level 2: Subcategories depend on main
@app.callback(
    Output('sub-category-dropdown', 'options'),
    Input('main-category-dropdown', 'value')
)
def populate_sub_categories(main_cat):
    """Build the sub-category options for the selected main category.

    Args:
        main_cat: Current value of the main-category dropdown.

    Returns:
        A list of ``{'label', 'value'}`` dicts, one per key of
        ``CATEGORIES[main_cat]``. The list is empty when nothing is
        selected or when the value is not a key of ``CATEGORIES``.
    """
    if not main_cat:
        return []
    # An unknown category yields no options instead of raising KeyError
    # inside the callback.
    subcategories = CATEGORIES.get(main_cat, {})
    return [{'label': sub, 'value': sub} for sub in subcategories]
Server-Side Filtering
@app.callback(
    Output('table', 'data'),
    [Input('category-filter', 'value'),
     Input('year-filter', 'value')],
)
def apply_filters(category, year):
    """Return table rows for the selected category and year.

    Both selections are handed to ``dataset_manager.get_dataset`` as a
    ``filters`` dict so that the filtering is done by the data source
    (SQL/BigQuery) rather than in this process. The result is converted
    to a list of per-row dicts.
    """
    # Filter in SQL/BigQuery (server-side)
    rows = dataset_manager.get_dataset(
        'publications',
        filters={'category': category, 'year': year},
    )
    return rows.to_dict('records')
Caching Strategies
Function-Level Caching
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_calculation(category, year):
    """Compute ``complex_math(category, year)``, memoised by ``lru_cache``.

    Results for up to 128 distinct argument pairs are kept. A repeated
    call with cached arguments returns the stored result without running
    the body, so the "Computing" line is printed only on a cache miss.
    """
    print(f"Computing {category} {year}...")
    outcome = complex_math(category, year)
    return outcome
# First call - cache miss: the function body runs and its result is stored
result1 = expensive_calculation('A', 2020)
# Second call with the same arguments - served from the lru_cache
# (the body is not executed again)
result2 = expensive_calculation('A', 2020)
Cache with TTL
from datetime import datetime, timedelta
class TTLCache:
    """In-memory key/value cache whose entries expire after a fixed time.

    Each entry is stored in ``self.cache`` as ``(value, stored_at)``.
    Age is measured with ``datetime.now()``. Expired entries are removed
    only when they are looked up via :meth:`get`.
    """

    def __init__(self, ttl_seconds=3600):
        """Create an empty cache.

        Args:
            ttl_seconds: Maximum age of an entry, in seconds.
        """
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, key, default=None):
        """Return the cached value for *key* if it has not expired.

        Args:
            key: Cache key.
            default: Value returned on a miss or an expired entry. Pass a
                sentinel to tell a miss apart from a cached ``None``.

        Returns:
            The stored value, or *default* when the key is absent or
            older than the TTL.
        """
        try:
            value, stored_at = self.cache[key]
        except KeyError:
            return default

        age = (datetime.now() - stored_at).total_seconds()
        if age > self.ttl:
            # Evict lazily so stale entries do not linger after a lookup.
            self.cache.pop(key, None)
            return default
        return value

    def set(self, key, value):
        """Store *value* under *key*, stamped with the current time."""
        self.cache[key] = (value, datetime.now())
Redis Distributed Cache
import redis
import pickle
# Module-level Redis connection: server on localhost, port 6379, database 0
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class RedisCache:
    """Cache that keeps pickled values in Redis so processes can share them.

    Values are serialised with ``pickle`` and written with ``setex`` so
    that each key carries the configured expiry.
    """

    def __init__(self, client, ttl_seconds=3600):
        """Remember the Redis client and the expiry applied to new keys."""
        self.client = client
        self.ttl = ttl_seconds

    def get(self, key):
        """Return the unpickled value for *key*, or ``None`` if absent/empty."""
        raw = self.client.get(key)
        if not raw:
            return None
        # NOTE: pickle.loads can execute arbitrary code. Only use this
        # cache with a Redis instance that untrusted parties cannot write to.
        return pickle.loads(raw)

    def set(self, key, value):
        """Pickle *value* and store it under *key* with the configured TTL."""
        self.client.setex(key, self.ttl, pickle.dumps(value))
Complex Callbacks
Pattern Matching
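Use MATCH so that a single callback serves every dynamically created component of a given type; each input is paired with the output that has the same index: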
from dash import callback, Input, Output, ALL, MATCH
# One callback handles all filters with same type
@app.callback(
    Output({'type': 'dynamic-output', 'index': MATCH}, 'children'),
    Input({'type': 'dynamic-filter', 'index': MATCH}, 'value'),
)
def update_output(filter_value):
    """Render the current selection of a dynamic filter as display text.

    Both component ids use ``MATCH`` on ``index``, so this one function is
    registered for every ``dynamic-filter`` / ``dynamic-output`` pair.
    """
    message = f'Selected: {filter_value}'
    return message
Chained Callbacks
# Step 1: Store category
@app.callback(
    Output('category-store', 'data'),
    Input('category-filter', 'value'),
)
def store_category(category):
    """Copy the selected category, unchanged, into ``category-store``."""
    selected = category
    return selected
# Step 2: Update subcategories based on category
@app.callback(
    Output('subcategory-filter', 'options'),
    Input('category-store', 'data'),
)
def update_subcategories(category):
    """Return the subcategory options for the stored category.

    With a category stored, the options come from
    ``get_subcategories_for_category``; otherwise the list is empty.
    """
    if category:
        return get_subcategories_for_category(category)
    return []
# Step 3: Update chart based on both
@app.callback(
    Output('chart', 'figure'),
    [Input('category-filter', 'value'),
     Input('subcategory-filter', 'value')],
)
def update_chart(category, subcategory):
    """Build the bar chart for the selected category and subcategory.

    Fetches the rows with ``get_data`` and plots column ``'X'`` against
    column ``'Y'``.
    """
    rows = get_data(category, subcategory)
    figure = px.bar(rows, x='X', y='Y')
    return figure
Preventing Circular Dependencies
# ✅ Use dcc.Store to break circular dependencies
@app.callback(
    Output('data-store', 'data'),
    Input('filter-a', 'value'),
)
def store_filter_a(value):
    """Write the value of ``filter-a`` into ``data-store``.

    The value is wrapped in a dict under the ``'filter_a'`` key.
    """
    payload = {'filter_a': value}
    return payload
@app.callback(
    Output('chart', 'figure'),
    Input('data-store', 'data'),
)
def update_chart(data):
    """Redraw the chart from the contents of ``data-store``.

    The figure is produced by ``create_chart`` from the stored data.
    """
    figure = create_chart(data)
    return figure
Performance Optimization
Identifying Bottlenecks
import time
@app.callback(
    Output('chart', 'figure'),
    Input('filter', 'value')
)
def update_chart(filter_value):
    """Build the filtered bar chart and print how long each stage took.

    Timings are printed for the data load, the in-memory category filter,
    the figure construction, and the whole callback.

    Args:
        filter_value: Category to keep; rows whose ``'category'`` column
            equals this value are plotted.

    Returns:
        The bar chart figure of column ``'X'`` against column ``'Y'``.
    """
    # perf_counter() is monotonic and high-resolution. time.time() follows
    # the wall clock and can jump, which corrupts short interval readings.
    start = time.perf_counter()

    # Step 1: Get data
    t1 = time.perf_counter()
    data = dataset_manager.get_dataset('publications')
    print(f"[Data] {time.perf_counter() - t1:.2f}s")

    # Step 2: Filter data
    t1 = time.perf_counter()
    filtered = data[data['category'] == filter_value]
    print(f"[Filter] {time.perf_counter() - t1:.2f}s")

    # Step 3: Create chart
    t1 = time.perf_counter()
    fig = px.bar(filtered, x='X', y='Y')
    print(f"[Chart] {time.perf_counter() - t1:.2f}s")

    print(f"[Total] {time.perf_counter() - start:.2f}s")
    return fig
Server-Side Filtering
# ❌ Bad: Load all data, filter in Python
@app.callback(
    Output('table', 'data'),
    Input('category', 'value'),
)
def update_table(category):
    """Anti-pattern: fetch the whole dataset, then filter it in memory.

    Every call loads all of ``'publications'`` before keeping only the
    rows whose ``'category'`` column equals the selection.
    """
    everything = dataset_manager.get_dataset('publications')
    matches = everything['category'] == category
    return everything[matches].to_dict('records')
# ✅ Good: Filter in SQL
@app.callback(
    Output('table', 'data'),
    Input('category', 'value'),
)
def update_table(category):
    """Fetch only the rows of the selected category.

    The category is passed to ``dataset_manager.get_dataset`` as a filter,
    so only matching rows are loaded before conversion to row dicts.
    """
    category_filter = {'category': category}
    rows = dataset_manager.get_dataset('publications', filters=category_filter)
    return rows.to_dict('records')
Pagination
@app.callback(
    Output('table', 'data'),
    [Input('page', 'value'),
     Input('page-size', 'value')]
)
def paginate_results(page, page_size):
    """Load one page of results at a time.

    Args:
        page: 1-based page number. Values below 1 are treated as page 1.
        page_size: Number of rows per page.

    Returns:
        The requested page as a list of per-row dicts. The list is empty
        when either input is missing or ``page_size`` is not positive.
    """
    # Missing inputs would raise TypeError in the offset arithmetic, and a
    # non-positive page size would request a meaningless limit.
    if page is None or page_size is None or page_size < 1:
        return []

    # Clamp so page 0 or a negative page cannot yield a negative offset.
    page = max(page, 1)
    offset = (page - 1) * page_size
    data = dataset_manager.get_dataset(
        'publications',
        offset=offset,
        limit=page_size
    )
    return data.to_dict('records')
Memory Optimization
@app.callback(
    Output('result', 'children'),
    Input('process-button', 'n_clicks')
)
def process_data(n_clicks):
    """Load and transform the large dataset, then free it promptly.

    Args:
        n_clicks: Click count of the process button (not used in the body).

    Returns:
        A summary string with the number of transformed records.

    Any exception from loading or transforming propagates unchanged.
    """
    import gc

    # Bind the names up front. Otherwise a failure inside
    # load_huge_dataset() leaves `large_data` unbound, and the cleanup
    # below raises UnboundLocalError, hiding the real error.
    large_data = None
    result = None
    try:
        large_data = load_huge_dataset()
        result = expensive_transform(large_data)
        return f'Processed {len(result)} records'
    finally:
        # Clean up: drop both references before forcing a collection.
        del large_data, result
        gc.collect()
Testing
Unit Testing Callbacks
import pytest
from app import app
def test_callback_updates_output():
    """Check the text produced by the callback registered under 'output'."""
    # NOTE(review): assumes app.callback_map has an 'output' entry whose
    # 'callback' can be called with just the input value - TODO confirm
    # against the installed Dash version.
    handler = app.callback_map['output']['callback']
    assert handler('test input') == 'You entered: test input'
Testing with Mock Data
from unittest.mock import patch
import pandas as pd
def test_callback_with_mock():
    """Run the chart callback against a patched dataset.

    ``DataManager.get_dataset`` is replaced so that it returns a
    three-row frame with two rows in category 'A'. The first trace of the
    resulting figure is expected to hold two y values.
    """
    frame = pd.DataFrame({
        'category': ['A', 'B', 'A'],
        'value': [10, 20, 30],
    })
    with patch('data.datamanager.DataManager.get_dataset') as mock_get:
        mock_get.return_value = frame
        # NOTE(review): assumes the 'chart' entry exists and is directly
        # callable with the filter value - TODO confirm.
        handler = app.callback_map['chart']['callback']
        figure = handler('A')
        y_values = figure['data'][0]['y']
        assert len(y_values) == 2
Integration Testing
def test_callback_chain():
    """Feed the output of the data-store callback into the chart callback.

    ``DataManager.get_dataset`` is patched to return a one-row frame. The
    chart built from the stored data must have at least one y value.
    """
    with patch('data.datamanager.DataManager.get_dataset') as mock:
        mock.return_value = pd.DataFrame({'category': ['A'], 'value': [10]})

        # Step 1: Get data
        load_data = app.callback_map['data-store']['callback']
        stored = load_data('load')

        # Step 2: Create chart
        draw_chart = app.callback_map['chart']['callback']
        chart = draw_chart(stored)

        first_trace = chart['data'][0]
        assert len(first_trace['y']) > 0
Error Handling
Try/Except in Callbacks
@app.callback(
    Output('result', 'children'),
    Input('filter', 'value')
)
def get_data(filter_value):
    """Report how many publications match the selected category.

    Args:
        filter_value: Category passed to the dataset query as a filter.

    Returns:
        A count message on success, otherwise a user-facing error message.
        No exception is allowed to escape the callback.
    """
    try:
        data = dataset_manager.get_dataset(
            'publications',
            filters={'category': filter_value}
        )
        return f'Found {len(data)} records'
    except ValueError as e:
        return f'Invalid filter value: {str(e)}'
    except TimeoutError:
        return 'Query timed out. Please try again.'
    except Exception as e:
        # logger.exception records the traceback. The user only sees the
        # generic message, so the log must carry the diagnostic detail.
        logger.exception("Unexpected error: %s", e)
        return 'An unexpected error occurred.'
Input Validation
@app.callback(
    Output('result', 'children'),
    [Input('start-date', 'value'),
     Input('end-date', 'value')],
)
def analyze_date_range(start, end):
    """Validate the selected date range and report the matching records.

    Returns a prompt when either date is missing, an error message when
    ``start`` compares greater than ``end``, and otherwise a summary with
    the number of records returned by ``get_data(start, end)``.
    """
    # Validate inputs exist
    if not (start and end):
        return 'Please select both dates'

    # Validate logic
    if start > end:
        return 'Start date must be before end date'

    # Process valid data
    records = get_data(start, end)
    return f'Data from {start} to {end}: {len(records)} records'
Logging
import logging
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
@app.callback(
    Output('result', 'children'),
    Input('button', 'n_clicks'),
)
def do_something(n_clicks):
    """Run ``expensive_operation`` for a button click and log the outcome.

    The click number and the result are logged at INFO level. On any
    exception the failure is logged with its traceback and a fixed error
    string is returned instead.
    """
    try:
        logger.info(f"Processing button click #{n_clicks}")
        result = expensive_operation()
        logger.info(f"Success: {result}")
    except Exception as e:
        # logger.exception logs at ERROR level with the traceback attached.
        logger.exception(f"Operation failed: {e!s}")
        return 'Error: Operation failed'
    else:
        return f'Result: {result}'