
ge-orchestrator

Event-driven, highly available (HA) orchestrator. It replaces dolly-orchestrator.py (a 2130-line monolith) with a pure Redis Streams architecture. Multiple replicas run in the same consumer group, which provides automatic failover.

Source: ge_orchestrator/ (single consolidated Python package).

Architecture

ge:work:incoming → Orchestrator (consumer group) → triggers.{team}.{agent} (or triggers.{agent} for shared)
ge:work:completed → Orchestrator → PostgreSQL update + next-step routing

No filesystem polling. No disk I/O. All state in PostgreSQL. All routing via Redis.
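The stream naming convention above can be sketched as a small helper. This is a minimal illustration, not the package's actual code: the function name `trigger_stream` is hypothetical, and treating a shared agent as "an agent with no team" is an assumption.

```python
from typing import Optional

# Stream names taken from the architecture diagram above.
INCOMING_STREAM = "ge:work:incoming"
COMPLETED_STREAM = "ge:work:completed"


def trigger_stream(agent: str, team: Optional[str] = None) -> str:
    """Build the trigger stream name for an agent.

    Returns triggers.{team}.{agent} for a team-scoped agent and
    triggers.{agent} for a shared agent (assumed here to mean team=None).
    """
    if not agent:
        raise ValueError("agent must be a non-empty string")
    return f"triggers.{team}.{agent}" if team else f"triggers.{agent}"
```

For example, `trigger_stream("reviewer", "alpha")` yields `triggers.alpha.reviewer`, while `trigger_stream("reviewer")` yields `triggers.reviewer` (both agent and team names are made up for illustration).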

5 Async Loops

| Loop | Purpose | Interval |
|------|---------|----------|
| route_incoming | Reads ge:work:incoming, routes to triggers.{team}.{agent} (or triggers.{agent} for shared) | Continuous (XREADGROUP, 5s block) |
| process_completions | Reads ge:work:completed, updates DB, routes next steps | Continuous (XREADGROUP, 5s block) |
| health_reporter | Publishes heartbeat to system.health | Every 30s |
| recover_pending | Claims orphaned messages from dead consumers | Every 60s |
| config_watcher | Listens for config reload signals (pub/sub) | Continuous |

Note: the ge:work:completed stream has no publisher yet; the executor still writes COMP files instead. The process_completions loop is ready but stays idle until the executor's XADD is implemented.
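As a rough sketch of what one iteration of route_incoming might look like: the function below reads a batch with XREADGROUP (5s block), forwards each entry with XADD, and acknowledges it with XACK. The names `route_once` and `resolve` are hypothetical, and the batch size of 10 is an assumption. The `client` is assumed to behave like a `redis.asyncio.Redis` instance created with `decode_responses=True` and the default RESP2 reply shape; it is passed in so the sketch itself needs no Redis import.

```python
from typing import Any, Callable, Dict, Optional

GROUP = "orchestrator-group"      # consumer group shared by all replicas
INCOMING = "ge:work:incoming"
BLOCK_MS = 5000                   # the 5s block listed for this loop


async def route_once(
    client: Any,
    consumer: str,
    resolve: Callable[[Dict[str, str]], Optional[str]],
) -> int:
    """Read one batch from ge:work:incoming and forward routable entries.

    `resolve` maps a message's fields to a target trigger stream name,
    or returns None when no route is found. Returns the number routed.
    """
    # ">" asks for entries never delivered to any consumer in the group.
    batches = await client.xreadgroup(
        GROUP, consumer, {INCOMING: ">"}, count=10, block=BLOCK_MS
    )
    routed = 0
    for _stream, entries in batches or []:   # empty on block timeout
        for entry_id, fields in entries:
            target = resolve(fields)
            if target is None:
                # Assumption: unroutable entries are left pending in this sketch.
                continue
            await client.xadd(target, fields)
            # Ack only after the forward succeeded, so a crash in between
            # leaves the entry pending for another replica to claim.
            await client.xack(INCOMING, GROUP, entry_id)
            routed += 1
    return routed
```

Acknowledging after the forward gives at-least-once delivery: if a replica dies between XADD and XACK, the entry can be forwarded again once it is reclaimed, so downstream agents would need to tolerate duplicates.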

Routing Logic

ge_orchestrator/router.py resolves incoming messages to agents. Priority order:

  1. Explicit agent_id — message specifies target agent directly
  2. Work type match — maps work types to agent roles from dolly-routing.yaml
  3. Trigger pattern match — fnmatch patterns in routing config

Config: config/dolly-routing.yaml (hot-reloadable via pub/sub, no file watcher)
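The priority order above could be expressed roughly as follows. This is a sketch under stated assumptions rather than the contents of router.py: the function name `resolve_agent`, the message fields `work_type` and `trigger`, and the shape of the two lookup structures are all hypothetical (only `agent_id` appears in this document). The work-type map here points straight at an agent name, skipping the role-to-agent lookup the real router performs against the registry.

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Optional, Tuple


def resolve_agent(
    message: Dict[str, str],
    work_types: Dict[str, str],
    trigger_patterns: List[Tuple[str, str]],
) -> Optional[str]:
    """Resolve a message to an agent name, or None if nothing matches."""
    # 1. Explicit agent_id: the message names its target directly.
    explicit = message.get("agent_id")
    if explicit:
        return explicit

    # 2. Work type match: look the work type up in the routing config.
    work_type = message.get("work_type")
    if work_type in work_types:
        return work_types[work_type]

    # 3. Trigger pattern match: first fnmatch-style pattern that matches wins.
    #    fnmatchcase uses the same pattern syntax as fnmatch but is
    #    case-sensitive on every platform.
    trigger = message.get("trigger", "")
    for pattern, agent in trigger_patterns:
        if fnmatchcase(trigger, pattern):
            return agent
    return None
```

With `work_types = {"code_review": "reviewer"}` and `trigger_patterns = [("deploy.*", "deployer")]` (made-up values), a message carrying both `work_type="code_review"` and `trigger="deploy.prod"` resolves to `reviewer`, because the work type rule is checked before the pattern rule.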

DAG Enforcement

ge_orchestrator/dag.py manages work package dependencies:

  • When a WP completes, checks which blocked WPs are now unblocked
  • Swimming lanes: same lane is sequential, different lanes run in parallel
  • All state in work_package_deps PostgreSQL table
  • Prevents out-of-order execution in dependent workflows
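The unblocking check and the lane rule can be illustrated with an in-memory model. The real module keeps this state in the work_package_deps table; the dictionaries below merely stand in for it, and the function names `lane_deps` and `newly_unblocked` are hypothetical.

```python
from typing import Dict, List, Set


def lane_deps(lanes: Dict[str, List[str]]) -> Dict[str, Set[str]]:
    """Turn ordered lanes into dependencies.

    Same lane is sequential: each WP depends on the one before it.
    Different lanes share no edges, so they can run in parallel.
    """
    deps: Dict[str, Set[str]] = {}
    for ordered in lanes.values():
        for prev, cur in zip(ordered, ordered[1:]):
            deps.setdefault(cur, set()).add(prev)
    return deps


def newly_unblocked(
    completed_wp: str, deps: Dict[str, Set[str]], done: Set[str]
) -> List[str]:
    """Return WPs that become runnable once completed_wp finishes.

    A WP is unblocked when it depended on completed_wp and every one of
    its dependencies is now finished.
    """
    finished = done | {completed_wp}
    return sorted(
        wp
        for wp, needs in deps.items()
        if wp not in finished and completed_wp in needs and needs <= finished
    )
```

Given lanes `{"backend": ["wp1", "wp2"], "frontend": ["wp3"]}` (illustrative names), `wp2` waits for `wp1` while `wp3` has no dependencies and can start immediately. A cross-lane package that needs both `wp2` and `wp3` is released only when the later of the two completes.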

Streams

| Stream | Direction | Consumer Group |
|--------|-----------|----------------|
| ge:work:incoming | Read | orchestrator-group |
| ge:work:completed | Read | orchestrator-group |
| ge:work:failed | Read | orchestrator-group |
| triggers.{team}.{agent} / triggers.{agent} | Write | - |
| system.health | Write | - |

K8s Deployment

  • Namespace: ge-agents
  • Replicas: 2 (for HA — consumer group ensures no double-processing)
  • PDB: PodDisruptionBudget ensures at least 1 replica during updates
  • Manifest: k8s/base/agents/ge-orchestrator.yaml
  • Image: Built alongside executor via build script

Key Files

| File | Purpose |
|------|---------|
| ge_orchestrator/main.py | Entry point, 5 async loops, Orchestrator class |
| ge_orchestrator/router.py | Routing resolution (dolly-routing.yaml + registry) |
| ge_orchestrator/dag.py | Work package DAG + swimming lanes |
| ge_orchestrator/Dockerfile | Container build |
| ge_orchestrator/requirements.txt | Python dependencies |

Package Structure

All orchestrator code lives in ge_orchestrator/ (underscore). This single package contains both the HA orchestrator (main.py, router.py, dag.py) and the supporting modules (completion scanner, hooks, discussions, rate limiter, circuit breaker, health tracker). There is no separate ge-orchestrator/ (hyphen) directory.