Redis Streams Contract

This page DOCUMENTS the stream topology. It is NOT the source of truth.

AUTHORITATIVE SOURCE: config/dolly-routing.yaml (for stream names and routing rules)
AUTHORITATIVE SOURCE: config/ports.yaml (for Redis connection details)

Code that uses a stream name MUST read it from config/dolly-routing.yaml or use the constant defined in the executor code. Code MUST NOT hardcode stream names as string literals.

Connection

  • Host: redis.ge-system.svc.cluster.local (k8s service DNS)
  • Port: DEFINED IN config/ports.yaml (currently 6381 as of 2026-02-14)
  • Password: from Vault at ge/redis
  • StatefulSet: redis-0 in ge-system namespace

Stream Topology (as documented in INFRA-OVERVIEW 2026-02-14)

Agent Triggers

  • Pattern: triggers.{agent_name}
  • Consumer group: executor-group
  • One stream per registered agent (56 streams for 56 agents)
  • Authoritative list of agents: ge-ops/master/AGENT-REGISTRY.json

Work Completion

  • Stream: ge:work:completed
  • Published by: PLANNED (not yet implemented). The executor currently writes COMP-*.md files, and a host cron job syncs them to the DB. ge-orchestrator listens on this stream, but nothing publishes to it yet.

System

  • Stream: system.halt -- HALT broadcasts (all agents subscribe)
  • Stream: system.health -- Health check heartbeats

Notifications

Stream: notifications.queue -- Portal notification routing

Decisions

  • Stream: decisions.pending -- Human decisions awaiting response
  • Stream: decisions.resolved -- Resolved decisions

Message Format (Agent Triggers)

Reference format from INFRA-OVERVIEW. Verify against actual executor code at ge_agent/runner.py:

{
  "trigger_id": "trig-{ISO-date}-{sequence}",
  "timestamp": "{ISO-8601}",
  "work_item": "{path to work item or task reference}",
  "context": {
    "client": "{client-slug}",
    "project": "{project-slug}",
    "team": "alpha|beta"
  },
  "triggered_by": "{agent_name|system|TaskService}",
  "reason": "{trigger_reason}"
}
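As an illustration of the reference format above, the following minimal Python sketch assembles a trigger payload. The function name build_trigger, its parameters, and the zero-padded sequence in trigger_id are assumptions made for this example; the actual construction logic lives in the executor code and should be checked there.

```python
import json
from datetime import datetime, timezone


def build_trigger(sequence, work_item, client_slug, project_slug, team,
                  triggered_by, reason, now=None):
    """Build a trigger payload shaped like the reference format (hypothetical helper)."""
    if team not in ("alpha", "beta"):
        raise ValueError(f"team must be 'alpha' or 'beta', got {team!r}")
    now = now or datetime.now(timezone.utc)
    return {
        # Assumption: ISO date plus a zero-padded counter; the real ID scheme may differ.
        "trigger_id": f"trig-{now.date().isoformat()}-{sequence:04d}",
        "timestamp": now.isoformat(),
        "work_item": work_item,
        "context": {
            "client": client_slug,
            "project": project_slug,
            "team": team,
        },
        "triggered_by": triggered_by,
        "reason": reason,
    }


if __name__ == "__main__":
    example = build_trigger(1, "tasks/example.md", "acme", "website",
                            "alpha", "system", "manual-test")
    print(json.dumps(example, indent=2))
```

Validating team at build time keeps malformed triggers out of the stream instead of leaving each consumer to reject them.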

Consumer Group Configuration

The shared executor uses Redis consumer groups to ensure each trigger is delivered to exactly one pod:

  • Group name: executor-group (all executor pods share this group)
  • Consumer name: $POD_NAME (unique per pod, from k8s downward API)
  • Read mode: XREADGROUP with the special ID > (returns only messages never yet delivered to any consumer in the group)
  • ACK: after execution completes (success or failure)
  • Orphan recovery: on startup, each pod claims and ACKs messages from dead consumers (idle > 60s) to prevent pending message buildup from pod restarts
  • Dedup: exec_dedup:{work_item_id} Redis key with 5-min TTL prevents double execution even if the same trigger is delivered twice

Source of truth: ge_agent/listener.py
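The read, dedup, and ACK steps listed above can be sketched as follows. This is not the code in ge_agent/listener.py: it is a minimal illustration that assumes a redis-py style client created with decode_responses=True, assumes the trigger JSON is stored under a single stream field named payload (a hypothetical field name), and uses the trigger's work_item value as the dedup ID. Orphan recovery is left out.

```python
import json
import logging

DEDUP_TTL_SECONDS = 300  # 5-minute dedup window from the contract

log = logging.getLogger("executor.sketch")


def claim_work_item(client, work_item_id):
    """Return True only for the first caller within the dedup window."""
    # SET with NX and EX is atomic: the key is created only if it is absent.
    return bool(client.set(f"exec_dedup:{work_item_id}", "1",
                           nx=True, ex=DEDUP_TTL_SECONDS))


def consume_once(client, stream, group, consumer, handler, block_ms=5000):
    """Read one batch of new messages, run each at most once, then ACK it."""
    # ">" requests messages never delivered to any consumer in the group.
    batches = client.xreadgroup(group, consumer, {stream: ">"},
                                count=10, block=block_ms)
    executed = 0
    for _name, messages in batches or []:
        for message_id, fields in messages:
            try:
                # Assumption: the JSON trigger sits in a "payload" field.
                trigger = json.loads(fields["payload"])
                if claim_work_item(client, trigger["work_item"]):
                    handler(trigger)
                    executed += 1
            except Exception:
                log.exception("trigger %s failed", message_id)
            finally:
                # ACK after execution completes, on success or failure.
                client.xack(stream, group, message_id)
    return executed
```

In a pod, consumer would come from the POD_NAME environment variable and stream and group from configuration rather than string literals. Because the message is ACKed even when the handler fails, a failed trigger is not redelivered by Redis; any retry would have to be issued as a new trigger.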

Rules

  • ALL XADD calls MUST include MAXLEN ~ 100 (approximate trimming, which keeps each stream at roughly 100 entries)
  • New streams require registration in config/dolly-routing.yaml BEFORE code is written
  • Dedup on work_item_id with 5-min window prevents double execution
  • Max hook chain depth: 3 (tasks that create tasks that create tasks)
  • Hook depth: 1 for no_block tier
  • Monitoring agent isolation: see hook-loops pitfall
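Two of the rules above lend themselves to a small guard layer: the MAXLEN cap on every XADD and the hook chain depth limits. The sketch below assumes a redis-py style client, whose xadd method accepts maxlen and approximate arguments. The helper names publish and hook_depth_allowed are hypothetical, and the sketch assumes depth is counted from 1 for the first hook-created task; the real counting convention should be confirmed in the executor code.

```python
STREAM_MAXLEN = 100      # every XADD is capped at roughly 100 entries
MAX_HOOK_DEPTH = 3       # tasks that create tasks that create tasks
NO_BLOCK_HOOK_DEPTH = 1  # tighter limit for the no_block tier


def hook_depth_allowed(depth, tier=None):
    """Return True if a hook at this chain depth may still create a task."""
    # Assumption: depth starts at 1 for the first hook-created task.
    limit = NO_BLOCK_HOOK_DEPTH if tier == "no_block" else MAX_HOOK_DEPTH
    return depth <= limit


def publish(client, stream, fields):
    """XADD with approximate trimming, equivalent to MAXLEN ~ 100."""
    # The stream name is passed in by the caller, who is expected to have
    # read it from config/dolly-routing.yaml rather than hardcoding it.
    return client.xadd(stream, fields, maxlen=STREAM_MAXLEN, approximate=True)
```

Routing every publish through one wrapper like this makes the MAXLEN rule hard to forget, since no call site issues XADD directly.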