# Redis as a Cache

Redis is an in-memory data structure store that excels as a cache thanks to its sub-millisecond latency, rich data types, and built-in key expiration. Used well, it can absorb most read traffic, dramatically reducing database load and improving application response times.
## Data Structures Overview
| Structure | Use Case | Example |
|-----------|----------|---------|
| String | Simple values, counters | User session, page views |
| Hash | Object fields | User profile fields |
| List | Ordered collection | Message queue, timeline |
| Set | Unique values | Tags, followers |
| Sorted Set | Ranked data | Leaderboard, rate limiting |
| HyperLogLog | Cardinality estimation | Unique visitors |
| Bitmap | Boolean flags | Daily active users |
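To make the mapping concrete, here is a quick sketch of a few of these structures via redis-py (key names and values are illustrative, not from any particular application):

```python
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# String as a counter: one INCR per page view
r.incr("pageviews:home")

# Hash for object fields: read or write individual fields cheaply
r.hset("user:42", mapping={"name": "Ada", "email": "ada@example.com"})
print(r.hget("user:42", "name"))

# Sorted set as a leaderboard: the score is the ranking criterion
r.zadd("leaderboard", {"ada": 3200, "grace": 4100})
print(r.zrevrange("leaderboard", 0, 2, withscores=True))
```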
## Caching Patterns

### Cache-Aside (Lazy Loading)

The most common caching pattern: the application checks the cache first; on a miss, it loads the data from the database and writes it back to the cache.
```python
import json

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_user(user_id):
    cache_key = f"user:{user_id}"
    # Try the cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss: load from the database
    # (db stands in for your application's database handle)
    user = db.query("SELECT * FROM users WHERE id = %s", [user_id])
    if user:
        # Populate the cache with a one-hour TTL
        r.setex(cache_key, 3600, json.dumps(user))
    return user
```
**Pros**: Caches only data that is actually requested; resilient to cache failures, since the application falls back to the database.
**Cons**: Cache misses cost three round trips (cache read, database read, cache write); data stays stale until the TTL expires.
### Write-Through

Data is written to the cache first, then to the database. Reads always hit the cache.
```python
def update_user(user_id, data):
    cache_key = f"user:{user_id}"
    # Write to the cache first
    r.setex(cache_key, 3600, json.dumps(data))
    # Then write to the database
    db.execute(
        "UPDATE users SET name = %s, email = %s WHERE id = %s",
        [data["name"], data["email"], user_id],
    )
```
**Pros**: The cache is always consistent with database writes.
**Cons**: Write latency increases; the cache stores data that may never be read; and if the database write fails after the cache write, the cache holds data the database never received unless you roll it back.
### Write-Behind (Write-Back)

Data is written to the cache immediately and persisted to the database asynchronously, typically by a background worker draining a queue (sketched after this pattern's pros and cons).
```python
def write_behind(user_id, data):
    cache_key = f"user:{user_id}"
    # Write to the cache immediately
    r.setex(cache_key, 3600, json.dumps(data))
    # Queue the database write for asynchronous batch processing
    r.lpush("db:write:queue", json.dumps({
        "table": "users",
        "id": user_id,
        "data": data,
    }))
```
**Pros**: Very fast writes; database operations can be batched.
**Cons**: Risk of data loss if the cache node fails before the queued database writes are applied.
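The queue needs a consumer. A minimal sketch of such a worker, assuming the queue format from `write_behind` above and the same hypothetical `db` handle:

```python
import json

def write_behind_worker():
    """Drain db:write:queue and apply each queued write to the database."""
    while True:
        # BRPOP blocks until an item arrives, or returns None on timeout;
        # LPUSH + BRPOP together give FIFO ordering
        item = r.brpop("db:write:queue", timeout=5)
        if item is None:
            continue  # queue was empty; poll again
        _, payload = item
        job = json.loads(payload)
        # Illustrative only: real code would batch jobs, retry failures,
        # and dead-letter writes that keep failing
        db.execute(
            "UPDATE users SET name = %s, email = %s WHERE id = %s",
            [job["data"]["name"], job["data"]["email"], job["id"]],
        )
```

In practice the worker must retry or dead-letter failed writes, since dropping a queue entry loses the update permanently.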
## Cache Invalidation Strategies
| Strategy | How It Works | Best For |
|----------|-------------|----------|
| TTL expiration | Automatic expiry after set time | Most applications |
| Key deletion | Delete cache key on data update | Write-through |
| Versioned keys | Include version in key name | Schema changes |
| Pub/sub invalidation | Notify all instances to invalidate | Distributed caches |
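As a sketch of the pub/sub row above (the channel name `cache:invalidate` and the in-process `local_cache` dict are illustrative assumptions), each application instance subscribes to an invalidation channel and drops entries from its local cache when a key is announced:

```python
# Publisher: after updating the database, tell every instance to invalidate
def publish_invalidation(cache_key):
    r.publish("cache:invalidate", cache_key)

# Subscriber: each instance runs this in a background thread and clears
# its local in-process cache when a key is announced
def invalidation_listener(local_cache):
    pubsub = r.pubsub()
    pubsub.subscribe("cache:invalidate")
    for message in pubsub.listen():
        if message["type"] == "message":
            local_cache.pop(message["data"], None)
```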
## Advanced Patterns

### Distributed Locking

A Redis key can serve as a mutex across processes: the lock holder is whoever managed to set the key, and a TTL guards against holders that crash without releasing it.
```python
import time
import uuid

def acquire_lock(lock_name, acquire_timeout=10, lock_ttl=10):
    identifier = str(uuid.uuid4())
    end = time.time() + acquire_timeout
    while time.time() < end:
        # SET NX EX sets the key and its TTL atomically, so a crash
        # between "set" and "expire" can't leave a lock that never expires
        if r.set(f"lock:{lock_name}", identifier, nx=True, ex=lock_ttl):
            return identifier
        time.sleep(0.001)
    return None

def release_lock(lock_name, identifier):
    # A Lua script makes check-and-delete atomic: only the holder
    # (matching identifier) may delete the lock
    script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    return r.eval(script, 1, f"lock:{lock_name}", identifier)
```
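A typical call site, sketched with the functions above (the lock name and critical section are placeholders):

```python
lock_id = acquire_lock("inventory:sku123")
if lock_id is None:
    raise TimeoutError("could not acquire lock")
try:
    # ... critical section: read-modify-write shared state ...
    pass
finally:
    # Always release, even if the critical section raises
    release_lock("inventory:sku123", lock_id)
```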
### Rate Limiting with Sorted Sets

A sorted set keyed per user implements a sliding-window limiter: each request is a member scored by its timestamp, so trimming by score drops requests that have fallen out of the window.
```python
def is_rate_limited(user_id, max_requests=100, window_seconds=60):
    key = f"ratelimit:{user_id}"
    now = time.time()
    # Drop entries that have fallen out of the sliding window
    r.zremrangebyscore(key, 0, now - window_seconds)
    # Count requests still inside the window
    # (note: check-then-add is not atomic; a Lua script or MULTI
    # can close the race under heavy concurrency)
    if r.zcard(key) >= max_requests:
        return True
    # Record this request; a unique member avoids collisions when
    # two requests share the same timestamp
    r.zadd(key, {f"{now}:{uuid.uuid4()}": now})
    r.expire(key, window_seconds)
    return False
```
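Wired into a request path, the check gates the handler (handler and response shapes are illustrative):

```python
def handle_request(user_id):
    if is_rate_limited(user_id):
        return {"status": 429, "body": "Too Many Requests"}
    return {"status": 200, "body": "OK"}
```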
### Session Storage

Storing the session as a hash lets individual fields be read or updated without deserializing the whole session.

```python
def create_session(session_id, user_data, ttl=86400):
    key = f"session:{session_id}"
    r.hset(key, mapping={
        "user_id": user_data["id"],
        "username": user_data["username"],
        "roles": json.dumps(user_data["roles"]),
        "ip": user_data["ip"],
        "created_at": time.time(),
    })
    r.expire(key, ttl)

def get_session(session_id):
    key = f"session:{session_id}"
    data = r.hgetall(key)
    if not data:
        return None  # unknown or expired session
    data["roles"] = json.loads(data["roles"])
    return data
```
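If sessions should stay alive while in use, a common variant (an assumption here, not something the snippet above does) is sliding expiration, refreshing the TTL on every read:

```python
def touch_session(session_id, ttl=86400):
    # Sliding expiration: push the expiry out each time the session is used
    key = f"session:{session_id}"
    data = get_session(session_id)
    if data is not None:
        r.expire(key, ttl)
    return data
```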
## Performance Optimization

### Connection Pooling

```python
import redis
from redis.connection import ConnectionPool

# Reuse connections across requests instead of opening one per command
pool = ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=50,
    socket_timeout=5,
)
r = redis.Redis(connection_pool=pool)
```
### Pipeline / Batching

```python
# Batch commands into a single round trip instead of one per key
user_ids = [1, 2, 3]  # example ids
pipe = r.pipeline()
for user_id in user_ids:
    pipe.get(f"user:{user_id}")
results = pipe.execute()  # list of replies, in command order
```
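When the batch is all plain GETs, as above, MGET achieves the same round-trip saving in one native command (same key pattern as before):

```python
# One MGET replaces N GETs; results come back in key order,
# with None for keys that do not exist
keys = [f"user:{user_id}" for user_id in user_ids]
results = r.mget(keys)
```

Pipelines remain the right tool once the batch mixes different command types.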
### Monitoring

```bash
# Server-side counters from the Redis CLI
redis-cli info stats     # keyspace_hits, keyspace_misses, evicted_keys
redis-cli info memory    # used_memory, maxmemory
redis-cli info clients   # connected_clients

# Key metrics to watch:
# - hit rate: keyspace_hits / (keyspace_hits + keyspace_misses)
# - evicted_keys: keys dropped because maxmemory was reached
# - connected_clients: active client connections
```
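The same counters are reachable from application code via INFO; a small sketch computing the hit rate:

```python
def cache_hit_rate():
    stats = r.info("stats")
    hits = stats["keyspace_hits"]
    misses = stats["keyspace_misses"]
    total = hits + misses
    return hits / total if total else 0.0
```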
## Common Pitfalls

| Pitfall | Consequence | Fix |
|---------|-------------|-----|
| No TTL | Memory exhaustion; stale data lives forever | Always set a TTL |
| Cache stampede | Database overload when a hot key expires | Use mutex locking (see sketch below) |
| Too-large values | Memory waste, slow serialization | Compress or split values |
| Hot keys | A single key becomes a bottleneck | Shard or replicate the key |
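A sketch of the stampede fix, combining cache-aside with the `acquire_lock`/`release_lock` helpers from earlier (`load_from_db` is a hypothetical placeholder for the database query):

```python
import json
import time

def get_user_safe(user_id, ttl=3600):
    cache_key = f"user:{user_id}"
    while True:
        cached = r.get(cache_key)
        if cached:
            return json.loads(cached)
        # Miss: only the lock holder rebuilds; everyone else backs off
        lock_id = acquire_lock(cache_key)
        if lock_id is None:
            time.sleep(0.05)  # brief backoff, then re-check the cache
            continue
        try:
            cached = r.get(cache_key)  # another process may have filled it
            if cached:
                return json.loads(cached)
            user = load_from_db(user_id)  # hypothetical DB helper
            r.setex(cache_key, ttl, json.dumps(user))
            return user
        finally:
            release_lock(cache_key, lock_id)
```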
## Summary

Redis caching can dramatically improve application performance when the right pattern is applied. Use cache-aside as the default with appropriate TTLs, write-through where consistency matters most, distributed locks to guard race conditions, and sorted sets for rate limiting. Monitor hit rate and eviction counts, and always set TTLs to prevent memory exhaustion.