# Redis — Patterns
OWNER: gerco (infrastructure)
ALSO_USED_BY: urszula, maxim (application usage)
LAST_VERIFIED: 2026-03-26
GE_STACK_VERSION: Redis 7.4
## Overview
Common Redis patterns used across GE beyond Streams and caching. Covers rate limiting, distributed locks, pub/sub, and sorted set queues. Agents implementing these patterns MUST follow the GE-specific constraints.
## Rate Limiting

### Sliding Window Counter (GE Default)
GE uses sliding window counters for API rate limiting and agent task throttling. The orchestrator enforces 10 tasks/min/agent using this pattern.
```python
import time
from uuid import uuid4

async def check_rate_limit(
    key: str,
    limit: int,
    window_seconds: int
) -> bool:
    """Returns True if the request is allowed, False if rate-limited."""
    now = time.time()
    window_start = now - window_seconds
    pipe = redis.pipeline()
    # Remove entries that have aged out of the window
    pipe.zremrangebyscore(key, "-inf", window_start)
    # Count requests still inside the window (before adding this one)
    pipe.zcard(key)
    # Record the current request (uuid suffix avoids member collisions)
    pipe.zadd(key, {f"{now}:{uuid4().hex[:8]}": now})
    # Set a TTL so keys for inactive entities expire
    pipe.expire(key, window_seconds + 1)
    results = await pipe.execute()
    current_count = results[1]
    return current_count < limit
```
### Usage in GE

```python
# Agent task rate limit (orchestrator)
allowed = await check_rate_limit(
    f"ratelimit:tasks:{agent_name}:1min",
    limit=10,
    window_seconds=60
)
if not allowed:
    # Queue for later or reject
    logger.warning(f"Rate limit exceeded for {agent_name}")
    return

# API endpoint rate limit
allowed = await check_rate_limit(
    f"ratelimit:api:{client_ip}:1min",
    limit=60,
    window_seconds=60
)
if not allowed:
    raise HTTPException(status_code=429, detail="Too many requests")
```
### GE Rate Limits
| Scope | Limit | Window | Enforced By |
|---|---|---|---|
| Tasks per agent | 10 | 1 min | ge-orchestrator |
| Hooks per agent | 20 | 1 hour | hooks.py |
| Chat messages per agent | 10 | 1 min | claude-chat.ts |
| API requests per client | 60 | 1 min | admin-ui middleware |
CHECK: Agent is adding a new rate limit. IF: The sorted set has no EXPIRE. THEN: Add EXPIRE. Without it, rate limit keys for inactive entities never expire, leaking memory.
ANTI_PATTERN: Using fixed-window counters with INCR/EXPIRE. FIX: Fixed windows have boundary bursts (2x limit at window edges). Use sliding window with sorted sets.
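The boundary burst is easy to demonstrate with a small simulation (pure Python, no Redis; uses the 10/min agent-task limit from the table above). A client that sends 10 requests just before a window boundary and 10 just after gets 20 through a fixed window in about two seconds, while the sliding window rejects the second burst:

```python
LIMIT, WINDOW = 10, 60

# 10 requests at t≈59s, 10 more at t≈61s — straddling the t=60 boundary
requests = [59.0 + i * 0.01 for i in range(10)] + [61.0 + i * 0.01 for i in range(10)]

def fixed_window_allowed(ts):
    # INCR-style counter keyed by window number: resets at t=60
    counts = {}
    allowed = []
    for t in ts:
        bucket = int(t // WINDOW)
        counts[bucket] = counts.get(bucket, 0) + 1
        allowed.append(counts[bucket] <= LIMIT)
    return allowed

def sliding_window_allowed(ts):
    # Sorted-set style: count requests in the trailing 60s
    # (requests are recorded even when rejected, matching check_rate_limit)
    seen = []
    allowed = []
    for t in ts:
        seen = [s for s in seen if s > t - WINDOW]
        allowed.append(len(seen) < LIMIT)
        seen.append(t)
    return allowed

print(sum(fixed_window_allowed(requests)))    # 20 — double the limit slips through
print(sum(sliding_window_allowed(requests)))  # 10 — second burst rejected
```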
## Distributed Locks

### Simple Lock (Single Instance)
GE runs a single Redis instance. Redlock (multi-instance) is NOT needed. Use the simple SET NX pattern for distributed locking.
```python
async def acquire_lock(
    key: str,
    token: str,
    ttl_seconds: int = 10
) -> bool:
    """Acquire a lock. Returns True if acquired."""
    # SET NX EX is atomic: set only if absent, with a TTL.
    # redis-py returns None (not False) on failure, so coerce to bool.
    return bool(await redis.set(
        f"lock:{key}",
        token,
        nx=True,
        ex=ttl_seconds
    ))

async def release_lock(key: str, token: str) -> bool:
    """Release a lock only if we own it (token matches)."""
    # Lua script for atomic check-and-delete
    script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    result = await redis.eval(script, 1, f"lock:{key}", token)
    return result == 1
```
### Context Manager

```python
import asyncio
from contextlib import asynccontextmanager
from uuid import uuid4

@asynccontextmanager
async def distributed_lock(
    key: str,
    ttl_seconds: int = 10,
    retry_delay: float = 0.1,
    max_retries: int = 50
):
    token = uuid4().hex
    acquired = False
    for _ in range(max_retries):
        acquired = await acquire_lock(key, token, ttl_seconds)
        if acquired:
            break
        await asyncio.sleep(retry_delay)
    if not acquired:
        raise LockAcquisitionTimeout(key)
    try:
        yield token
    finally:
        await release_lock(key, token)
```
### Usage

```python
# Prevent duplicate agent execution
async with distributed_lock(f"agent-execution:{agent_name}", ttl_seconds=300):
    await execute_task(agent_name, task)
```
CHECK: Agent is implementing a distributed lock. IF: Lock has no TTL (or TTL longer than the expected operation). THEN: Set a reasonable TTL. Locks without TTL become permanent if the holder crashes.
ANTI_PATTERN: Releasing a lock without verifying ownership (token check). FIX: Always use the Lua script pattern to atomically check-and-delete. Without it, you may release another process's lock.
ANTI_PATTERN: Using Redlock for GE. FIX: GE runs a single Redis instance. Redlock adds complexity for multi-instance setups we do not have. Use simple SET NX.
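The ownership check exists because a lock's TTL can expire while the holder is still mid-operation; a blind DEL then destroys a lock that a second process has since acquired. A minimal in-memory sketch (a plain dict standing in for the Redis keyspace; names are illustrative only):

```python
# locks: key -> token, standing in for Redis keys
locks = {}

def release_blind(key):
    # Anti-pattern: DEL without checking the token
    locks.pop(key, None)

def release_checked(key, token):
    # The Lua check-and-delete pattern: delete only if we still own it
    if locks.get(key) == token:
        del locks[key]
        return True
    return False

# Process A acquires the lock, then its TTL expires mid-operation
locks["lock:job"] = "token-A"
locks.pop("lock:job")            # TTL expiry
locks["lock:job"] = "token-B"    # Process B acquires the same lock

release_blind("lock:job")        # A's stale blind release destroys B's lock
print("lock:job" in locks)       # False — B silently lost its lock

locks["lock:job"] = "token-B"
print(release_checked("lock:job", "token-A"))  # False — A's stale release is refused
print("lock:job" in locks)       # True — B keeps its lock
```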
## Pub/Sub

### When to Use
GE uses Pub/Sub sparingly — Streams are preferred for work routing. Pub/Sub is appropriate for:
- Real-time notifications to connected WebSocket clients.
- Config change broadcasts (all consumers need the update).
- Health check pings.
Pub/Sub messages are fire-and-forget — if no subscriber is listening, the message is lost.
```python
# Publisher
await redis.publish("config:updated", json.dumps({
    "key": "agent-routing",
    "timestamp": time.time()
}))

# Subscriber
pubsub = redis.pubsub()
await pubsub.subscribe("config:updated")
async for message in pubsub.listen():
    if message["type"] == "message":
        config = json.loads(message["data"])
        await reload_config(config["key"])
```
CHECK: Agent is choosing between Streams and Pub/Sub. IF: The message must be processed exactly once (work items, tasks). THEN: Use Streams with consumer groups. Pub/Sub has no acknowledgment, no persistence, no replay. IF: The message is a broadcast that all listeners should receive (config changes, notifications). THEN: Pub/Sub is appropriate.
## Sorted Sets for Priority Queues

### Priority Work Queue
```python
# Enqueue with priority score (lower = higher priority)
PRIORITY_MAP = {"critical": 0, "high": 1, "normal": 2, "low": 3}

async def enqueue_priority(queue: str, item_id: str, priority: str):
    score = PRIORITY_MAP.get(priority, 2)
    # Add timestamp as tiebreaker for same-priority items
    score_with_time = score + (time.time() / 1e12)
    await redis.zadd(queue, {item_id: score_with_time})

# Dequeue the highest-priority item
async def dequeue_priority(queue: str) -> str | None:
    # ZPOPMIN returns the item with the lowest score (highest priority)
    result = await redis.zpopmin(queue, count=1)
    if result:
        return result[0][0]  # (member, score) tuple
    return None
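The score encoding keeps priority dominant and arrival time as a tiebreaker: priorities differ by whole integers, while the timestamp term `time.time() / 1e12` is on the order of 0.002, so it can never flip an item across priority levels. A quick check without Redis (task names hypothetical):

```python
import time

PRIORITY_MAP = {"critical": 0, "high": 1, "normal": 2, "low": 3}

def score_for(priority: str, ts: float) -> float:
    # Same encoding as enqueue_priority above
    return PRIORITY_MAP.get(priority, 2) + ts / 1e12

now = time.time()
items = [
    ("task-a", score_for("normal", now)),
    ("task-b", score_for("critical", now + 5)),  # arrived later, higher priority
    ("task-c", score_for("normal", now + 1)),    # same priority, arrived later
]

# ZPOPMIN pops the lowest score first: critical beats normal,
# and equal priorities dequeue in arrival (FIFO) order.
order = [name for name, _ in sorted(items, key=lambda kv: kv[1])]
print(order)  # ['task-b', 'task-a', 'task-c']
```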
### Leaderboard / Ranking

```python
# Track agent completion counts
await redis.zincrby("agent:completions:daily", 1, agent_name)

# Get top 10 agents by completions
top_agents = await redis.zrevrange(
    "agent:completions:daily", 0, 9, withscores=True
)

# Reset daily (via cron or TTL)
await redis.delete("agent:completions:daily")
```
## Hash for Structured Cache

```python
# Store agent state as a hash (more memory-efficient than a JSON string)
await redis.hset(f"agent:{agent_name}:state", mapping={
    "status": "active",
    "current_task": task_id,
    "last_heartbeat": str(time.time()),
    "tasks_completed": str(count)
})

# Read specific fields
status = await redis.hget(f"agent:{agent_name}:state", "status")

# Read all fields
state = await redis.hgetall(f"agent:{agent_name}:state")

# Increment a counter atomically
await redis.hincrby(f"agent:{agent_name}:state", "tasks_completed", 1)
```
CHECK: Agent is storing structured data in Redis. IF: Data has 5+ fields that are read/written independently. THEN: Use a Hash instead of serialized JSON. Hashes allow partial reads/writes and are more memory-efficient.
## Pipeline and Transactions

### Pipelining (Performance)
```python
# Batch multiple commands into one network round-trip
pipe = redis.pipeline(transaction=False)
for agent in agents:
    pipe.get(f"agent:{agent}:status")
results = await pipe.execute()
```
### Transactions (Atomicity)

```python
# Atomic multi-command execution: transaction=True already wraps the
# queued commands in MULTI/EXEC, so no explicit pipe.multi() is needed
pipe = redis.pipeline(transaction=True)
pipe.set(f"agent:{name}:status", "active")
pipe.zadd("active-agents", {name: time.time()})
pipe.publish("agent:status-changed", name)
await pipe.execute()
```
CHECK: Agent is making multiple Redis calls in a loop. IF: Each call is independent and does not depend on the previous result. THEN: Use a pipeline. N independent calls in a pipeline = 1 round-trip instead of N.
## GE-Specific Conventions
- Streams over Pub/Sub for work routing — Pub/Sub only for broadcasts.
- Single-instance locks — No Redlock complexity needed.
- Sliding window for rate limiting — No fixed-window counters.
- Pipelines for batch operations — Never loop with individual calls.
- Hashes for structured state — More efficient than JSON strings.
- EXPIRE on every temporary key — No permanent clutter.
## Cross-References
READ_ALSO: wiki/docs/stack/redis/index.md
READ_ALSO: wiki/docs/stack/redis/streams.md
READ_ALSO: wiki/docs/stack/redis/caching.md
READ_ALSO: wiki/docs/stack/redis/pitfalls.md