
Redis — Streams

OWNER: gerco (infrastructure)
ALSO_USED_BY: urszula, maxim (application usage)
LAST_VERIFIED: 2026-03-26
GE_STACK_VERSION: Redis 7.4


Overview

Redis Streams are GE's backbone for agent communication. Every work item, trigger, and completion signal flows through Streams. This page covers the patterns GE uses, consumer groups, and mandatory safety rules. Agents writing code that publishes or consumes Streams MUST follow these patterns.

Core Concepts

A Redis Stream is an append-only log. Each entry has a unique, time-based ID (e.g., 1711468800000-0). Entries are immutable — you do not update or delete individual entries.
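Because IDs encode the append time, an entry's timestamp can be recovered from the ID itself without storing one in the fields. A minimal sketch (the helper name is illustrative):

```python
from datetime import datetime, timezone

def parse_stream_id(entry_id: str) -> tuple[int, int]:
    # A stream ID is <milliseconds-since-epoch>-<sequence-within-that-ms>
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)

ms, seq = parse_stream_id("1711468800000-0")
appended_at = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
```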

The queue state (pending vs. completed) lives in the Consumer Group metadata, not in the data itself.

CHECK: Agent is treating a Stream like a mutable queue. IF: Agent uses XDEL after processing to "remove" consumed messages. THEN: Do NOT use XDEL. Use XACK to acknowledge. Use XTRIM with MAXLEN to manage memory.

GE Stream Architecture

Stream Names

| Stream | Purpose | MAXLEN |
|---|---|---|
| ge:work:incoming | New work items from Admin UI | ~1000 |
| triggers.{agent_name} | Per-agent work triggers from orchestrator | ~100 |
| ge:completions | Completion signals (future — currently COMP files) | ~1000 |
| ge:health | Health heartbeats from agents | ~500 |
| ge:discussions | Multi-agent discussion messages | ~500 |

CHECK: Agent is creating a new stream. IF: Stream name does not follow the patterns above. THEN: Get approval from gerco before creating new streams. Untracked streams waste memory because nothing ever trims them.

Work Routing Flow

1. Admin UI → XADD ge:work:incoming * task "build-feature" agent_id "martijn" ...
2. ge-orchestrator consumer group reads ge:work:incoming
3. Orchestrator routes based on dolly-routing.yaml:
   - work_type match
   - explicit agent_id
   - trigger pattern
4. Orchestrator → XADD triggers.martijn * task "build-feature" ...
5. ge-executor consumer for "martijn" reads triggers.martijn
6. Executor processes, writes COMP-*.md
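The routing decision in step 3 can be sketched as a pure function. This is illustrative only: the routing dict standing in for dolly-routing.yaml, the rule names, and the fallback to ge:dlq are assumptions, not the real orchestrator API.

```python
import fnmatch

def route(message: dict, routing: dict) -> str:
    """Pick the target trigger stream for a work item (illustrative)."""
    # Rule 1: work_type match
    agent = routing.get("work_types", {}).get(message.get("type"))
    if agent:
        return f"triggers.{agent}"
    # Rule 2: explicit agent_id
    if message.get("agent_id"):
        return f"triggers.{message['agent_id']}"
    # Rule 3: trigger pattern (glob match on the task text)
    for pattern, agent in routing.get("trigger_patterns", {}).items():
        if fnmatch.fnmatch(message.get("task", ""), pattern):
            return f"triggers.{agent}"
    return "ge:dlq"  # unroutable — park for inspection

routing = {
    "work_types": {"build-feature": "martijn"},
    "trigger_patterns": {"deploy *": "gerco"},
}
```

The rule order here simply mirrors the list above; the real orchestrator's precedence may differ.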

XADD — Publishing Messages

Mandatory MAXLEN

Every XADD in GE code MUST include a MAXLEN constraint.

# CORRECT — Python (redis-py async client; ioredis equivalent below)
await redis.xadd(
    f"triggers.{agent_name}",
    {"task": task, "type": work_type, "work_item_id": work_id},
    maxlen=100,
    approximate=True  # ~ prefix — allows Redis to optimize trimming
)

// CORRECT — Node.js (ioredis)
await redis.xadd(
  `triggers.${agentName}`,
  'MAXLEN', '~', '100',
  '*',
  'task', task,
  'type', workType,
  'work_item_id', workId
);

ANTI_PATTERN: XADD without MAXLEN. FIX: Add MAXLEN. Per-agent streams: ~100. System streams: ~1000. No exceptions.

ANTI_PATTERN: Publishing the same task to BOTH triggers.{agent} AND ge:work:incoming. FIX: Publish to ONE stream only. The orchestrator routes from ge:work:incoming to triggers.{agent}. Double-publishing causes double execution and double cost.

Message Fields

GE messages use these standard fields:

| Field | Required | Description |
|---|---|---|
| task | Yes | Description of what to do |
| type | Yes | Work type for routing |
| inbox_id | Yes | Unique identifier for dedup |
| priority | No | low, normal, high, critical |
| work_item_id | Yes | Dedup key (5-min window in executor) |
| work_item | No | Serialized work package JSON |
| agent_id | No | Explicit target agent (overrides routing) |
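A small helper that assembles and validates these fields before XADD catches missing required fields early. A sketch only — build_message and its validation rules are assumptions, not an existing GE helper:

```python
REQUIRED_FIELDS = ("task", "type", "inbox_id", "work_item_id")
PRIORITIES = ("low", "normal", "high", "critical")

def build_message(task, work_type, inbox_id, work_item_id,
                  priority=None, work_item=None, agent_id=None):
    """Assemble a GE stream message, enforcing the field table above."""
    msg = {"task": task, "type": work_type,
           "inbox_id": inbox_id, "work_item_id": work_item_id}
    missing = [f for f in REQUIRED_FIELDS if not msg[f]]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    if priority is not None:
        if priority not in PRIORITIES:
            raise ValueError(f"invalid priority: {priority}")
        msg["priority"] = priority
    if work_item is not None:
        msg["work_item"] = work_item  # serialized work package JSON
    if agent_id is not None:
        msg["agent_id"] = agent_id    # explicit target, overrides routing
    return msg
```

The resulting dict can be passed directly as the fields argument of xadd.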

Consumer Groups

Setup

Consumer groups are created once per stream. The orchestrator and executor create their groups on startup.

# Create consumer group (idempotent — MKSTREAM creates the stream if missing)
from redis.exceptions import ResponseError

try:
    await redis.xgroup_create(
        "ge:work:incoming",
        "orchestrator-group",
        id="0",
        mkstream=True
    )
except ResponseError as e:
    if "BUSYGROUP" not in str(e):
        raise
    # BUSYGROUP — the group already exists, which is fine

Reading with XREADGROUP

# Read new messages (> means "only undelivered messages")
messages = await redis.xreadgroup(
    groupname="orchestrator-group",
    consumername=f"orchestrator-{instance_id}",
    streams={"ge:work:incoming": ">"},
    count=10,
    block=1000  # Block up to 1 second for new messages
)

CHECK: Agent is reading from a stream. IF: Agent uses XREAD instead of XREADGROUP. THEN: Use XREADGROUP. XREAD does not track delivery or support acknowledgment. Consumer groups provide load balancing, PEL tracking, and failure recovery.

Acknowledging with XACK

# After successful processing
await redis.xack("ge:work:incoming", "orchestrator-group", message_id)

ANTI_PATTERN: Processing a message but not ACKing it. FIX: ALWAYS XACK after successful processing. Unacknowledged messages stay in the PEL, consume memory, and get redelivered on recovery.

Claiming Stale Messages (Recovery)

When a consumer crashes, its pending messages must be claimed by another consumer.

# Claim messages idle for more than 5 minutes
stale = await redis.xautoclaim(
    "ge:work:incoming",
    "orchestrator-group",
    f"orchestrator-{instance_id}",
    min_idle_time=300000,  # 5 minutes in milliseconds
    start_id="0",
    count=10
)

CHECK: Agent is implementing consumer recovery. IF: Recovery uses XCLAIM without checking idle time. THEN: Use XAUTOCLAIM with a minimum idle time. Claiming active messages from a healthy consumer breaks processing.

Dead Letter Handling

Redis has no built-in dead letter queue (DLQ). GE implements DLQ logic in the orchestrator.

# Check delivery count via XPENDING
pending = await redis.xpending_range(
    "ge:work:incoming",
    "orchestrator-group",
    min="-",
    max="+",
    count=100
)

for entry in pending:
    if entry["times_delivered"] > 3:
        # Move to DLQ stream
        original = await redis.xrange(
            "ge:work:incoming",
            min=entry["message_id"],
            max=entry["message_id"]
        )
        if original:
            await redis.xadd(
                "ge:dlq",
                {**original[0][1], "original_stream": "ge:work:incoming",
                 "failure_count": str(entry["times_delivered"])},
                maxlen=1000,
                approximate=True
            )
            await redis.xack(
                "ge:work:incoming",
                "orchestrator-group",
                entry["message_id"]
            )

Idempotency

Messages may be delivered more than once (consumer crash before XACK, XAUTOCLAIM recovery). All consumers MUST be idempotent.

GE executor uses a work_item_id dedup window of 5 minutes:

# Dedup check in executor
dedup_key = f"dedup:{work_item_id}"
if await redis.set(dedup_key, "1", nx=True, ex=300):
    # First delivery — process
    await process(message)
else:
    # Duplicate — skip and ACK
    await redis.xack(stream, group, message_id)

CHECK: Agent is consuming Stream messages. IF: Consumer does not handle duplicate delivery. THEN: Add dedup logic. Use work_item_id with a TTL key in Redis.

Stream Trimming

MAXLEN (Preferred)

# Approximate trim — Redis may keep slightly more for efficiency
await redis.xtrim("triggers.martijn", maxlen=100, approximate=True)

MINID (Time-Based)

# Remove entries older than 24 hours
import time
cutoff = int((time.time() - 86400) * 1000)
await redis.xtrim("ge:health", minid=cutoff)

ANTI_PATTERN: Using XDEL to clean up processed messages. FIX: Use XTRIM with MAXLEN or MINID. XDEL marks entries as deleted but does not free memory until the entire radix tree macro-node is empty — creating "Swiss cheese" fragmentation.

Monitoring

Key Commands

# Stream length
XLEN ge:work:incoming

# Consumer group info
XINFO GROUPS ge:work:incoming

# Pending entries per consumer
XPENDING ge:work:incoming orchestrator-group - + 10

# Stream info (first/last entry, length)
XINFO STREAM ge:work:incoming

Health Thresholds (GE)

| Metric | Warning | Critical |
|---|---|---|
| Stream length | > MAXLEN * 2 | > MAXLEN * 5 |
| PEL size per consumer | > 50 | > 200 |
| Message idle time | > 5 min (recovery triggered) | > 15 min (escalate) |
| Consumer lag | > 100 messages | > 500 messages |
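These thresholds can be folded into a single classifier that a health check runs per stream and consumer. A sketch, under the assumption that the metrics have already been collected (via XLEN, XPENDING, and XINFO GROUPS):

```python
def health_status(stream_len, maxlen, pel_size, idle_ms, lag):
    """Classify a stream/consumer against the GE thresholds above."""
    if (stream_len > maxlen * 5 or pel_size > 200
            or idle_ms > 15 * 60 * 1000 or lag > 500):
        return "critical"
    if (stream_len > maxlen * 2 or pel_size > 50
            or idle_ms > 5 * 60 * 1000 or lag > 100):
        return "warning"
    return "ok"
```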

GE-Specific Conventions

  • MAXLEN on EVERY XADD — 100 for per-agent, 1000 for system streams.
  • Never double-publish — One stream per task, orchestrator routes.
  • XREADGROUP only — No plain XREAD for work processing.
  • Always XACK — Unacked messages are memory leaks.
  • Idempotent consumers — Dedup with work_item_id.
  • No XDEL — Use XTRIM for cleanup.
  • Port 6381 — Always.

Cross-References

READ_ALSO: wiki/docs/stack/redis/index.md
READ_ALSO: wiki/docs/stack/redis/patterns.md
READ_ALSO: wiki/docs/stack/redis/pitfalls.md
READ_ALSO: wiki/docs/stack/redis/checklist.md