Redis — Streams¶
OWNER: gerco (infrastructure) ALSO_USED_BY: urszula, maxim (application usage) LAST_VERIFIED: 2026-03-26 GE_STACK_VERSION: Redis 7.4
Overview¶
Redis Streams are GE's backbone for agent communication. Every work item, trigger, and completion signal flows through Streams. This page covers the patterns GE uses, consumer groups, and mandatory safety rules. Agents writing code that publishes or consumes Streams MUST follow these patterns.
Core Concepts¶
A Redis Stream is an append-only log.
Each entry has a unique, time-based ID (e.g., 1711468800000-0).
Entries are immutable — you do not update or delete individual entries.
The queue state (pending vs. completed) lives in the Consumer Group metadata, not in the data itself.
CHECK: Agent is treating a Stream like a mutable queue. IF: Agent uses XDEL after processing to "remove" consumed messages. THEN: Do NOT use XDEL. Use XACK to acknowledge. Use XTRIM with MAXLEN to manage memory.
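The time-based IDs can be compared directly; a minimal sketch (the helper name is illustrative, not part of GE code) shows how the ms-timestamp/sequence format orders entries:

```python
def parse_stream_id(entry_id: str) -> tuple[int, int]:
    """Split a Stream entry ID ("<unix-ms>-<sequence>") into its parts."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)

# Two entries added in the same millisecond differ only by sequence number;
# tuple comparison matches the order Redis delivers them in.
a = parse_stream_id("1711468800000-0")
b = parse_stream_id("1711468800000-1")
assert a < b
```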
GE Stream Architecture¶
Stream Names¶
| Stream | Purpose | MAXLEN |
|---|---|---|
| ge:work:incoming | New work items from Admin UI | ~1000 |
| triggers.{agent_name} | Per-agent work triggers from orchestrator | ~100 |
| ge:completions | Completion signals (future — currently COMP files) | ~1000 |
| ge:health | Health heartbeats from agents | ~500 |
| ge:discussions | Multi-agent discussion messages | ~500 |
CHECK: Agent is creating a new stream. IF: Stream name does not follow the patterns above. THEN: Get approval from gerco before creating new streams. Streams that monitoring and trimming do not know about accumulate entries and waste memory.
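Publishing code can guard against accidental new streams by validating names against the table above. A sketch, assuming the patterns on this page (the helper and regexes are illustrative, not a shipped GE module):

```python
import re

# Allowed GE stream names, derived from the Stream Names table above.
ALLOWED_STREAM_PATTERNS = [
    re.compile(r"^ge:(work:incoming|completions|health|discussions|dlq)$"),
    re.compile(r"^triggers\.[a-z][a-z0-9_-]*$"),  # per-agent trigger streams
]

def is_valid_stream_name(name: str) -> bool:
    """Return True if the name matches a known GE stream pattern."""
    return any(p.fullmatch(name) for p in ALLOWED_STREAM_PATTERNS)
```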
Work Routing Flow¶
1. Admin UI → XADD ge:work:incoming * task "build-feature" agent_id "martijn" ...
2. ge-orchestrator consumer group reads ge:work:incoming
3. Orchestrator routes based on dolly-routing.yaml:
- work_type match
- explicit agent_id
- trigger pattern
4. Orchestrator → XADD triggers.martijn * task "build-feature" ...
5. ge-executor consumer for "martijn" reads triggers.martijn
6. Executor processes, writes COMP-*.md
XADD — Publishing Messages¶
Mandatory MAXLEN¶
Every XADD in GE code MUST include a MAXLEN constraint.
# CORRECT — Python (redis-py async; equivalent ioredis call below)
await redis.xadd(
    f"triggers.{agent_name}",
    {"task": task, "type": work_type, "work_item_id": work_id},
    maxlen=100,
    approximate=True  # ~ prefix — allows Redis to optimize trimming
)
// CORRECT — Node.js (ioredis)
await redis.xadd(
  `triggers.${agentName}`,
  'MAXLEN', '~', '100',
  '*',
  'task', task,
  'type', workType,
  'work_item_id', workId
);
ANTI_PATTERN: XADD without MAXLEN. FIX: Add MAXLEN. Per-agent streams: ~100. System streams: ~1000. No exceptions.
ANTI_PATTERN: Publishing the same task to BOTH triggers.{agent} AND ge:work:incoming.
FIX: Publish to ONE stream only. The orchestrator routes from ge:work:incoming to triggers.{agent}. Double-publishing causes double execution and double cost.
Message Fields¶
GE messages use these standard fields:
| Field | Required | Description |
|---|---|---|
| task | Yes | Description of what to do |
| type | Yes | Work type for routing |
| inbox_id | Yes | Unique identifier for dedup |
| priority | No | low, normal, high, critical |
| work_item_id | Yes | Dedup key (5-min window in executor) |
| work_item | No | Serialized work package JSON |
| agent_id | No | Explicit target agent (overrides routing) |
Consumer Groups¶
Setup¶
Consumer groups are created once per stream. The orchestrator and executor create their groups on startup.
# Create consumer group (idempotent — MKSTREAM creates the stream if missing)
from redis.exceptions import ResponseError

try:
    await redis.xgroup_create(
        "ge:work:incoming",
        "orchestrator-group",
        id="0",
        mkstream=True
    )
except ResponseError as e:
    # BUSYGROUP means the group already exists, which is fine
    if "BUSYGROUP" not in str(e):
        raise
Reading with XREADGROUP¶
# Read new messages (> means "only undelivered messages")
messages = await redis.xreadgroup(
    groupname="orchestrator-group",
    consumername=f"orchestrator-{instance_id}",
    streams={"ge:work:incoming": ">"},
    count=10,
    block=1000  # Block up to 1 second for new messages
)
CHECK: Agent is reading from a stream. IF: Agent uses XREAD instead of XREADGROUP. THEN: Use XREADGROUP. XREAD does not track delivery or support acknowledgment. Consumer groups provide load balancing, PEL tracking, and failure recovery.
Acknowledging with XACK¶
# After successful processing
await redis.xack("ge:work:incoming", "orchestrator-group", message_id)
ANTI_PATTERN: Processing a message but not ACKing it. FIX: ALWAYS XACK after successful processing. Unacknowledged messages stay in the PEL, consume memory, and get redelivered on recovery.
Claiming Stale Messages (Recovery)¶
When a consumer crashes, its pending messages must be claimed by another consumer.
# Claim messages idle for more than 5 minutes
stale = await redis.xautoclaim(
    "ge:work:incoming",
    "orchestrator-group",
    f"orchestrator-{instance_id}",
    min_idle_time=300000,  # 5 minutes in milliseconds
    start_id="0",
    count=10
)
CHECK: Agent is implementing consumer recovery. IF: Recovery uses XCLAIM without checking idle time. THEN: Use XAUTOCLAIM with a minimum idle time. Claiming active messages from a healthy consumer breaks processing.
Dead Letter Handling¶
Redis has no built-in dead letter queue (DLQ). GE implements DLQ logic in the orchestrator.
# Check delivery count via XPENDING
pending = await redis.xpending_range(
    "ge:work:incoming",
    "orchestrator-group",
    min="-",
    max="+",
    count=100
)
for entry in pending:
    if entry["times_delivered"] > 3:
        # Move to DLQ stream
        original = await redis.xrange(
            "ge:work:incoming",
            min=entry["message_id"],
            max=entry["message_id"]
        )
        if original:
            await redis.xadd(
                "ge:dlq",
                {**original[0][1], "original_stream": "ge:work:incoming",
                 "failure_count": str(entry["times_delivered"])},
                maxlen=1000,
                approximate=True
            )
        await redis.xack(
            "ge:work:incoming",
            "orchestrator-group",
            entry["message_id"]
        )
Idempotency¶
Messages may be delivered more than once (consumer crash before XACK, XAUTOCLAIM recovery). All consumers MUST be idempotent.
GE executor uses a work_item_id dedup window of 5 minutes:
# Dedup check in executor
dedup_key = f"dedup:{work_item_id}"
if await redis.set(dedup_key, "1", nx=True, ex=300):
    # First delivery — process
    await process(message)
else:
    # Duplicate — skip and ACK
    await redis.xack(stream, group, message_id)
CHECK: Agent is consuming Stream messages. IF: Consumer does not handle duplicate delivery. THEN: Add dedup logic. Use work_item_id with a TTL key in Redis.
Stream Trimming¶
MAXLEN (Preferred)¶
# Approximate trim — Redis may keep slightly more for efficiency
await redis.xtrim("triggers.martijn", maxlen=100, approximate=True)
MINID (Time-Based)¶
# Remove entries older than 24 hours
import time
cutoff = int((time.time() - 86400) * 1000)
await redis.xtrim("ge:health", minid=cutoff)
ANTI_PATTERN: Using XDEL to clean up processed messages. FIX: Use XTRIM with MAXLEN or MINID. XDEL marks entries as deleted but does not free memory until the entire radix tree macro-node is empty — creating "Swiss cheese" fragmentation.
Monitoring¶
Key Commands¶
# Stream length
XLEN ge:work:incoming
# Consumer group info
XINFO GROUPS ge:work:incoming
# Pending entries per consumer
XPENDING ge:work:incoming orchestrator-group - + 10
# Stream info (first/last entry, length)
XINFO STREAM ge:work:incoming
Health Thresholds (GE)¶
| Metric | Warning | Critical |
|---|---|---|
| Stream length | > MAXLEN * 2 | > MAXLEN * 5 |
| PEL size per consumer | > 50 | > 200 |
| Message idle time | > 5 min (recovery triggered) | > 15 min (escalate) |
| Consumer lag | > 100 messages | > 500 messages |
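The length, PEL, and lag thresholds can be evaluated with a pure function fed by XLEN, XPENDING, and XINFO GROUPS output. A sketch, assuming maxlen is the stream's configured MAXLEN; the idle-time metric is handled separately by the XAUTOCLAIM recovery path:

```python
def stream_health(length: int, maxlen: int, pel_size: int, lag: int) -> str:
    """Classify a stream against the GE health thresholds above."""
    if length > maxlen * 5 or pel_size > 200 or lag > 500:
        return "critical"
    if length > maxlen * 2 or pel_size > 50 or lag > 100:
        return "warning"
    return "ok"
```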
GE-Specific Conventions¶
- MAXLEN on EVERY XADD — 100 for per-agent, 1000 for system streams.
- Never double-publish — One stream per task, orchestrator routes.
- XREADGROUP only — No plain XREAD for work processing.
- Always XACK — Unacked messages are memory leaks.
- Idempotent consumers — Dedup with work_item_id.
- No XDEL — Use XTRIM for cleanup.
- Port 6381 — Always.
Cross-References¶
READ_ALSO: wiki/docs/stack/redis/index.md READ_ALSO: wiki/docs/stack/redis/patterns.md READ_ALSO: wiki/docs/stack/redis/pitfalls.md READ_ALSO: wiki/docs/stack/redis/checklist.md