Redis — Patterns

OWNER: gerco (infrastructure) ALSO_USED_BY: urszula, maxim (application usage) LAST_VERIFIED: 2026-03-26 GE_STACK_VERSION: Redis 7.4


Overview

Common Redis patterns used across GE beyond Streams and caching. Covers rate limiting, distributed locks, pub/sub, and sorted set queues. Agents implementing these patterns MUST follow the GE-specific constraints.

Rate Limiting

Sliding Window Counter (GE Default)

GE uses sliding window counters for API rate limiting and agent task throttling. The orchestrator enforces 10 tasks/min/agent using this pattern.

import time
from uuid import uuid4

async def check_rate_limit(
    key: str,
    limit: int,
    window_seconds: int
) -> bool:
    """Returns True if request is allowed, False if rate-limited."""
    now = time.time()
    window_start = now - window_seconds
    pipe = redis.pipeline()

    # Remove entries that have fallen out of the window
    pipe.zremrangebyscore(key, "-inf", window_start)
    # Count what remains in the current window
    pipe.zcard(key)
    # Record this request (uuid suffix avoids member collisions at the same timestamp)
    pipe.zadd(key, {f"{now}:{uuid4().hex[:8]}": now})
    # Set TTL on the sorted set itself so keys for idle entities expire
    pipe.expire(key, window_seconds + 1)

    results = await pipe.execute()
    current_count = results[1]  # ZCARD result: count before this request was added

    # Note: rejected requests are still recorded above, so a client that keeps
    # retrying stays rate-limited until it actually backs off.
    return current_count < limit

Usage in GE

# Agent task rate limit (orchestrator)
allowed = await check_rate_limit(
    f"ratelimit:tasks:{agent_name}:1min",
    limit=10,
    window_seconds=60
)
if not allowed:
    # Queue for later or reject
    logger.warning(f"Rate limit exceeded for {agent_name}")
    return

# API endpoint rate limit
allowed = await check_rate_limit(
    f"ratelimit:api:{client_ip}:1min",
    limit=60,
    window_seconds=60
)
if not allowed:
    raise HTTPException(status_code=429, detail="Too many requests")

GE Rate Limits

Scope                   | Limit | Window | Enforced By
------------------------|-------|--------|--------------------
Tasks per agent         | 10    | 1 min  | ge-orchestrator
Hooks per agent         | 20    | 1 hour | hooks.py
Chat messages per agent | 10    | 1 min  | claude-chat.ts
API requests per client | 60    | 1 min  | admin-ui middleware

CHECK: Agent is adding a new rate limit. IF: The sorted set has no EXPIRE. THEN: Add EXPIRE. Without it, rate limit keys for inactive entities never expire, leaking memory.

ANTI_PATTERN: Using fixed-window counters with INCR/EXPIRE. Fixed windows allow boundary bursts: up to 2x the limit can pass around a window edge. FIX: Use a sliding window with sorted sets.
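To make the boundary burst concrete, here is a minimal simulation of the fixed-window pattern (pure Python, no Redis; the bucket keying mirrors what INCR on a per-window key would do):

```python
# Simulation of the fixed-window boundary burst: a client sends `limit`
# requests just before a window boundary and `limit` more just after it,
# passing 2x the limit inside a span far shorter than one window.

def fixed_window_allow(counters: dict, t: float, limit: int, window: int) -> bool:
    bucket = int(t // window)  # equivalent to INCR on "ratelimit:{bucket}"
    counters[bucket] = counters.get(bucket, 0) + 1
    return counters[bucket] <= limit

counters: dict = {}
limit, window = 10, 60
# 10 requests at t=59.5 (end of window 0), 10 more at t=60.5 (start of window 1)
burst = [fixed_window_allow(counters, 59.5, limit, window) for _ in range(limit)]
burst += [fixed_window_allow(counters, 60.5, limit, window) for _ in range(limit)]
# All 20 requests pass within a one-second span, despite limit=10/min
assert all(burst) and len(burst) == 2 * limit
```

The sliding-window sorted set avoids this because the window boundary moves with each request instead of resetting on a fixed clock tick.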

Distributed Locks

Simple Lock (Single Instance)

GE runs a single Redis instance. Redlock (multi-instance) is NOT needed. Use the simple SET NX pattern for distributed locking.

async def acquire_lock(
    key: str,
    token: str,
    ttl_seconds: int = 10
) -> bool:
    """Acquire a lock. Returns True if acquired."""
    # SET NX returns True on success and None if the key already exists,
    # so coerce to bool to match the annotation.
    return bool(await redis.set(
        f"lock:{key}",
        token,
        nx=True,
        ex=ttl_seconds
    ))

async def release_lock(key: str, token: str) -> bool:
    """Release a lock only if we own it (token matches)."""
    # Lua script for atomic check-and-delete
    script = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """
    result = await redis.eval(script, 1, f"lock:{key}", token)
    return result == 1

Context Manager

import asyncio
from contextlib import asynccontextmanager
from uuid import uuid4

class LockAcquisitionTimeout(Exception):
    """Raised when the lock cannot be acquired within max_retries."""

@asynccontextmanager
async def distributed_lock(
    key: str,
    ttl_seconds: int = 10,
    retry_delay: float = 0.1,
    max_retries: int = 50
):
    token = uuid4().hex
    acquired = False
    for _ in range(max_retries):
        acquired = await acquire_lock(key, token, ttl_seconds)
        if acquired:
            break
        await asyncio.sleep(retry_delay)

    if not acquired:
        raise LockAcquisitionTimeout(key)

    try:
        yield token
    finally:
        await release_lock(key, token)

Usage

# Prevent duplicate agent execution
async with distributed_lock(f"agent-execution:{agent_name}", ttl_seconds=300):
    await execute_task(agent_name, task)

CHECK: Agent is implementing a distributed lock. IF: Lock has no TTL (or TTL longer than the expected operation). THEN: Set a reasonable TTL. Locks without TTL become permanent if the holder crashes.
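When an operation's duration is hard to bound up front, one option is to extend the TTL periodically while the lock is still held, using the same ownership check as release. This helper is illustrative (the `extend_lock` name and script are not currently in GE's helpers):

```python
# Hypothetical TTL-extension helper, mirroring release_lock's ownership check.
# Extends the lock atomically, but only if we still hold it (token matches).
EXTEND_LOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("pexpire", KEYS[1], ARGV[2])
else
    return 0
end
"""

async def extend_lock(redis, key: str, token: str, ttl_seconds: int = 10) -> bool:
    """Returns True if the TTL was extended, False if the lock was lost."""
    result = await redis.eval(
        EXTEND_LOCK_SCRIPT, 1, f"lock:{key}", token, ttl_seconds * 1000
    )
    return result == 1
```

If `extend_lock` returns False, the lock expired and was possibly taken by someone else; the holder should stop work rather than continue under a lost lock.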

ANTI_PATTERN: Releasing a lock without verifying ownership (token check). FIX: Always use the Lua script pattern to atomically check-and-delete. Without it, you may release another process's lock.

ANTI_PATTERN: Using Redlock for GE. FIX: GE runs a single Redis instance. Redlock adds complexity for multi-instance setups we do not have. Use simple SET NX.

Pub/Sub

When to Use

GE uses Pub/Sub sparingly — Streams are preferred for work routing. Pub/Sub is appropriate for:

  • Real-time notifications to connected WebSocket clients.
  • Config change broadcasts (all consumers need the update).
  • Health check pings.

Pub/Sub messages are fire-and-forget — if no subscriber is listening, the message is lost.

# Publisher
await redis.publish("config:updated", json.dumps({
    "key": "agent-routing",
    "timestamp": time.time()
}))

# Subscriber
pubsub = redis.pubsub()
await pubsub.subscribe("config:updated")
async for message in pubsub.listen():
    if message["type"] == "message":
        config = json.loads(message["data"])
        await reload_config(config["key"])

CHECK: Agent is choosing between Streams and Pub/Sub. IF: The message must be processed exactly once (work items, tasks). THEN: Use Streams with consumer groups. Pub/Sub has no acknowledgment, no persistence, no replay. IF: The message is a broadcast that all listeners should receive (config changes, notifications). THEN: Pub/Sub is appropriate.
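For contrast with the Pub/Sub subscriber above, the Streams side of this decision looks roughly like the sketch below (see streams.md for the real GE conventions; the stream, group, and consumer names here are illustrative):

```python
# Rough sketch of consuming work items via Streams with a consumer group.
# Unlike Pub/Sub, unacknowledged messages persist and can be reclaimed.
async def consume_work(redis, handle, stream="tasks", group="workers", consumer="w1"):
    try:
        # Create the group once; raises BUSYGROUP if it already exists
        await redis.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception:
        pass
    while True:
        entries = await redis.xreadgroup(
            group, consumer, {stream: ">"}, count=10, block=5000
        )
        for _stream, messages in entries or []:
            for msg_id, fields in messages:
                await handle(fields)
                # ACK only after successful processing -- a crash before this
                # point leaves the message pending for redelivery
                await redis.xack(stream, group, msg_id)
```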

Sorted Sets for Priority Queues

Priority Work Queue

# Enqueue with priority score (lower = higher priority)
PRIORITY_MAP = {"critical": 0, "high": 1, "normal": 2, "low": 3}

async def enqueue_priority(queue: str, item_id: str, priority: str):
    score = PRIORITY_MAP.get(priority, 2)
    # Add timestamp as tiebreaker for same-priority items
    score_with_time = score + (time.time() / 1e12)
    await redis.zadd(queue, {item_id: score_with_time})

# Dequeue highest priority item
async def dequeue_priority(queue: str) -> str | None:
    # ZPOPMIN returns the item with the lowest score (highest priority)
    result = await redis.zpopmin(queue, count=1)
    if result:
        return result[0][0]  # (member, score)
    return None

Leaderboard / Ranking

# Track agent completion counts
await redis.zincrby("agent:completions:daily", 1, agent_name)

# Get top 10 agents by completions
top_agents = await redis.zrevrange(
    "agent:completions:daily", 0, 9, withscores=True
)

# Reset daily (via cron or TTL)
await redis.delete("agent:completions:daily")
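For the TTL variant of the daily reset, one sketch (assuming daily keys roll over at UTC midnight; the helper name is illustrative) is to set the key's expiry to the next midnight when it is first written:

```python
# Compute the TTL that makes a daily key expire at the next UTC midnight.
from datetime import datetime, timedelta, timezone

def seconds_until_utc_midnight(now: datetime) -> int:
    tomorrow = (now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return int((tomorrow - now).total_seconds())

# Usage (after the first ZINCRBY of the day):
# await redis.expire(
#     "agent:completions:daily",
#     seconds_until_utc_midnight(datetime.now(timezone.utc)),
# )

assert seconds_until_utc_midnight(
    datetime(2026, 3, 26, 23, 59, 0, tzinfo=timezone.utc)
) == 60
```

The TTL approach avoids a separate cron job, at the cost of the key vanishing exactly at midnight rather than being archived first.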

Hash for Structured Cache

# Store agent state as a hash (more memory-efficient than JSON string)
await redis.hset(f"agent:{agent_name}:state", mapping={
    "status": "active",
    "current_task": task_id,
    "last_heartbeat": str(time.time()),
    "tasks_completed": str(count)
})

# Read specific fields
status = await redis.hget(f"agent:{agent_name}:state", "status")

# Read all fields
state = await redis.hgetall(f"agent:{agent_name}:state")

# Increment a counter atomically
await redis.hincrby(f"agent:{agent_name}:state", "tasks_completed", 1)

CHECK: Agent is storing structured data in Redis. IF: Data has 5+ fields that are read/written independently. THEN: Use a Hash instead of serialized JSON. Hashes allow partial reads/writes and are more memory-efficient.
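Beyond memory, the practical win is that field-level writes avoid the read-modify-write race that a serialized JSON string forces. A pure-Python illustration (the dicts stand in for Redis; no actual concurrency, just the interleaving):

```python
# Two writers update different fields of the same agent state.
import json

store = {"state": json.dumps({"status": "active", "tasks_completed": 0})}

# JSON string: both writers read, modify their field, write the whole blob back
a = json.loads(store["state"])
b = json.loads(store["state"])
a["status"] = "idle"
b["tasks_completed"] = 5
store["state"] = json.dumps(a)
store["state"] = json.dumps(b)  # overwrites a's status update -- last write wins
assert json.loads(store["state"])["status"] == "active"  # lost update!

# Hash: each writer touches only its own field (HSET / HINCRBY are per-field)
hash_store = {"status": "active", "tasks_completed": 0}
hash_store["status"] = "idle"        # writer A: HSET ... status idle
hash_store["tasks_completed"] = 5    # writer B: HINCRBY ... tasks_completed 5
assert hash_store == {"status": "idle", "tasks_completed": 5}
```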

Pipeline and Transactions

Pipelining (Performance)

# Batch multiple commands in one round-trip
pipe = redis.pipeline(transaction=False)
for agent in agents:
    pipe.get(f"agent:{agent}:status")
results = await pipe.execute()

Transactions (Atomicity)

# Atomic multi-command execution
# (transaction=True already wraps the batch in MULTI/EXEC on execute())
pipe = redis.pipeline(transaction=True)
pipe.set(f"agent:{name}:status", "active")
pipe.zadd("active-agents", {name: time.time()})
pipe.publish("agent:status-changed", name)
await pipe.execute()

CHECK: Agent is making multiple Redis calls in a loop. IF: Each call is independent and does not depend on the previous result. THEN: Use a pipeline. N independent calls in a pipeline = 1 round-trip instead of N.

GE-Specific Conventions

  • Streams over Pub/Sub for work routing — Pub/Sub only for broadcasts.
  • Single-instance locks — No Redlock complexity needed.
  • Sliding window for rate limiting — No fixed-window counters.
  • Pipelines for batch operations — Never loop with individual calls.
  • Hashes for structured state — More efficient than JSON strings.
  • EXPIRE on every temporary key — No permanent clutter.

Cross-References

READ_ALSO: wiki/docs/stack/redis/index.md READ_ALSO: wiki/docs/stack/redis/streams.md READ_ALSO: wiki/docs/stack/redis/caching.md READ_ALSO: wiki/docs/stack/redis/pitfalls.md