Structured Logging at Scale: Production Patterns and Advanced Techniques
Master production-grade structured logging with Python. Learn performance optimization, framework integrations, and proven patterns from companies processing billions of events daily.
Difficulty Guide
This guide focuses on intermediate to advanced topics:
- Prerequisites: Basic structured logging experience or completion of the Getting Started guide
- Audience: Teams handling 10K+ events/second or 100GB+ logs/day
- Focus: Performance optimization, cost management, and enterprise patterns
About Code Examples
Quick Setup: Examples assume you have structlog installed:
pip install structlog python-json-logger
Variables: Examples use placeholder variables (user_id, error, etc.) - replace with your actual data.
Full Examples: Find complete, runnable code at the end of this guide.
Prerequisites
Required Knowledge:
- Structured logging basics (see Getting Started guide)
- Python async/threading fundamentals
- Production Python deployment experience
- Basic understanding of distributed systems
What We'll Cover:
- Performance optimization at 10K+ requests/second
- Cost optimization for major platforms (AWS, Datadog, Elastic)
- Production-grade processor patterns
- Framework integration (FastAPI, Flask, Django, Celery)
Helpful Context:
- P99 = 99th percentile (99% of requests are faster than this)
- MTTR = Mean Time To Recovery
- Context propagation = maintaining request IDs across services
Last year I spent 6 months helping a SaaS startup upgrade their backend system, including their logging. Their Python services were drowning in print() statements and they needed structured logging yesterday.
We tested everything in staging. Looked perfect. Then production happened -- 10K requests/second, 50 microservices (typical for their scale), and our deployment encountered significant production issues.
What 10K req/s means operationally:
- Concurrent users: ~50,000-100,000 active users (assuming 100-200ms per request)
- Daily volume: 864 million requests/day
- Monthly costs: $5,000-50,000 just for logging infrastructure
- Engineering scale: Typically requires 10-50 backend engineers
- Comparison: Small startup: 100 req/s | Mid-size SaaS: 1-10K req/s | Netflix peak: 6M req/s
What actually broke for us:
- JSON serialization consumed 40% of API compute cycles (see detailed analysis)
- CloudWatch costs jumped from $12,000 to $34,000 monthly (representing 8% of total infrastructure spend)
- Our P99 latency increased from 50ms to 200ms
P99 = 99th percentile response time
- 99% of requests complete faster than this time
- 1% of requests are slower (your worst-case users)
- Why it matters: P99 captures the experience of users with slow connections, complex queries, or during peak load
- Example: P99 = 200ms means 1 in 100 users wait longer than 200ms
- Industry benchmarks: Great: less than 100ms | Good: 100-300ms | Poor: more than 500ms
The most significant issue was that our middleware was logging entire request bodies. One customer uploaded a 50MB PDF. By the time it bounced through retries and services, we'd generated 800MB of logs from a single file.
Microservices: Small, independent services that work together
- Each handles one business function (users, payments, inventory)
- Communicate via APIs (REST, gRPC, message queues)
- Can be deployed independently
50 microservices means:
- 50 different codebases to maintain
- 50 different log streams to correlate
- Debugging requires tracing requests across all services
- That's why correlation IDs are critical!
Alternative: Monolith = one big application (simpler but less scalable)
This guide walks through the fixes that resolved our critical issues, plus some techniques from Uber Engineering who compressed 5.38 PB down to 31.4 TB (169:1 ratio). For context, Uber's pipeline ingests billions of log events daily--their scale dwarfs most companies, but their compression techniques apply even at our 10K req/s scale.
Quick Navigation
Jump to your specific challenge:
- Performance Issues: JSON Serialization | Processor Pipeline | Performance Benchmarks
- Cost Explosion: Platform Cost Comparison | Sampling Strategies | CloudWatch Surprise
- Framework Integration: FastAPI | Flask | Django | Celery
- Common Problems: Over-Logging | Context Pollution | Performance Pitfalls
The Observability Foundation
Before examining our production experiences, it's important to establish why structured logging matters beyond just better grep searches. This is fundamentally about observability--the ability to understand your system's internal state from its external outputs.
What is Observability?
Observability rests on three pillars:
- Logs: What happened (events)
- Metrics: How much/how fast (numbers)
- Traces: Where it happened (request flow)
Structured logging forms the foundation because it bridges all three. Your logs become queryable metrics. Your trace IDs connect distributed events. Your debugging becomes systematic and efficient.
Why Structured Logging Enables Observability
Traditional logging:
2024-01-15 10:30:45 ERROR User 12345 login failed from 192.168.1.100
Structured logging:
{
"timestamp": "2024-01-15T10:30:45.123Z",
"level": "error",
"event": "login_failed",
"user_id": 12345,
"ip": "192.168.1.100",
"trace_id": "abc-123",
"duration_ms": 45
}
The structured version enables:
- Metrics: Count login failures per minute, average duration
- Traces: Follow trace_id across all services
- Debugging: Query specific users, IPs, or time ranges
- Alerting: Trigger on patterns, not string matches
OpenTelemetry: The Industry Standard
OpenTelemetry (OTel) has emerged as the industry standard for observability:
- Combines logs, metrics, and traces in one SDK
- Auto-instruments popular frameworks
- Vendor-neutral (works with Datadog, New Relic, AWS X-Ray)
- Adds ~2-5% overhead when properly configured
Throughout this guide, we'll show how to integrate OpenTelemetry with your structured logging to build observable systems.
Key insight: Quality beats quantity. Strategic logging with proper structure provides more observability than verbose logging.
Production Bottlenecks We Hit
1. JSON Serialization Was Killing Us
We profiled and found json.dumps() taking 52ms per 1000 log entries. We were serializing full SQLAlchemy objects with lazy-loaded relationships. Each user object dragged along their entire order history.
The performance math revealed our serialization disaster at 10K req/s:
- Each request generates ~5 log entries (start, auth, business logic, database, response)
- 10,000 req/s × 5 logs = 50,000 logs/second
- 50,000 logs/s ÷ 1,000 = 50 batches × 52ms = 2,600ms of CPU time per second
- Result: 2.6 CPU cores dedicated just to JSON serialization
- At AWS: ~$200/month per core = $520/month just for logging serialization (about one junior engineer's daily rate)
With orjson, we reduced serialization from 36-61 µs to 13-21 µs per event. The impact at 50,000 logs/second:
- Old: 61 µs × 50,000 = 3,050ms = 3 full CPU cores
- New: 21 µs × 50,000 = 1,050ms = 1 CPU core
- Savings: 2 CPU cores = ~$400/month = $4,800/year
This provided an important rule of thumb: every 10 µs saved at 50K ops/s = 0.5 CPU cores saved.
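If you want to verify the gap on your own event shapes before committing, a quick micro-benchmark is enough. The snippet below is an illustrative sketch (the sample event and iteration count are arbitrary); absolute numbers will vary with payload size, Python version, and hardware.
import json
import timeit

import orjson

event = {
    "timestamp": "2024-01-15T10:30:45.123Z",
    "level": "info",
    "event": "login_succeeded",
    "user_id": 12345,
    "request_id": "req-abc123",
    "duration_ms": 42.3,
}

n = 100_000
std = timeit.timeit(lambda: json.dumps(event), number=n)
fast = timeit.timeit(lambda: orjson.dumps(event), number=n)
print(f"json.dumps:   {std / n * 1e6:.2f} µs/event")
print(f"orjson.dumps: {fast / n * 1e6:.2f} µs/event")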
2. Costs We Didn't See Coming
JSON logs are 1.5x to 2x larger than plain text, but our middleware was logging full request/response bodies. One 50MB PDF upload generated 800MB of logs across retries and services.
We jumped from 8 TB to 34 TB monthly, hitting CloudWatch's $0.50/GB tier. The growth wasn't linear--one bad deployment quadrupled our volume overnight when debug logging got enabled in production.
3. Memory Leak from Context Variables
Our middleware was binding large objects to the logger context--a pattern that worked fine in our monolith days but turned catastrophic at scale:
# The problematic code
log = log.bind(user=current_user) # User object with all relationships
log = log.bind(request_body=request.json) # Full request payload
Under load, containers consumed 8GB RAM instead of 2GB, hitting our container limits.
Container: A lightweight, isolated package containing your app and dependencies
Pod: Kubernetes' smallest deployable unit (usually 1 container, sometimes more)
Pod restarts happen when:
- Memory limit exceeded (OOMKilled)
- Health checks fail
- Unhandled exceptions
Why the 2GB → 8GB jump matters:
- AWS EKS: ~$58/month per 2GB → 4x memory = 4x cost
- More restarts = more downtime = angry customers
How We Fixed It
- Serialization: Switched to orjson (11x faster), DTOs instead of ORM objects. CPU: 40% → 3.5%
DTO (Data Transfer Object): A simple object with just the data you need
ORM Object: Database model with relationships and lazy loading
Example:
# ORM Object (BAD for logging)
user = User.query.get(123) # Has orders, addresses, payments...
log.info("login", user=user) # Serializes EVERYTHING
# DTO (GOOD for logging)
user_dto = {"id": 123, "email": "user@example.com"}
log.info("login", user=user_dto) # Only what you need
Why ORM objects kill performance: SQLAlchemy lazy loads relationships, so logging a user might trigger 10+ database queries!
- Sampling: 100% errors/slow requests, 10% success, 1% health checks. Volume: -85%
- Memory: Log IDs not objects, context cleanup in middleware. RAM: -75%
This cut our debugging time by 60-70% and brought our MTTR improvements in line with Compass's 9x reduction.
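In practice the memory fix came down to two habits: bind only identifiers, and clear the context at request boundaries. A minimal middleware sketch using structlog's contextvars helpers (FastAPI/Starlette-style; the handler name and the request.state.user_id attribute are illustrative):
import structlog

async def context_middleware(request, call_next):
    """Bind only IDs to the logging context and clear it when the request ends."""
    structlog.contextvars.clear_contextvars()  # start every request from a clean slate
    structlog.contextvars.bind_contextvars(
        request_id=request.headers.get("X-Request-ID", "unknown"),
        user_id=getattr(request.state, "user_id", None),  # just the ID, never the ORM object
    )
    try:
        return await call_next(request)
    finally:
        structlog.contextvars.clear_contextvars()  # nothing leaks into the next request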
The Full Infrastructure Cost
Costs cascaded across our infrastructure:
- CloudWatch ingestion: $17,000 increase
- S3 backup: $782/month
- Network transfer: $3,060/month
- Elasticsearch: 3 → 9 nodes (tripled our infrastructure)
Elasticsearch node costs: Each node runs $500-2000/month with 8-16 CPU cores, 32-64GB RAM, and 1-4TB SSD storage. We needed 3x more nodes because 34TB of logs requires ~50TB storage with replication. What nobody mentions: Elasticsearch performance degrades non-linearly--our queries went from 200ms to 2s as we approached capacity.
We hit CloudWatch's worst pricing zone: 34 TB at $0.50/GB (need 50+ TB to reach the $0.25/GB tier).
The tiers that burned us (after three calls with AWS to understand their "simplified" pricing):
- 0-10 TB: $0.50/GB
- 10-30 TB: $0.50/GB
- 30-50 TB: $0.50/GB ← We were here (34 TB)
- 50+ TB: $0.25/GB
- 100+ TB: $0.10/GB
- 500+ TB: $0.05/GB
The financial impact: At 34 TB, we pay $17,000/month. At 50 TB we'd pay $12,500/month. More logs = cheaper per GB! This pricing structure assumes you're using CloudWatch Logs directly--if you're routing through Kinesis Firehose for transformation, add another $0.029/GB.
Monthly cascade from bad logging:
- CloudWatch: $17,000 (ingestion)
- S3 backup: $782 ($0.023/GB × 34 TB)
- Network transfer: $3,060 ($0.09/GB × 34 TB)
- Elasticsearch: +$6,000 (6 extra nodes × $1,000)
- Total: $26,842/month = $322,104/year
- Company size context: This is 2-3 engineer salaries!
- After optimization: $2,070/month (claimed 90% reduction, actual was closer to 75% due to compliance requirements preventing aggressive sampling on payment logs)
For platform cost comparisons at our scale (10K req/s, 34TB/month), see the detailed platform comparison below.
The 169:1 Compression Reality: Uber's approach works because they extract common patterns across billions of entries. Our attempt at similar compression achieved only 23:1--turns out you need Uber's scale for Uber's results:
Template: "User {id} logged in from {ip} at {time}"
Variables: [12345, "192.168.1.1", "2024-01-15T10:30:45"]
This template is stored once and referenced millions of times. HubSpot achieved 23.6x reduction with ORC format, though they noted setup complexity and query performance trade-offs that their blog post glossed over.
Real compression ratios and savings:
- Standard JSON: 1TB logs = $500/month CloudWatch
- gzip (3:1): 333GB = $167/month (67% savings)
- Uber CLP (169:1): 6GB = $3/month (99.4% savings)
- ORC format (23.6:1): 42GB = $21/month (95.8% savings)
- One engineer week to implement = pays for itself in days
Optimizing Processor Pipelines for Scale
Why Processors Matter for Observability
Processors are the transformation pipeline that turns raw log events into queryable, secure, and efficient data streams. They're where structured logging becomes observable systems--adding context, ensuring security, and optimizing performance.
What is a Processor? A processor is a function that transforms log events before they're output. Think of it as middleware for your logs--each processor in the chain can modify, enrich, filter, or redirect the event.
Performance Impact: The Numbers That Matter
Our production profiling revealed the real cost of each processor at scale:
This innocent processor--structlog's CallsiteParameterAdder--nearly killed our API:
- Adds file name, line number, function name to every log
- Uses Python introspection (inspect module) = SLOW
- At 50K logs/sec: consumed 20% of total CPU
- Decision: Disabled in production, enabled in dev/staging only
- Result: Freed up 2 entire CPU cores
Our initial 12 processors created unacceptable latency:
10,000 req/s × 5 logs per request × 40μs = 2 CPU seconds per second
Breaking it down:
- 10K req/s × 5 logs = 50,000 log operations/second
- 50,000 × 40μs = 2,000,000μs = 2 seconds of CPU time
- With 8 cores: 2 ÷ 8 = 25% total CPU capacity
Why this matters: At roughly $200/month per vCPU on AWS (the figure from the serialization math above), 25% of 8 cores is 2 cores, or about $370-400/month just for logging overhead
With 8 cores, 25% of compute went to logging.
How Our Pipeline Actually Works
Python's contextvars module provides a way to maintain state across async operations--like thread-local storage, but for async/await.
FastAPI's background tasks run in a separate context. Our request_id kept disappearing from logs in background tasks until we figured out we needed to explicitly pass context:
# Bug: request_id missing in background task logs
@app.post("/process")
async def process(background_tasks: BackgroundTasks):
background_tasks.add_task(send_email) # Lost context!
# Fix: Explicitly capture and pass context
@app.post("/process")
async def process(background_tasks: BackgroundTasks):
ctx = contextvars.copy_context()
background_tasks.add_task(ctx.run, send_email)
Without this fix, your request_id and user_id disappear in background tasks, significantly complicating debugging.
Each log event traverses our optimized pipeline:
log.info("user_action", action="login", user_id=123)
# Pipeline stages (total: 0.2ms average, 0.5ms P99):
# 1. Context Injection (0.01ms) - Merge thread-local context
# 2. Metadata Enrichment (0.02ms) - Add timestamp, level, logger
# 3. Debug Context (0.01ms) - Add filename, function, line number
# 4. Exception Processing (0.05ms) - Format stack traces
# 5. Security Processing (0.03ms) - PII redaction
# 6. Serialization (0.08ms) - JSON output
Production Processor Patterns
Processors fall into three main categories:
Processor = A function that modifies log events before output
- Called for every log message in order
- Can add fields, remove sensitive data, or drop events entirely
- Chain multiple processors for complex transformations
Example flow:
log.info("login", user_id=123)
↓ Processor 1: Add timestamp
↓ Processor 2: Add request_id from context
↓ Processor 3: Redact sensitive data
↓ Processor 4: Convert to JSON
{"event": "login", "user_id": 123, "timestamp": "...", "request_id": "..."}
class SamplingProcessor:
"""Reduce volume while keeping critical events."""
def __init__(self, sample_rate=0.1):
self.sample_rate = sample_rate
self.always_log = {"error", "payment_failed", "security_alert"}
def __call__(self, logger, method_name, event_dict):
# Always log critical events
if event_dict.get("event") in self.always_log:
return event_dict
# Always log errors
if event_dict.get("level") in ("error", "critical"):
return event_dict
# Sample routine events
if hash(event_dict.get("request_id", "")) % 100 < (self.sample_rate * 100):
return event_dict
raise structlog.DropEvent
class PerformanceMonitor:
"""Track logging overhead."""
def __call__(self, logger, method_name, event_dict):
start = time.perf_counter()
event_dict["log_overhead_ms"] = round((time.perf_counter() - start) * 1000, 3)
return event_dict
# Production configuration
structlog.configure(
processors=[
SamplingProcessor(sample_rate=0.1), # 90% volume reduction
PerformanceMonitor(), # Track overhead
structlog.processors.JSONRenderer()
],
cache_logger_on_first_use=True,
)
The hash trick: hash(request_id) % 100 < 10 keeps roughly 10% of requests
- Deterministic: Same request_id always gets same result
- Consistent: All logs for one request are kept or dropped together
- Fair: Evenly distributed across all requests
Random sampling would drop some logs from a request but not others, severely compromising debugging capabilities.
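One caveat we hit later: Python randomizes hash() for strings per process (PYTHONHASHSEED), so "same request_id, same decision" only holds inside a single process. If the decision has to agree across services or restarts, hash with hashlib instead. A small sketch (the keep_sample helper is ours, not part of structlog):
import hashlib

def keep_sample(request_id: str, sample_rate: float = 0.1) -> bool:
    """Stable, process-independent sampling decision for a request ID."""
    digest = hashlib.md5(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:2], "big") % 100  # 0-99, identical in every process
    return bucket < sample_rate * 100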
import re
import hashlib
from typing import Any, MutableMapping
class ComprehensiveRedactor:
"""Production-grade PII redaction with configurable rules."""
def __init__(self):
# Patterns for sensitive data detection
self.patterns = {
"credit_card": re.compile(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b'),
"ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
"email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
"api_key": re.compile(r'\b(sk|pk)_[a-zA-Z0-9]{32,}\b'),
"jwt": re.compile(r'\beyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+\b')
}
# Fields to always redact
self.sensitive_fields = {
"password", "token", "secret", "api_key", "private_key",
"credit_card", "card_number", "cvv", "ssn", "social_security"
}
def _hash_value(self, value: str) -> str:
"""Create consistent hash for tracking without exposing data."""
return hashlib.sha256(value.encode()).hexdigest()[:8]
def __call__(self, logger: Any, method_name: str, event_dict: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
for key, value in list(event_dict.items()):
# Direct field matching
if key.lower() in self.sensitive_fields:
event_dict[key] = f"[REDACTED-{self._hash_value(str(value))}]"
continue
# Pattern matching in string values
if isinstance(value, str):
for pattern_name, pattern in self.patterns.items():
if pattern.search(value):
event_dict[key] = f"[{pattern_name.upper()}_REDACTED]"
break
return event_dict
The pattern: [REDACTED-a1b2c3d4]
- Hash lets you track the same value across logs without exposing it
- Example: Same credit card appears in 5 logs → same hash
- Enables debugging ("user used same card 3 times") without PII exposure
- 8 chars of SHA256 = low collision risk for debugging purposes
class GDPRCompliance:
"""Ensure GDPR compliance for European users."""
def __init__(self, user_location_getter):
self.get_user_location = user_location_getter
        self.eu_countries = {"DE", "FR", "IT", "ES", "NL", "BE", "PL", "SE", "AT", "DK"}  # illustrative subset -- use the full EU/EEA list in production
def __call__(self, logger: Any, method_name: str, event_dict: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
user_id = event_dict.get("user_id")
if user_id and self.get_user_location(user_id) in self.eu_countries:
# Enhanced privacy for EU users
event_dict["gdpr_compliant"] = True
# Remove IP addresses for EU users
if "ip_address" in event_dict:
event_dict["ip_address"] = "[EU_USER_IP_REDACTED]"
return event_dict
# Production configuration with security focus
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
ComprehensiveRedactor(), # PII protection
GDPRCompliance(lambda uid: get_user_country(uid)), # GDPR compliance
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
- Automatic PII detection and redaction
- GDPR compliance for EU users
- Consistent hashing for audit trails without data exposure
- Pattern-based detection for evolving threats
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from collections import deque
class AsyncBatchProcessor:
"""Batch logs for efficient I/O operations."""
def __init__(self, batch_size: int = 100, flush_interval: float = 1.0):
self.batch_size = batch_size
self.flush_interval = flush_interval
self.batch = []
self.lock = threading.Lock()
self.executor = ThreadPoolExecutor(max_workers=2)
self._start_flush_timer()
def __call__(self, logger: Any, method_name: str, event_dict: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
# Clone event_dict to avoid mutations
log_entry = dict(event_dict)
with self.lock:
self.batch.append(log_entry)
if len(self.batch) >= self.batch_size:
self._flush_batch()
return event_dict
def _flush_batch(self):
"""Send batch to log aggregator."""
if not self.batch:
return
# Submit to thread pool to avoid blocking
batch_to_send = self.batch[:] # Copy before clearing
self.batch.clear()
self.executor.submit(self._send_batch, batch_to_send)
The pattern: batch_to_send = self.batch[:]
- [:] creates a shallow copy of the list
- Without copy: Background thread might see an empty list after clear()
- Race condition: Main thread clears while the background thread iterates
- This ensures the background thread has stable data to work with
def _send_batch(self, batch):
"""Actually send the batch (in background thread)."""
try:
# Your log aggregator client here
# log_client.send_batch(batch)
pass
except Exception as e:
# Handle failures gracefully
print(f"Failed to send batch: {e}")
def _start_flush_timer(self):
"""Periodic flush to prevent log delay."""
def flush_periodically():
while True:
threading.Event().wait(self.flush_interval)
with self.lock:
self._flush_batch()
timer = threading.Thread(target=flush_periodically)
timer.daemon = True # Dies when main program exits
timer.start()
daemon = True means the thread dies when the main program exits
- Prevents hanging on shutdown
- No cleanup needed - Python kills daemon threads automatically
- Perfect for background tasks that should never block exit
- Alternative: Use atexit hooks for graceful cleanup
class MetricsAggregator:
"""Aggregate metrics before sending to reduce load."""
def __init__(self, metrics_client, aggregation_window: int = 60):
self.metrics = metrics_client
self.window = aggregation_window
self.aggregates = {}
self.lock = threading.Lock()
self._start_aggregation_timer()
def __call__(self, logger: Any, method_name: str, event_dict: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
if event_dict.get("event") == "api_request_completed":
key = (event_dict.get("endpoint"), event_dict.get("status_code"))
duration = event_dict.get("duration_ms", 0)
with self.lock:
if key not in self.aggregates:
self.aggregates[key] = {"count": 0, "total_duration": 0}
self.aggregates[key]["count"] += 1
self.aggregates[key]["total_duration"] += duration
return event_dict
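    # The __init__ above calls _start_aggregation_timer, which isn't shown here. A minimal
    # sketch could look like this (the metrics-client call is a placeholder -- adapt it to
    # whatever backend self.metrics wraps):
    def _start_aggregation_timer(self):
        """Flush aggregated metrics every `window` seconds from a daemon thread."""
        def flush_periodically():
            while True:
                threading.Event().wait(self.window)
                with self.lock:
                    snapshot, self.aggregates = self.aggregates, {}
                for (endpoint, status_code), agg in snapshot.items():
                    avg_ms = agg["total_duration"] / max(agg["count"], 1)
                    # e.g. self.metrics.gauge("api.avg_duration_ms", avg_ms,
                    #                         tags={"endpoint": endpoint, "status": status_code})
        threading.Thread(target=flush_periodically, daemon=True).start()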
# Production configuration with async processing
# This configuration assumes you're running behind a load balancer with 30-second timeout settings
# Batch size of 100 balances memory usage vs I/O efficiency for typical 512MB container limits
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
AsyncBatchProcessor(batch_size=100), # Batch for efficiency
MetricsAggregator(metrics_client), # Aggregate metrics
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
- Non-blocking log operations (< 0.1ms overhead)
- Automatic batching reduces I/O by 100x
- Metric aggregation reduces load on monitoring systems
- Graceful degradation under extreme load
Processor Pipeline Design
Three critical principles emerged from production:
- Filter Early: Drop unwanted logs before expensive processing
- Security Before Serialization: Redact PII before JSON rendering
- Async for I/O: Never block the main pipeline on external operations
What actually happens at scale with these principles:
- Filter Early: Saved us 85% volume = $14,000/month (but remember the memory leak from section 3--we lost critical diagnostics for edge cases)
- Security First: One PII leak = potential GDPR fine up to €20M
- Async I/O: Blocking for 5ms at 10K req/s = 50 seconds of wait time per second (system dies)
These aren't nice-to-haves, they're survival requirements.
AWS CloudWatch specific gotcha: The roughly 1 MB limit per PutLogEvents call meant we had to implement client-side batching with automatic splitting. Our first implementation naively split by count, but CloudWatch's actual limit is bytes--we crashed production twice before figuring this out.
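For reference, a byte-aware splitter is only a few lines. This is a sketch under the documented CloudWatch Logs quotas (1,048,576 bytes per PutLogEvents batch, with each event counted as its UTF-8 message length plus 26 bytes of overhead); the split_batches helper is illustrative, not our production code:
MAX_BATCH_BYTES = 1_048_576  # PutLogEvents batch limit in bytes
EVENT_OVERHEAD = 26          # per-event accounting overhead

def split_batches(events):
    """Yield lists of CloudWatch log events that stay under the byte limit."""
    batch, batch_bytes = [], 0
    for event in events:  # event = {"timestamp": epoch_ms, "message": json_string}
        size = len(event["message"].encode("utf-8")) + EVENT_OVERHEAD
        if batch and batch_bytes + size > MAX_BATCH_BYTES:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(event)
        batch_bytes += size
    if batch:
        yield batch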
Processor Categories and Their Purposes
Processor Categories:
- Context Management: Processors like merge_contextvars ensure request isolation in concurrent environments
- Metadata Enrichment: Add timestamps, log levels, and source information for debugging and aggregation
- Security Processing: PII redaction and compliance transformations must occur before serialization
- Performance Optimization: Sampling, batching, and async processors reduce system load
- Output Formatting: Final renderers (JSON/Console) determine the log's ultimate format
Our PII redaction processor showed significant variance. Flamegraph analysis revealed we were compiling regex patterns on every call:
# Before: 8.3μs per call
def __call__(self, logger, method_name, event_dict):
pattern = re.compile(r'\b\d{4}-\d{4}-\d{4}-\d{4}\b') # Compiled every time!
# After: 2.1μs per call
def __init__(self):
self.pattern = re.compile(r'\b\d{4}-\d{4}-\d{4}-\d{4}\b') # Compiled once
This 4x performance improvement came from moving regex compilation to initialization instead of runtime.
The critical optimization: replace standard json with orjson, which benchmarks show is 4x to 12x faster for serialization and 1.4x to 2x faster for deserialization:
Table: Python JSON Serializer Performance Benchmark (Serialization)
Data from orjson benchmarks measured using Python 3.11.10 on an x86-64-v4 machine.
import orjson
class OrjsonRenderer:
"""High-performance JSON rendering."""
def __call__(self, logger, method_name, event_dict):
return orjson.dumps(event_dict).decode('utf-8')
orjson returns bytes, not strings
- orjson.dumps() → b'{"key": "value"}' (bytes)
- structlog expects string output
- .decode('utf-8') → '{"key": "value"}' (string)
- Skip decode if writing directly to binary streams
Context Propagation in Distributed Systems
Maintaining context across service boundaries proves essential for debugging distributed systems. This directly addresses the memory leak issue mentioned in section 3--contextvars automatically clean up when tasks complete, unlike our original thread-local approach that kept growing until OOM.
The following pattern ensures request tracing remains consistent:
from contextvars import ContextVar
import structlog
# Define context variables
request_id_var: ContextVar[str] = ContextVar('request_id', default=None)
user_id_var: ContextVar[str] = ContextVar('user_id', default=None)
def inject_context_vars(logger, method_name, event_dict):
"""Inject context variables into every log"""
request_id = request_id_var.get()
user_id = user_id_var.get()
if request_id:
event_dict['request_id'] = request_id
if user_id:
event_dict['user_id'] = user_id
return event_dict
# Add to your processor pipeline
processors.append(inject_context_vars)
Environment-Specific Configuration
Development (console output):
2024-01-15 10:30:45 INFO auth.service login_attempt
    user_id=12345 source_ip=192.168.1.100 attempt=1
Human-readable with color coding and indented key-value pairs.
Production (JSON output):
{"timestamp": "2024-01-15T10:30:45.123Z", "level": "info", "logger": "auth.service", "event": "login_attempt", "user_id": 12345, "request_id": "req-abc123"}
Machine-parseable JSON for direct ingestion into log platforms:
- Elasticsearch: Search engine for logs (stores, indexes, queries)
- Splunk: Enterprise log management platform ($$$)
- CloudWatch: AWS native logging (pay per GB)
- Datadog: Full observability platform (logs + metrics + traces)
- ELK Stack: Elasticsearch + Logstash + Kibana (open source)
GCP-specific note: Cloud Logging automatically parses JSON logs and indexes fields. We wasted weeks building custom parsers before discovering this. Just log JSON to stdout/stderr and GCP handles the rest.
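Wiring this up is usually a single switch on the environment. A minimal sketch, assuming an ENV environment variable chooses the renderer (the variable name and processor list are illustrative):
import os
import structlog

shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="iso", utc=True),
]

if os.environ.get("ENV", "development") == "production":
    renderer = structlog.processors.JSONRenderer()          # machine-parseable for log platforms
else:
    renderer = structlog.dev.ConsoleRenderer(colors=True)   # human-readable for local work

structlog.configure(
    processors=shared_processors + [renderer],
    cache_logger_on_first_use=True,
)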
Production Framework Integration Patterns
Every framework has its quirks when processing millions of requests. Here's what actually worked in production.
What Every Framework Needs
No matter which framework you use, you'll need request correlation, non-blocking operations, context preservation, and graceful degradation.
FastAPI: High-Performance Async Integration
The Challenge: Async context propagation breaks traditional logging. Request IDs vanish between await calls. Background tasks lose all context. This is where the contextvars pattern from our distributed systems section becomes critical.
Why It Matters: FastAPI has become one of the most popular frameworks for new Python APIs. Its async-first design delivers high performance but requires careful handling of request context.
The Core Pattern:
import uuid
from contextvars import ContextVar
from fastapi import Request
import structlog
# Context variables survive async boundaries
request_id_var: ContextVar[str] = ContextVar('request_id')
async def logging_middleware(request: Request, call_next):
# Set context that persists across await calls
request_id = str(uuid.uuid4())
request_id_var.set(request_id)
# Log with automatic context injection
log = structlog.get_logger()
await log.ainfo("request_started",
path=request.url.path,
method=request.method)
response = await call_next(request)
# Context still available after await
await log.ainfo("request_completed",
status_code=response.status_code)
return response
Performance Impact: This pattern handles 1,250 req/s with less than 2ms logging overhead. Note: these measurements assume you're running behind a reverse proxy (nginx/ALB) that handles SSL termination and request buffering. Direct exposure adds 15-20% overhead.
Datadog specific optimization: We discovered Datadog's APM integration automatically extracts trace_id from logs when properly formatted. Adding dd.trace_id to our context saved us from implementing custom correlation--a 2-week sprint avoided by reading their docs carefully.
Complete production FastAPI middleware implementation
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from contextvars import ContextVar
import structlog
import time
import uuid
# Context variables for request-scoped data
request_id_var: ContextVar[str] = ContextVar('request_id', default=None)
class StructuredLoggingMiddleware(BaseHTTPMiddleware):
    """Production-grade logging middleware for FastAPI."""
    def __init__(self, app: FastAPI):
        super().__init__(app)
        self.logger = structlog.get_logger()
    async def dispatch(self, request: Request, call_next) -> Response:
# Generate request ID
request_id = str(uuid.uuid4())
request_id_var.set(request_id)
# Start timing
start_time = time.perf_counter()
# Log request with context
await self.logger.ainfo(
"request_started",
request_id=request_id,
method=request.method,
path=request.url.path,
query_params=dict(request.query_params),
headers={k: v for k, v in request.headers.items()
if k.lower() not in {'authorization', 'cookie'}}
)
try:
response = await call_next(request)
duration_ms = (time.perf_counter() - start_time) * 1000
await self.logger.ainfo(
"request_completed",
request_id=request_id,
status_code=response.status_code,
duration_ms=round(duration_ms, 2)
)
response.headers["X-Request-ID"] = request_id
return response
except Exception as exc:
duration_ms = (time.perf_counter() - start_time) * 1000
await self.logger.aerror(
"request_failed",
request_id=request_id,
duration_ms=round(duration_ms, 2),
exc_info=exc
)
raise
# Application setup
app = FastAPI(title="Production API")
app.add_middleware(StructuredLoggingMiddleware)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# Request ID automatically included via context
log = structlog.get_logger()
await log.ainfo("fetching_user", user_id=user_id)
# Business logic here
This middleware handles request correlation, timing, and proper error logging for async operations.
- Async-safe context propagation using ContextVar
- Automatic request correlation across all logs
- Performance metrics (duration_ms) for every request
- Selective header logging (excluding sensitive data)
Complete production FastAPI application with all patterns
This example demonstrates a full production setup with OpenTelemetry integration, PII redaction, request sampling, and all the patterns discussed in this guide:
# app/core/logging.py
import structlog
import orjson
import logging
from typing import Dict, Any
from contextvars import ContextVar
from opentelemetry import trace
# Context variable for request-scoped data
request_context: ContextVar[Dict[str, Any]] = ContextVar('request_context', default={})
class ProductionProcessors:
"""Production-ready processors for structured logging."""
@staticmethod
def add_request_context(logger, method_name, event_dict):
"""Add request-scoped context to all logs."""
ctx = request_context.get()
event_dict.update(ctx)
return event_dict
@staticmethod
def add_trace_context(logger, method_name, event_dict):
"""Add OpenTelemetry trace context."""
span = trace.get_current_span()
if span and span.is_recording():
ctx = span.get_span_context()
event_dict['trace_id'] = format(ctx.trace_id, '032x')
event_dict['span_id'] = format(ctx.span_id, '016x')
return event_dict
Trace: Complete journey of a request through all services
Span: Single operation within a trace (e.g., database query, API call)
- trace_id: Unique ID for entire request journey
- span_id: Unique ID for this specific operation
- Parent-child: Spans can have child spans (e.g., API call → DB query)
- Format: trace_id is 32 hex chars, span_id is 16 hex chars
@staticmethod
def redact_sensitive_fields(logger, method_name, event_dict):
"""Redact PII and sensitive data."""
sensitive_keys = {'password', 'token', 'credit_card', 'ssn', 'api_key'}
def redact_dict(d):
for key, value in d.items():
if any(sensitive in key.lower() for sensitive in sensitive_keys):
d[key] = "[REDACTED]"
elif isinstance(value, dict):
redact_dict(value)
return d
return redact_dict(event_dict)
# app/main.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import time
import uuid
import structlog
import orjson
from .core.logging import request_context, ProductionProcessors
app = FastAPI()
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso", utc=True),
ProductionProcessors.add_request_context,
ProductionProcessors.add_trace_context,
ProductionProcessors.redact_sensitive_fields,
structlog.processors.CallsiteParameterAdder(
parameters=[structlog.processors.CallsiteParameter.FILENAME,
structlog.processors.CallsiteParameter.LINENO]
),
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(serializer=orjson.dumps)
],
context_class=dict,
    logger_factory=structlog.BytesLoggerFactory(),  # JSONRenderer(serializer=orjson.dumps) emits bytes
cache_logger_on_first_use=True,
)
log = structlog.get_logger()
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
"""Comprehensive logging middleware with all best practices."""
start_time = time.time()
request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))
# Set request context for all logs in this request
request_context.set({
'request_id': request_id,
'method': request.method,
'path': request.url.path,
'client_ip': request.client.host,
'user_agent': request.headers.get('User-Agent', '')
})
# Log request start
log.info("request_started",
query_params=dict(request.query_params),
headers={k: v for k, v in request.headers.items()
if k.lower() not in ['authorization', 'cookie']})
try:
response = await call_next(request)
# Calculate request duration
duration_ms = (time.time() - start_time) * 1000
# Log based on response status and duration
if response.status_code >= 500:
log.error("request_failed",
status_code=response.status_code,
duration_ms=duration_ms)
elif response.status_code >= 400:
log.warning("request_client_error",
status_code=response.status_code,
duration_ms=duration_ms)
elif duration_ms > 1000: # Slow request
log.warning("request_slow",
status_code=response.status_code,
duration_ms=duration_ms,
threshold_ms=1000)
else:
# Sample successful fast requests
if hash(request_id) % 10 == 0: # 10% sampling
log.info("request_completed",
status_code=response.status_code,
duration_ms=duration_ms)
# Add trace ID to response headers
response.headers['X-Request-ID'] = request_id
return response
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
log.exception("request_exception",
duration_ms=duration_ms,
error_type=type(e).__name__)
return JSONResponse(
status_code=500,
content={"error": "Internal server error", "request_id": request_id}
)
finally:
# Clear request context
request_context.set({})
# Example endpoint with structured logging
@app.post("/api/orders")
async def create_order(order_data: dict):
"""Example endpoint showing structured logging patterns."""
log.info("order_received",
customer_id=order_data.get('customer_id'),
total_amount=order_data.get('total_amount'),
item_count=len(order_data.get('items', [])))
try:
# Simulate order processing with structured logging
order_id = str(uuid.uuid4())
# Log each step of processing
log.info("inventory_check_started", order_id=order_id)
# ... inventory check logic ...
log.info("payment_processing_started",
order_id=order_id,
payment_method=order_data.get('payment_method'))
# ... payment logic ...
log.info("order_created_successfully",
order_id=order_id,
processing_time_ms=42.3)
return {"order_id": order_id, "status": "created"}
except Exception as e:
log.exception("order_creation_failed",
error=str(e),
customer_id=order_data.get('customer_id'))
raise HTTPException(status_code=500, detail="Order creation failed")
This production setup includes:
- OpenTelemetry trace correlation
- Automatic PII redaction
- Smart request sampling (10% success, 100% errors)
- Status-based log levels
- Slow request detection
- Comprehensive error handling
- Request context propagation
- Trace Correlation: Linking all logs from one request across services
- PII (Personally Identifiable Information): Data that identifies individuals (names, emails, SSNs)
- Request Sampling: Only logging a percentage of requests to reduce volume
- Slow Request Detection: Flagging requests that exceed performance thresholds
- Context Propagation: Passing request metadata through all function calls
Flask: Thread-Safe Production Integration
The Challenge: Flask uses thread-local storage, not async context. Multiple requests in different threads need isolated logging contexts without interference.
Why It Matters: Flask remains the most popular Python web framework. Its simplicity attracts developers, but production logging requires careful thread management.
The Core Pattern:
from flask import Flask, g, request
from werkzeug.local import LocalProxy
import structlog
import time
import uuid

app = Flask(__name__)
# Thread-safe logger access via LocalProxy
def get_request_logger():
if not hasattr(g, 'logger'):
g.logger = structlog.get_logger().bind(
request_id=g.request_id,
method=request.method
)
return g.logger
log = LocalProxy(get_request_logger)
@app.before_request
def before_request():
g.request_id = str(uuid.uuid4())
g.start_time = time.time()
# Logger automatically uses thread-local context
log.info("request_started")
Performance Impact: Handles up to 1,000 req/s with minimal overhead.
- Thread-local storage via Flask's g object
- LocalProxy ensures thread-safe logger access
- Request context automatically cleaned up after response
- Works with both development server and production WSGI servers
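To round out the pattern, the matching after_request hook logs the response and propagates the request ID. This sketch assumes the before_request handler above has already set g.request_id and g.start_time:
@app.after_request
def after_request(response):
    duration_ms = (time.time() - g.start_time) * 1000
    log.info("request_completed",
             status_code=response.status_code,
             duration_ms=round(duration_ms, 2))
    response.headers["X-Request-ID"] = g.request_id
    return response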
Django: Integrated Middleware Approach
Django's middleware system provides elegant integration:
import structlog
from django.utils.deprecation import MiddlewareMixin
class StructuredLoggingMiddleware(MiddlewareMixin):
"""Django middleware for structured logging."""
def process_request(self, request):
# Generate and attach request ID
request.id = str(uuid.uuid4())
request.start_time = time.time()
# Create bound logger for this request
request.log = structlog.get_logger().bind(
request_id=request.id,
method=request.method,
path=request.path,
user_id=request.user.id if request.user.is_authenticated else None,
remote_addr=self._get_client_ip(request)
)
request.log.info("request_started",
query_params=dict(request.GET))
def process_response(self, request, response):
duration_ms = (time.time() - request.start_time) * 1000
request.log.info("request_completed",
status_code=response.status_code,
duration_ms=round(duration_ms, 2))
response['X-Request-ID'] = request.id
return response
def process_exception(self, request, exception):
request.log.error("request_failed",
exception_type=type(exception).__name__,
exc_info=exception)
return None
def _get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0]
return request.META.get('REMOTE_ADDR')
# In views.py:
def user_profile(request, user_id):
request.log.info("loading_user_profile", user_id=user_id)
try:
user = User.objects.get(id=user_id)
request.log.info("user_found", username=user.username)
return render(request, 'profile.html', {'user': user})
except User.DoesNotExist:
request.log.warning("user_not_found", user_id=user_id)
return render(request, '404.html', status=404)
Request-bound logger throughout lifecycle, automatic user context, proxy-aware IP detection. Django handles up to 3,000 req/s in async mode.
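For completeness, the middleware still has to be registered in settings.py. The module path below is hypothetical, and the entry must come after AuthenticationMiddleware because process_request reads request.user:
# settings.py
MIDDLEWARE = [
    "django.middleware.security.SecurityMiddleware",
    "django.contrib.sessions.middleware.SessionMiddleware",
    "django.contrib.auth.middleware.AuthenticationMiddleware",
    "yourproject.middleware.StructuredLoggingMiddleware",  # hypothetical path -- match your project
    # ... remaining middleware ...
]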
Heroku-specific constraint: Heroku's request ID is in the X-Request-ID header, not generated by your app. We spent days debugging "missing" correlation IDs before realizing we should use Heroku's instead of generating our own.
Celery: Distributed Task Logging
The Challenge: Celery tasks run on different machines with no shared memory. When a task fails at 3 AM on worker-17, you need logs that connect the dots across retries, workers, and parent tasks.
Why It Matters: Modern applications rely heavily on background processing. Our e-commerce client processes 50K tasks/hour--payment webhooks, inventory updates, email notifications. Without proper logging, debugging distributed failures becomes archaeology.
The Core Pattern:
from celery import Task
import structlog
class StructuredTask(Task):
"""Base task with automatic structured logging."""
@property
def logger(self):
# Each task gets a logger bound with its context
return structlog.get_logger().bind(
task_name=self.name,
task_id=self.request.id,
worker=self.request.hostname
)
def on_success(self, retval, task_id, args, kwargs):
self.logger.info("task_succeeded", duration_ms=self.runtime_ms)
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.logger.error("task_failed",
exception=type(exc).__name__,
duration_ms=self.runtime_ms)
# Use in your tasks
@app.task(base=StructuredTask, bind=True)
def process_payment(self, user_id, amount):
self.logger.info("processing_payment", user_id=user_id, amount=amount)
# ... payment logic ...
return {"status": "completed"}
Performance Impact: 2-3ms overhead per task, but 48x faster debugging when failures occur.
Celery: Distributed task queue for Python
- Worker: Process that executes tasks
- Broker: Message queue (Redis/RabbitMQ) that holds tasks
- Task: Unit of work to be executed asynchronously
- Beat: Scheduler for periodic tasks
Logging challenges: Tasks run on different machines, no shared memory, worker crashes lose context
Complete production-ready implementation
from celery import Celery, Task, signals
import structlog
import time

# Celery application (broker/result backend configuration omitted -- use your own settings)
app = Celery("tasks")
# Base task class with structured logging
class StructuredTask(Task):
"""Base task with automatic structured logging."""
def __init__(self):
self._logger = None
self.start_time = None
@property
def logger(self):
if self._logger is None:
self._logger = structlog.get_logger().bind(
task_name=self.name,
task_id=self.request.id if self.request else None,
worker_hostname=self.request.hostname if self.request else None
)
return self._logger
@property
def runtime_ms(self):
if self.start_time:
return round((time.time() - self.start_time) * 1000, 2)
return 0
def before_start(self, task_id, args, kwargs):
"""Log task start with full context."""
self.start_time = time.time()
self.logger.info(
"task_started",
task_id=task_id,
args=args,
kwargs=kwargs,
queue=self.request.delivery_info.get('routing_key') if self.request else None
)
def on_success(self, retval, task_id, args, kwargs):
"""Log successful completion."""
self.logger.info(
"task_succeeded",
task_id=task_id,
duration_ms=self.runtime_ms,
result_size=len(str(retval)) if retval else 0
)
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Log task failure with exception details."""
self.logger.error(
"task_failed",
task_id=task_id,
duration_ms=self.runtime_ms,
exception_type=type(exc).__name__,
exc_info=exc
)
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""Log retry attempts."""
self.logger.warning(
"task_retrying",
task_id=task_id,
retry_count=self.request.retries,
exception_type=type(exc).__name__
)
# Worker lifecycle logging
@signals.worker_ready.connect
def worker_ready(sender, **kwargs):
logger = structlog.get_logger()
logger.info(
"worker_ready",
hostname=sender.hostname,
concurrency=sender.pool.limit if hasattr(sender.pool, 'limit') else None
)
@signals.worker_shutting_down.connect
def worker_shutting_down(sender, **kwargs):
logger = structlog.get_logger()
logger.info("worker_shutting_down", hostname=sender.hostname)
# Task examples
@app.task(base=StructuredTask, bind=True, max_retries=3)
def process_payment(self, user_id: int, amount: float, correlation_id: str = None):
"""Process payment with distributed tracing."""
    if correlation_id:
        # Rebind through the cached _logger; the logger property itself has no setter
        self._logger = self.logger.bind(correlation_id=correlation_id)
self.logger.info(
"processing_payment",
user_id=user_id,
amount=amount
)
try:
# Payment processing logic
result = {"transaction_id": "txn_123", "status": "completed"}
self.logger.info(
"payment_processed",
user_id=user_id,
transaction_id=result["transaction_id"]
)
return result
except Exception as exc:
self.logger.error(
"payment_processing_error",
user_id=user_id,
error=str(exc)
)
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task(base=StructuredTask, bind=True)
def generate_report(self, report_type: str, filters: dict):
"""Long-running task with progress logging."""
total_steps = 5
for step in range(total_steps):
self.logger.info(
"report_progress",
report_type=report_type,
step=step + 1,
total_steps=total_steps,
progress_percent=((step + 1) / total_steps) * 100
)
# Update task state for monitoring
self.update_state(
state='PROGRESS',
meta={'current': step + 1, 'total': total_steps}
)
time.sleep(1)
return {"report_url": "https://example.com/reports/123"}
Key Production Patterns:
- Base task class for consistent logging across all tasks
- Worker lifecycle tracking via Celery signals
- Task state updates for progress monitoring
- Correlation ID propagation for distributed tracing
- Automatic retry logging with exponential backoff
Real production metrics from our Celery deployment showed the impact:
- 50K tasks/hour across 20 workers
- Logging overhead: 2-3ms per task (acceptable)
- Without structured logging: 4+ hours to debug failed payments
- With structured logging: 5 minutes (48x improvement)
The critical lesson came on Black Friday 2023 when our payment processor went dark:
- 500K concurrent users hitting our system
- Payment task logs included full cart objects
- Redis memory exploded: 2GB → 14GB in 30 minutes
- Workers started crashing, payments failing
- Emergency fix: Deploy to log only cart_id and total
- The critical learning was to test logging at 10x expected load and never log full objects in high-volume tasks
Building Observable Systems
The framework integrations above show the mechanics, but effective observability requires thinking beyond individual services. Remember our 800MB PDF incident? That's exactly the kind of issue that proper observability would have caught before production--the trace would have shown the exponential growth across service boundaries.
This section demonstrates how structured logging becomes the foundation for observable distributed systems.
The Three Pillars in Practice
Remember our observability foundation: logs provide events, metrics show trends, traces connect the dots. Structured logging uniquely bridges all three:
# One log entry serves multiple purposes
log.info("api_request_completed",
# For logs: searchable event
event="order_created",
order_id=order_id,
# For metrics: aggregatable data
duration_ms=42.3,
items_count=len(items),
total_amount=156.99,
# For traces: correlation across services
trace_id=get_current_trace_id(),
parent_span_id=get_parent_span(),
service_name="order-service"
)
Correlation Across Services
The most critical pattern for distributed systems is maintaining context across service boundaries:
# Service A: API Gateway
@app.post("/orders")
async def create_order(order: OrderRequest):
correlation_id = str(uuid.uuid4())
# Pass correlation ID to all downstream services
async with httpx.AsyncClient() as client:
headers = {"X-Correlation-ID": correlation_id}
# Service B: Inventory
inventory_response = await client.post(
"http://inventory-service/check",
json={"items": order.items},
headers=headers
)
# Service C: Payment
payment_response = await client.post(
"http://payment-service/process",
json={"amount": order.total, "user_id": order.user_id},
headers=headers
)
# All services log with same correlation_id
log.info("order_created",
correlation_id=correlation_id,
services_called=["inventory", "payment"])
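The downstream services (inventory, payment) then need to adopt the incoming header and bind it to their own logging context. A minimal FastAPI-style sketch using structlog's contextvars helpers (assumes merge_contextvars is in the processor chain):
import uuid
import structlog
from fastapi import Request

@app.middleware("http")
async def correlation_middleware(request: Request, call_next):
    """Adopt the caller's correlation ID (or mint one) and bind it for every log in this request."""
    correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
    structlog.contextvars.bind_contextvars(correlation_id=correlation_id)
    try:
        response = await call_next(request)
        response.headers["X-Correlation-ID"] = correlation_id
        return response
    finally:
        structlog.contextvars.clear_contextvars()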
The Power of Structured Context
When debugging production issues, structured logging with proper correlation transforms complex problems into manageable ones:
# Query: Show me everything that happened for this failed order
# With structured logging + correlation:
{
"correlation_id": "abc-123",
"service": ["api-gateway", "inventory-service", "payment-service"],
"level": ["info", "warning", "error"]
}
# Results show the complete story:
# 1. [api-gateway] Order received from user_id=456
# 2. [inventory-service] Stock check passed for 3 items
# 3. [payment-service] Payment authorization started
# 4. [payment-service] ERROR: Card declined (insufficient funds)
# 5. [inventory-service] Stock reservation rolled back
# 6. [api-gateway] Order failed, user notified
OpenTelemetry: The Future of Observability
While structured logging provides the foundation, OpenTelemetry standardizes observability across languages and platforms:
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
class TracingProcessor:
"""Inject OpenTelemetry trace context into all logs."""
def __call__(self, logger, method_name, event_dict):
span = trace.get_current_span()
if span and span.is_recording():
span_context = span.get_span_context()
event_dict["trace_id"] = format(span_context.trace_id, "032x")
event_dict["span_id"] = format(span_context.span_id, "016x")
# Add span attributes from log
span.set_attribute(f"log.{event_dict.get('event', 'unknown')}", True)
# Promote errors to span status
if event_dict.get("level") == "error":
span.set_status(Status(StatusCode.ERROR, event_dict.get("error", "")))
return event_dict
Observability Patterns That Scale
From our production experience, these patterns prove essential:
- Hierarchical Context: Parent services add context that child services inherit
- Sampling Coordination: All services in a trace should use the same sampling decision
- Error Escalation: Errors in any service should bubble up through trace metadata
- Performance Budgets: Each service logs its contribution to overall latency
# Parent service establishes context
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.value", order_total)
span.set_attribute("order.items_count", len(items))
# Child services inherit and add their context
for service in ["inventory", "payment", "shipping"]:
with tracer.start_as_current_span(f"call_{service}"):
response = await call_service(service, data)
# Each service's structured logs automatically include trace context
Structured logging extends beyond improved logs--it enables building systems that are understandable, debuggable, and optimizable at scale.
Performance Optimization: From Good to Great
When to optimize: You need optimization when you process >1000 req/s, generate >10GB logs/day, see >5% overhead in profiles, or experience ingestion delays.
Sampling Strategies
Not every log needs to reach production. Intelligent sampling maintains visibility while reducing volume.
Critical lesson from production: Our original sampling ignored request correlation. When user 12345's payment failed, we had logs from services A and C but not B--making debugging impossible. Always sample by request_id, not individual logs:
import structlog

class SamplingProcessor:
"""Sample logs based on rules while keeping all errors."""
def __init__(self, default_rate=0.1):
self.default_rate = default_rate
self.rules = {
"health_check": 0.01, # 1% of health checks
"cache_hit": 0.05, # 5% of cache hits
"request_completed": 0.1, # 10% of successful requests
}
def __call__(self, logger, method_name, event_dict):
# Always log errors and warnings
if event_dict.get("level") in ("error", "warning"):
return event_dict
# Always log high-value transactions
if event_dict.get("amount", 0) > 1000:
return event_dict
# Apply sampling rules
event = event_dict.get("event", "")
        sample_rate = self.rules.get(event, self.default_rate)
        # Sample per request_id (as noted above) so a request's logs are kept or dropped together
        request_id = event_dict.get("request_id", "")
        if hash(request_id) % 100 < sample_rate * 100:
event_dict["sampled"] = True
event_dict["sample_rate"] = sample_rate
return event_dict
raise structlog.DropEvent
# Use in configuration
structlog.configure(
processors=[
SamplingProcessor(default_rate=0.1),
structlog.processors.JSONRenderer()
]
)
Batching for Efficiency
import time
from collections import deque
class BatchingProcessor:
"""Batch logs to reduce I/O operations."""
def __init__(self, batch_size=100, flush_interval=1.0):
self.batch = deque(maxlen=batch_size)
self.batch_size = batch_size
self.flush_interval = flush_interval
self.last_flush = time.time()
def __call__(self, logger, method_name, event_dict):
self.batch.append(event_dict)
# Flush if batch is full or time elapsed
if len(self.batch) >= self.batch_size or \
time.time() - self.last_flush > self.flush_interval:
self.flush()
return event_dict
def flush(self):
# Send batch to your log aggregator
if self.batch:
# Example: send to Elasticsearch, CloudWatch, etc.
send_logs_batch(list(self.batch))
self.batch.clear()
self.last_flush = time.time()
Performance Benchmarks
Here's data from a high-traffic API I worked on (measured during our busiest week, not cherry-picked optimal conditions):
Environment: FastAPI service handling 10K requests/second on AWS EC2 c5.2xlarge (8 vCPU, 16GB RAM), Python 3.10, writing to CloudWatch Logs. Results measured over 24-hour period during peak traffic. Note: Python 3.8+ required for optimal async performance. For 3.7, expect 15-20% higher latency due to older asyncio implementation.
These benchmarks were collected on c5.2xlarge instances. Your results will vary based on CPU generation and Python version. For example, c6i instances showed 12-18% better JSON serialization performance due to Ice Lake processors, while t3 instances showed 40% worse performance when burst credits were exhausted.
Real constraint we hit: EC2's network burst credits. Our c5.2xlarge could handle 10K req/s sustained, but log shipping to CloudWatch would consume burst credits. After 15 minutes, we'd hit the baseline 1.25 Gbps and start dropping logs. Solution: either move to c5n instances (25 Gbps sustained) or implement local buffering with S3 batch uploads.
The JSON serialization issue emerged gradually--p99 latency crept from 50ms to 200ms over two months as our user objects grew. Middleware was logging entire user objects including base64 profile pictures. We only noticed when a user uploaded a 10MB avatar and brought down an entire pod. Always log IDs, not objects.
Learn More: Performance & Scale
Want to optimize your logging performance?
- Log sampling best practices - Honeycomb's excellent guide
- Async logging patterns - Python's async handlers
- ELK stack performance - Tuning Elasticsearch for logs
Memory-Efficient Configuration
# Minimize memory usage for high-throughput systems
structlog.configure(
processors=[
# Minimal processors only
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(
# Compact JSON without extra spaces
separators=(",", ":"),
sort_keys=False
)
],
# Prevent logger cache growth
cache_logger_on_first_use=False,
# Use lightweight context
context_class=dict,
)
Distributed Tracing Integration
Connect your logs to distributed tracing for complete observability:
# OpenTelemetry integration
from opentelemetry import trace
import structlog
tracer = trace.get_tracer(__name__)
def inject_trace_context(logger, method_name, event_dict):
"""Add OpenTelemetry trace context to logs."""
span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:  # get_current_span() can return a non-recording placeholder span
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
event_dict["trace_flags"] = ctx.trace_flags
return event_dict
# Use with your structlog configuration
processors.append(inject_trace_context)
# Example usage
@tracer.start_as_current_span("process_order")
def process_order(order_id):
log.info("processing_order", order_id=order_id)
# Logs now include trace_id and span_id automatically
With performance optimized and distributed tracing wired in, the next lever is cost--and the right strategy depends on your observability platform.
Platform-Specific Cost Optimization
Your observability platform determines how you optimize costs. We learned this when our CloudWatch bill increased dramatically--remember that $34,000 surprise from the introduction? These strategies would have prevented it.
Platform Cost Comparison and Optimization Strategies
Typical monthly costs for 10TB/month:
- CloudWatch: $5,000-10,000 (varies by tier)
- Datadog: $3,000-8,000 (depends on hosts/features)
- Self-hosted ELK: $2,000-5,000 (infrastructure only, excludes 0.5-2 FTE for maintenance)
- Hidden costs: Engineer time, query performance, data transfer
- Reality check: Self-hosted looks cheaper until you factor in on-call burden. Our team spent 15 hours/week maintaining Elasticsearch cluster health
Common Mistakes We Made (So You Don't Have To)
We've encountered these issues enough times to identify clear patterns. Here are the key mistakes to avoid.
Over-Logging
(Yes, this directly contradicts the "log everything" advice from earlier. The truth is nuanced--log everything during incidents, sample intelligently during normal operations.)
A common initial approach is to log everything. However, more logs don't equate to better observability--quality consistently outweighs quantity.
❌ Bad: Logging every variable
def calculate_discount(user, product, quantity):
log.info("calculate_discount_start",
user_id=user.id,
user_name=user.name,
user_email=user.email, # PII exposure risk
user_created=user.created_at,
user_tier=user.tier,
product_id=product.id,
product_name=product.name,
product_price=product.price,
product_category=product.category,
quantity=quantity,
user_preferences=user.preferences, # Large nested object
product_history=product.price_history, # Entire history array
)
# This creates massive logs and potential security issues
Real numbers from this exact mistake:
- Each request generated 2KB of logs (vs 200 bytes needed)
- At 10K req/s: 20MB/s of logs = 1.7TB/day
- CloudWatch cost: $850/day just for this one function
- Fix: Reduced to essential fields = 90% cost reduction
✅ Good: Log what matters
def calculate_discount(user, product, quantity):
log.info("discount_calculation",
user_id=user.id,
user_tier=user.tier, # Affects discount
product_id=product.id,
quantity=quantity, # Affects discount
base_price=product.price * quantity
)
Context Pollution
Be careful what you bind to logger context--it persists!
❌ Bad: Leaking sensitive data
# This credit card number stays in ALL subsequent logs!
log = log.bind(credit_card=payment.card_number)
process_payment()
send_email() # Still has credit card in context!
✅ Good: Scoped context
# Context only exists within this block
with structlog.contextvars.bound_contextvars(payment_id=payment.id):
process_payment()
# Context cleared automatically
Performance Pitfalls
❌ Bad: Expensive operations in logging
log.info("user_stats",
total_orders=Order.objects.filter(user=user).count(), # Database hit!
total_spent=calculate_lifetime_value(user), # Complex calculation!
recommendations=generate_recommendations(user) # ML model inference!
)
✅ Good: Log efficiently
# Only calculate when needed
if log.isEnabledFor(logging.INFO):
log.info("user_stats",
user_id=user.id,
cached_order_count=user.order_count, # Pre-calculated
user_segment=user.segment # Pre-computed
)
The Right Amount of Context
It took us a while to find the right amount of context:
# ❌ Too little context
log.error("Failed") # Failed what? Where? Why?
# ❌ Too much context
log.error("Failed to process order",
# ... 50 fields including entire request/response bodies
# ✅ Just right
log.error("order_processing_failed",
order_id=order.id,
step="payment_authorization",
error_code=err.code,
error_type=type(err).__name__,
user_id=order.user_id,
amount=order.total,
retry_attempt=retry_count
)
Testing Your Logs
from structlog.testing import LogCapture
@pytest.fixture
def log_output():
cap = LogCapture()
structlog.configure(processors=[cap])
return cap
def test_payment_logging(log_output):
process_payment(order_id="123")
assert log_output.entries[0]["event"] == "payment_started"
assert "transaction_id" in log_output.entries[1]
Test for: critical events logged, required fields present, no sensitive data leaks.
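The "no sensitive data leaks" check is easy to automate against the same LogCapture fixture. A minimal sketch--the forbidden-key list and the process_payment call are placeholders for your own:
FORBIDDEN_KEYS = {"credit_card", "password", "ssn", "email"}  # illustrative PII list

def test_no_sensitive_fields_logged(log_output):
    process_payment(order_id="123")
    for entry in log_output.entries:
        leaked = FORBIDDEN_KEYS & set(entry)
        assert not leaked, f"sensitive keys leaked into logs: {leaked}"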
Getting Buy-In and Showing ROI
The code implementation is straightforward. Securing organizational buy-in requires concrete metrics and evidence.
What Bad Logging Actually Costs
We didn't realize how much our logging chaos was costing until we did the math. Here's what we discovered when we analyzed 6 months of incident data:
The Engineering Time Sink
Our analysis of 147 production incidents revealed important findings:
- Average debugging time: 4.2 hours per incident with grep/regex, vs 0.8 hours with structured queries
- False positive alerts: 73 alerts per week that required investigation but weren't real issues
- Regex complexity: Our grep_commands.txt file had grown to 1,200 lines of increasingly complex patterns
- Knowledge hoarding: Only 3 engineers (out of 45) could effectively debug our payment service
At $150/hour for senior engineers, we were burning $92,400 per month just on extended debugging time.
The Hidden Financial Drain
Beyond engineering time, the costs compounded:
- Extended downtime: Each extra hour of debugging meant $14,000 in lost revenue (based on our transaction volume)
- Storage inefficiency: Storing "User 12345 logged in" as unstructured text used 3x more space than {"event": "user_login", "user_id": 12345}
- Query costs: CloudWatch Insights charges by data scanned. Unstructured logs meant scanning 10x more data for each query
- Compliance risks: GDPR requests took days because we couldn't efficiently find all log entries for a user
Our monthly logging costs had quietly grown to $47,000--more than many engineers' salaries.
The Human Cost
The numbers don't capture the full impact:
- On-call burnout: Engineers dreaded their rotation. One senior engineer quit citing "debugging fatigue"
- Slow onboarding: New hires took 3 months to become effective at debugging vs 2 weeks at my previous company
- Innovation stagnation: Teams spent 40% of sprint time on debugging instead of building features
- Customer trust: "Your service is always broken" became a running joke in customer forums
Calculating Your Own Costs
Here's the framework we used to quantify our pain:
# Monthly cost of bad logging
engineering_hours = incidents_per_month * hours_per_incident * hourly_rate
downtime_cost = (mttr_hours - target_mttr) * hourly_revenue_loss * incidents
storage_waste = (unstructured_gb - structured_gb) * cost_per_gb
query_overhead = queries_per_month * excess_data_scanned * query_cost_per_gb
total_monthly_cost = engineering_hours + downtime_cost + storage_waste + query_overhead
For us, this totaled $186,000 per month in direct costs alone.
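To make the framework concrete, here's the same calculation with illustrative inputs--every number below is made up for the example, not our data:
# Illustrative inputs only -- substitute your own measurements
incidents_per_month = 10
hours_per_incident = 4.2 - 0.8        # extra debugging hours vs structured queries
hourly_rate = 150                     # loaded senior-engineer cost
mttr_hours, target_mttr = 2.2, 1.0    # hours
hourly_revenue_loss = 14_000
incidents = incidents_per_month
unstructured_gb, structured_gb = 10_000, 3_300   # ~3x text overhead
cost_per_gb = 0.03
queries_per_month = 5_000
excess_data_scanned = 0.5             # extra GB scanned per query
query_cost_per_gb = 0.005

engineering_hours = incidents_per_month * hours_per_incident * hourly_rate
downtime_cost = (mttr_hours - target_mttr) * hourly_revenue_loss * incidents
storage_waste = (unstructured_gb - structured_gb) * cost_per_gb
query_overhead = queries_per_month * excess_data_scanned * query_cost_per_gb

total_monthly_cost = engineering_hours + downtime_cost + storage_waste + query_overhead
print(f"${total_monthly_cost:,.0f}/month")   # about $173,000 with these made-up inputs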
Why This Is Technical Debt
Unstructured logging isn't just an inconvenience--it's technical debt that compounds daily. Like any debt, it accrues interest that eventually becomes unsustainable.
The Compound Interest of Bad Logging
Every unstructured log line you write today costs more tomorrow:
- Regex Proliferation: Each new log format requires new grep patterns. Our regex library grew 20% quarterly.
- Context Loss: "Error processing request" tells you nothing. Six months later, even the author can't debug it.
- Scale Breakdown: What works at 100 req/s fails catastrophically at 10,000 req/s
- Knowledge Silos: Only Sarah knows how to debug the payment service. When Sarah leaves, you're stuck.
Industry analyses back this up, reporting MTTR reductions of up to 65% and annual savings on the order of $2 million for large enterprises adopting structured logging.
The Architecture Tax
Unstructured logging forces architectural compromises:
- Synchronous Everything: Can't trace async operations without correlation IDs
- Monolithic Debugging: Forced to SSH into servers because logs aren't centralized
- Manual Correlation: Engineers become human JOIN operators across services
- Limited Automation: Can't build reliable alerts on unstructured text
The Innovation Penalty
This is the real killer. When debugging takes hours instead of minutes:
- Feature velocity drops: Teams spend more time fixing than building
- Experimentation dies: "We can't risk breaking prod" becomes the mantra
- Talent flees: Your best engineers want to build, not grep through logs
- Competition wins: While you debug, competitors ship features
As one of our engineers noted: "I didn't join this company to become a professional log parser."
The Tipping Point
The critical moment came during Black Friday 2022. Our payment service started timing out. Six engineers spent 14 hours debugging across 12 services. The root cause was that a third-party API had changed their timeout from 30s to 10s.
With structured logging, the query would have been:
SELECT service, endpoint, p95(response_time) AS p95
FROM logs
WHERE timestamp > '2022-11-25'
GROUP BY service, endpoint
HAVING p95 > previous_p95 * 1.5   -- previous_p95: the same aggregate over a baseline window
That's when we realized that unstructured logging wasn't just technical debt--it had become unsustainable technical debt.
Migration Strategy for Large Codebases
Migrating a million-line codebase to structured logging sounds daunting. We thought so too, until we settled on a systematic approach that converted 850K lines of code across 47 services in 90 days.
The 80/20 Rule of Logging
We analyzed 30 days of logs and discovered:
- 80% of debugging time focused on 20% of log statements
- 90% of critical errors came from 50 specific code paths
- 5 services generated 67% of all logs
This insight shaped our entire strategy--fix the logs that matter most.
Three-Phase Migration
Phase 1: High-Value Targets (Week 1-2)
# Convert payment flows, API endpoints, authentication
log.error("payment_failed",
user_id=user_id,
amount=amount,
gateway_response=response.status_code,
processing_time_ms=elapsed_ms
)
Phase 2: Critical Paths (Week 3-4)
- Database queries, external APIs, cache operations
- Use decorators: @structured_log("external_api_call") (a minimal sketch of such a decorator appears below)
Phase 3: Automation (Week 5+)
- AST-based conversion scripts
- Linting rules for PRs
- Team templates
Measuring Progress
We tracked migration success through concrete metrics:
- Coverage: Percentage of critical paths with structured logging
- Query time: Average time to find root cause during incidents
- Alert accuracy: False positive rate for automated alerts
- Developer adoption: Usage of structured logging in new code
The Gradual Rollout
Don't flip a switch. We used feature flags to gradually enable structured logging:
if feature_flag("structured_logging_enabled", service="payment"):
log.info("payment_processed", **payment_details)
else:
old_logger.info(f"Payment processed: {payment_id}")
This let us:
- Validate performance impact service by service
- Roll back instantly if issues arose
- Maintain backward compatibility during migration
- Teams with existing log analysis scripts will need to update their parsing logic. We maintained backward compatibility by dual-writing for 30 days--both old text format and new JSON to separate log streams (a minimal dual-write sketch follows this list). This added 15% overhead but prevented breaking hundreds of monitoring scripts
- Train teams gradually instead of all at once
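A minimal dual-write sketch, assuming stdlib logging with two handlers and the python-json-logger package from the setup section; the logger name and file paths are illustrative:
import logging

from pythonjsonlogger import jsonlogger  # from the python-json-logger package

logger = logging.getLogger("payment")
logger.setLevel(logging.INFO)

# Old text format -> the file your legacy parsing scripts already read
text_handler = logging.FileHandler("/var/log/app/payment.log")       # illustrative path
text_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))

# New JSON format -> separate stream feeding the structured pipeline
json_handler = logging.FileHandler("/var/log/app/payment.json.log")  # illustrative path
json_handler.setFormatter(jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(message)s"))

logger.addHandler(text_handler)
logger.addHandler(json_handler)

# Every call now lands in both streams until the old format is retired
logger.info("payment processed", extra={"payment_id": "pay_123", "amount": 42.50})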
Results After 90 Days
- 94% of critical paths converted
- MTTR reduced from 4.2 hours to 47 minutes
- 3 major incidents caught before customer impact (though we also had 2 false positives that triggered unnecessary 3am pages)
Focus on logs that matter for debugging--the rest is nice-to-have. This sounds obvious until you're in a war room at 3am wishing you'd logged that one extra field.
Your Observable System Implementation Roadmap
Building observable systems isn't a one-time project--it's an ongoing process. Here's a practical roadmap based on what worked for us and numerous teams we've assisted.
Week 1: Foundation
Start with your most problematic service. Implement basic structured logging with request IDs. Success = 50% faster debugging.
Week 2: Performance & Cost Control
Add sampling (remember the request correlation lesson from our production failures). Switch to orjson. Success = <2% overhead.
Azure-specific tip: Application Insights can get expensive fast. Use adaptive sampling with fixed rate for errors--we learned this after a $8K surprise bill.
Week 3: Cross-Service Correlation
This is where the contextvars patterns from earlier become critical. Propagate correlation IDs between services--but watch out for the FastAPI background task gotcha we hit.
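Here's a minimal sketch of that pattern, assuming FastAPI, a header named X-Correlation-ID, and that structlog.contextvars.merge_contextvars is already in your processor chain; the endpoint and background task are made up for illustration:
import uuid

import structlog
from fastapi import BackgroundTasks, FastAPI, Request

app = FastAPI()
log = structlog.get_logger()

@app.middleware("http")
async def correlation_middleware(request: Request, call_next):
    # Reuse the upstream ID if present, otherwise mint one
    correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(correlation_id=correlation_id)
    response = await call_next(request)
    response.headers["X-Correlation-ID"] = correlation_id
    return response

def send_receipt(order_id: str, correlation_id: str):
    # Background work may run outside the request context,
    # so re-bind the ID explicitly instead of relying on contextvars
    task_log = log.bind(correlation_id=correlation_id, order_id=order_id)
    task_log.info("receipt_sent")

@app.post("/orders/{order_id}/receipt")
async def queue_receipt(order_id: str, request: Request, background_tasks: BackgroundTasks):
    correlation_id = request.headers.get("X-Correlation-ID", "unknown")
    background_tasks.add_task(send_receipt, order_id, correlation_id)
    log.info("receipt_queued", order_id=order_id)
    return {"status": "queued"}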
Week 4: Platform Integration & Scale
Goal: Make structured logging your default observability tool.
- Day 22-23: Integrate with your APM/monitoring platform
- Day 24-25: Set up log-based alerts and SLOs
- Day 26-28: Document patterns and train the team
Success Metric: 80% of new code uses structured logging patterns.
Real-world gotcha: We discovered our load balancer was stripping correlation headers. Always test end-to-end, not just service-to-service.
Month 2-3: Advanced Observability
Goal: Build true observability with logs, metrics, and traces working together.
- OpenTelemetry Integration: Connect logs to distributed traces
- Custom Dashboards: Build service-specific observability dashboards
- Automated Analysis: Log-based anomaly detection and alerting
- Cost Optimization: Implement tiered storage and retention policies
Common Pitfalls to Avoid
- Over-Engineering Early: Start simple. You can always add more structure later. We spent 3 weeks building a custom log router before realizing Fluentd already did everything we needed.
- Logging Everything: More logs ≠ better observability. Be selective. Our "log all SQL queries" decision generated 89% of our volume but helped debug less than 1% of issues.
- Ignoring Performance: Monitor your monitoring. Set CPU/memory budgets. We allocated 5% CPU for logging--when it hit 15%, we knew something was wrong.
- Solo Implementation: Involve the on-call team early--they're your champions. Their pushback on our initial design saved us from logging formats that would have been useless at 3am.
Measuring Success
Track KPIs to prove value (but be honest about the nuance): the same MTTR, query-time, alert-accuracy, and developer-adoption metrics you used to measure migration progress.
Resources You'll Need
- Time Investment: 2-4 hours/day for the champion, 30 min/day for team
- Infrastructure: May need 20-30% more log storage initially (before optimization)
- Team Training: 2-hour workshop + documentation + code reviews
The key is starting small and showing value quickly. One engineer reducing one incident's resolution time from 4 hours to 30 minutes pays for the entire initiative. We used this exact argument with our CFO--showing the math on engineering hours saved finally got budget approval.
Getting Team Buy-In
An effective approach to gain buy-in is to demonstrate a live comparison between 45-second grep searches and instant structured queries. Pro tip: use a real production issue from last week--the pain is still fresh and the grep commands are still in their shell history. Provide early access to on-call engineers who experience the most pain. Supply ready-to-use code snippets. Adoption typically follows.
Next Steps
You've seen the problems we hit at scale and the solutions that worked. The journey from basic structured logging to production-ready systems requires careful optimization, intelligent sampling, and framework-specific patterns. But the payoff--measured in reduced MTTR, lower costs, and happier engineers--justifies every hour invested.
Your Production Roadmap
- Audit Current State: Track MTTR for 20 incidents, survey debugging time, analyze costs
- Pick Biggest Pain: Start with one high-impact service (1K-10K req/s)
- Execute 30-Day Plan: Week 1 must show value or momentum dies
- Measure Everything: Quantify time savings, cost reduction, team satisfaction
Beyond Structured Logging
Once your structured logging foundation is solid, the real power emerges:
Distributed Tracing Integration
Connect your logs to distributed traces using the TracingProcessor pattern shown in the Building Observable Systems section above. This automatically correlates all logs with their trace context.
Real-time Anomaly Detection
- Alert on unusual patterns (spike in errors, slow queries)
- Identify issues before customers report them
- Predict failures based on log patterns
Cost Optimization at Scale
- Implement tiered storage (hot/warm/cold)--though we found the complexity often outweighed savings below 50TB/month
- Use compression algorithms like CLP for 100x+ reduction (realistic expectation: 20-40x for most workloads)
- Automated log lifecycle management
Learning from Our Journey
After three years of running structured logging at scale, these are the key lessons learned through experience:
The Search Endpoint Mystery
Our /api/v2/search endpoint perfectly illustrates why structured logging matters (and why we now have strict query complexity limits). For months, we had intermittent timeouts that appeared random. Traditional logs showed nothing--just timeout errors with no pattern.
With structured logging, one query changed everything:
SELECT
user_id,
COUNT(*) as requests,
AVG(duration_ms) as avg_time,
MAX(array_length(search_terms)) as max_terms
FROM logs
WHERE endpoint = '/api/v2/search'
AND duration_ms > 1000
AND timestamp > now() - interval '1 hour'
GROUP BY user_id
ORDER BY avg_time DESC;
The query surfaced the culprit immediately: one power user was submitting queries with 50+ search terms, each triggering a separate database query. Our N+1 query problem had gone undetected (a classic case of the JSON serialization problem from section 1 compounding with bad query patterns). The fix took 30 minutes once we knew what to fix. Finding it had taken 3 months of intermittent debugging sessions, false leads, and one very frustrated SRE who insisted "it's always that one user" (he was right).
Platform constraint that bit us: AWS Application Load Balancer has a 60-second timeout. Our original 5-second delays occasionally cascaded into timeouts during peak load. We only discovered this through structured logs showing retry patterns--another win for proper observability.
Mistakes We Made (So You Don't Have To)
- Over-Logging User Data: We logged entire user objects including emails and phone numbers. One GDPR request later, we learned why that's a terrible idea.
- Crashing Production: Our first async logging implementation had a memory leak. Under load, our "harmless" update consumed 16GB of RAM in 10 minutes, exhausting all available memory in production. Always load test your logging changes.
- Ignoring Context Boundaries: We didn't realize FastAPI's background tasks run in a separate context. Lost correlation IDs everywhere until we explicitly passed context.
- The Redis Incident: We logged every Redis operation. At 50K ops/second, we generated 4TB of logs daily. Sometimes less is more.
What Actually Matters
- Start Small: One service, one week, measurable results
- Performance First: If logging slows down the service, nothing else matters
- Developer Experience: Make the right thing the easy thing
- Incremental Wins: Each improvement builds momentum for the next (though we hit diminishing returns after the first 60% improvement)
Take Action Today
For teams processing 1000+ requests/second:
- Start with the sampling strategies in this guide
- Implement orjson for instant performance gains
- Deploy to your highest-traffic service first
For teams struggling with costs:
- Analyze your current log volume by type
- Implement the 100/10/1 sampling rule
- Monitor cost reduction while maintaining visibility
For teams with compliance requirements:
- Deploy the PII redaction processors (a minimal sketch follows this list)
- Implement audit trails for sensitive operations
- Ensure GDPR compliance with geographic processors
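If you need a starting point for redaction, here's a minimal processor sketch--the field list is illustrative and should match your own data model:
import structlog

SENSITIVE_KEYS = {"email", "phone", "credit_card", "ssn", "password"}  # illustrative list

def redact_pii(logger, method_name, event_dict):
    """Replace sensitive values before they reach the renderer."""
    for key in SENSITIVE_KEYS & event_dict.keys():
        event_dict[key] = "[REDACTED]"
    return event_dict

# Register it ahead of the renderer in your processor chain
structlog.configure(
    processors=[
        redact_pii,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)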
The difference between hoping you'll find the problem and knowing you will--that's what production-grade structured logging delivers. It's not perfect--complex distributed failures still require detective work--but it transforms most debugging from archaeology to simple queries.
References
Advanced resources for production-grade structured logging at scale
Core Libraries & Performance
• structlog - Official documentation and performance guidelines
• orjson - Fast JSON serialization library
• msgpack - Binary serialization format
• python-rapidjson - Another high-performance JSON library
Production Logging Patterns
• Uber Engineering. (2023). Reducing Logging Cost by Two Orders of Magnitude using CLP
• Stripe Engineering. (2019). Fast and Flexible Observability with Canonical Log Lines
• Honeycomb. (2024). Observability Best Practices
Cost Optimization & Pricing
• AWS. (2024). Lambda Tiered Pricing for CloudWatch Logs
• Datadog. (2024). Log Management Pricing
• Google Cloud. (2024). Cloud Logging Pricing
Advanced Techniques
• Sampling Strategies: https://opentelemetry.io/docs/reference/specification/trace/sdk/#sampling
• Log Compression: https://github.com/uber/compressed-log-processor
• Async Logging: https://docs.python.org/3/library/asyncio.html
• Distributed Tracing: https://opentelemetry.io/docs/concepts/observability-primer/
Performance Benchmarks
• structlog benchmarks
• Python logging performance
• JSON serialization benchmarks
Enterprise Case Studies
• Netflix Technology Blog
• Uber Engineering
• Airbnb Engineering
• Spotify Engineering
Tools & Platforms
• OpenTelemetry: https://opentelemetry.io/docs/instrumentation/python/
• Prometheus: https://prometheus.io/docs/practices/logging/
• Grafana Loki: https://grafana.com/docs/loki/latest/
• Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html