Redis Streams Contract¶
This page DOCUMENTS the stream topology. It is NOT the source of truth.
AUTHORITATIVE SOURCE: config/dolly-routing.yaml (for stream names and routing rules) AUTHORITATIVE SOURCE: config/ports.yaml (for Redis connection details)
Code that uses a stream name MUST read it from config/dolly-routing.yaml or use the constant defined in the executor code. Code MUST NOT hardcode stream names as string literals.
Connection¶
Host: redis.ge-system.svc.cluster.local (k8s service DNS) Port: DEFINED IN config/ports.yaml (currently 6381 as of 2026-02-14) Password: from Vault at ge/redis StatefulSet: redis-0 in ge-system namespace
Stream Topology (as documented in INFRA-OVERVIEW 2026-02-14)¶
Agent Triggers¶
Pattern: triggers.{agent_name} Consumer group: executor-group One stream per registered agent (56 streams for 56 agents) Authoritative list of agents: ge-ops/master/AGENT-REGISTRY.json
Work Completion¶
Stream: ge:work:completed Published by: PLANNED (not yet implemented) — executor currently writes COMP-*.md files, host cron syncs to DB. ge-orchestrator listens on this stream but nothing publishes to it yet.
System¶
Stream: system.halt -- HALT broadcasts (all agents subscribe) Stream: system.health -- Health check heartbeats
Notifications¶
Stream: notifications.queue -- Portal notification routing
Decisions¶
Stream: decisions.pending -- Human decisions awaiting response Stream: decisions.resolved -- Resolved decisions
Message Format (Agent Triggers)¶
Reference format from INFRA-OVERVIEW. Verify against actual executor code at ge_agent/runner.py:
{
"trigger_id": "trig-{ISO-date}-{sequence}",
"timestamp": "{ISO-8601}",
"work_item": "{path to work item or task reference}",
"context": {
"client": "{client-slug}",
"project": "{project-slug}",
"team": "alpha|beta"
},
"triggered_by": "{agent_name|system|TaskService}",
"reason": "{trigger_reason}"
}
Consumer Group Configuration¶
The shared executor uses Redis consumer groups to ensure each trigger is delivered to exactly one pod:
- Group name:
executor-group(all executor pods share this group) - Consumer name:
$POD_NAME(unique per pod, from k8s downward API) - Read mode:
XREADGROUPwith>(new messages only for each consumer) - ACK: after execution completes (success or failure)
- Orphan recovery: on startup, each pod claims and ACKs messages from dead consumers (idle > 60s) to prevent pending message buildup from pod restarts
- Dedup:
exec_dedup:{work_item_id}Redis key with 5-min TTL prevents double execution even if the same trigger is delivered twice
Source of truth: ge_agent/listener.py
Rules¶
- ALL XADD calls MUST include MAXLEN ~100
- New streams require registration in config/dolly-routing.yaml BEFORE code is written
- Dedup on work_item_id with 5-min window prevents double execution
- Max hook chain depth: 3 (tasks that create tasks that create tasks)
- Hook depth: 1 for no_block tier
- Monitoring agent isolation: see hook-loops pitfall