ge-orchestrator

Event-driven HA orchestrator. Replaces dolly-orchestrator.py (a 2130-line monolith) with a pure Redis Streams architecture. Multiple replicas join the same consumer group, which provides automatic failover.

Source: ge-orchestrator/ (kebab-case directory, 4 Python files).

Architecture

ge:work:incoming → Orchestrator (consumer group) → triggers.{agent}
ge:work:completed → Orchestrator → PostgreSQL update + next-step routing

No filesystem polling. No disk I/O. All state in PostgreSQL. All routing via Redis.
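The first flow above (read from ge:work:incoming, write to triggers.{agent}) could look roughly like the sketch below. It is illustrative only, not the code in ge-orchestrator/main.py: `client` is assumed to expose redis-py-style async methods (`xreadgroup`, `xadd`, `xack`) with `decode_responses=True`, and the `route_batch` helper, the consumer name, and the `agent_id` field are hypothetical.

```python
INCOMING = "ge:work:incoming"
GROUP = "orchestrator-group"


async def route_batch(client, consumer: str, block_ms: int = 5000) -> int:
    """Read one batch from the incoming stream and forward each message.

    Returns the number of messages routed. `client` is an assumed
    redis-py-style async client; nothing here is taken from main.py.
    """
    # ">" asks for messages never delivered to any consumer in the group.
    batches = await client.xreadgroup(
        GROUP, consumer, {INCOMING: ">"}, count=10, block=block_ms
    )
    routed = 0
    for _stream, messages in batches or []:
        for message_id, fields in messages:
            agent = fields.get("agent_id")  # hypothetical field name
            if agent is None:
                # Stays pending in this sketch; a real router would
                # fall back to the routing config here.
                continue
            await client.xadd(f"triggers.{agent}", fields)
            # Acknowledge only after the trigger has been written.
            await client.xack(INCOMING, GROUP, message_id)
            routed += 1
    return routed
```

Acknowledging after the write means a crash between the two calls leaves the message pending rather than lost, at the cost of a possible duplicate trigger when another replica later claims it.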

5 Async Loops

Loop | Purpose | Interval
route_incoming | Reads ge:work:incoming, routes to triggers.{agent} | Continuous (XREADGROUP, 5s block)
process_completions | Reads ge:work:completed, updates DB, routes next steps | Continuous (XREADGROUP, 5s block)
health_reporter | Publishes heartbeat to system.health | Every 30s
recover_pending | Claims orphaned messages from dead consumers | Every 60s
config_watcher | Listens for config reload signals (pub/sub) | Continuous

Note: the ge:work:completed stream has no publisher yet; the executor writes COMP files instead. The process_completions loop is ready but stays idle until the executor's XADD is implemented.
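One way the loops listed above could be run side by side is sketched here with asyncio. The helper names (`run_every`, `run_orchestrator`) and the shared stop event are assumptions for illustration; the Orchestrator class in main.py may be structured differently.

```python
import asyncio
from typing import Awaitable, Callable

Action = Callable[[], Awaitable[None]]


async def run_every(interval_s: float, action: Action, stop: asyncio.Event) -> None:
    """Call `action`, then wait `interval_s` seconds, until `stop` is set."""
    while not stop.is_set():
        await action()
        try:
            # Sleep between runs, but wake immediately on shutdown.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass


async def run_orchestrator(
    loops: dict[str, tuple[float, Action]], stop: asyncio.Event
) -> None:
    """Run every loop concurrently until `stop` is set.

    `loops` maps a loop name to (interval in seconds, coroutine function).
    """
    await asyncio.gather(
        *(run_every(interval, action, stop) for interval, action in loops.values())
    )
```

Under these assumptions, health_reporter would be registered with an interval of 30 and recover_pending with 60. The continuous loops need no sleep of their own, because their blocking read (the 5s XREADGROUP block) already paces them.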

Routing Logic

ge-orchestrator/router.py resolves incoming messages to agents. Priority order:

  1. Explicit agent_id — message specifies target agent directly
  2. Work type match — maps work types to agent roles from dolly-routing.yaml
  3. Trigger pattern match — fnmatch patterns in routing config

Config: config/dolly-routing.yaml (hot-reloadable via pub/sub, no file watcher)
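The priority order above can be sketched as a small pure function. The config shape (`work_types`, `trigger_patterns`), the message field names, and the agent names are hypothetical stand-ins; the real schema of dolly-routing.yaml and the code in router.py may differ.

```python
from fnmatch import fnmatch
from typing import Optional

# Hypothetical shape for the parsed dolly-routing.yaml.
ROUTING = {
    "work_types": {"code_review": "reviewer", "deploy": "devops"},
    "trigger_patterns": [("docs/*", "writer"), ("infra/*.yaml", "devops")],
}


def resolve_agent(message: dict, routing: dict = ROUTING) -> Optional[str]:
    """Resolve a message to an agent name, or None if nothing matches."""
    # 1. An explicit agent_id always wins.
    agent = message.get("agent_id")
    if agent:
        return agent
    # 2. Otherwise look the work type up in the routing config.
    agent = routing["work_types"].get(message.get("work_type", ""))
    if agent:
        return agent
    # 3. Finally, take the first fnmatch pattern that matches the trigger.
    trigger = message.get("trigger", "")
    for pattern, candidate in routing["trigger_patterns"]:
        if fnmatch(trigger, pattern):
            return candidate
    return None
```

For example, `resolve_agent({"agent_id": "alice", "work_type": "deploy"})` returns `"alice"`, because the explicit target is checked before the work type.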

DAG Enforcement

ge-orchestrator/dag.py manages work package dependencies:

  • When a WP completes, checks which blocked WPs are now unblocked
  • Swimming lanes: same lane is sequential, different lanes run in parallel
  • All state in work_package_deps PostgreSQL table
  • Prevents out-of-order execution in dependent workflows
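The unblocking check and the lane rule can be illustrated in memory. In this sketch a plain dict stands in for the work_package_deps table, and the function names and WP identifiers are hypothetical; dag.py presumably does the equivalent with SQL queries.

```python
def lane_deps(lanes: dict[str, list[str]]) -> dict[str, set[str]]:
    """Turn ordered lanes into dependencies: each WP waits for the one
    before it in the same lane. Different lanes stay independent."""
    deps: dict[str, set[str]] = {}
    for ordered in lanes.values():
        for previous, current in zip(ordered, ordered[1:]):
            deps.setdefault(current, set()).add(previous)
    return deps


def newly_unblocked(
    deps: dict[str, set[str]], completed: set[str], just_done: str
) -> list[str]:
    """Return the WPs that waited on `just_done` and now have every
    dependency completed."""
    done = completed | {just_done}
    return sorted(
        wp
        for wp, needs in deps.items()
        if just_done in needs and needs <= done and wp not in done
    )
```

With lanes `{"backend": ["wp1", "wp2"], "frontend": ["wp3"]}`, wp1 and wp3 can start in parallel, while wp2 is released only when wp1 completes.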

Streams

Stream | Direction | Consumer Group
ge:work:incoming | Read | orchestrator-group
ge:work:completed | Read | orchestrator-group
ge:work:failed | Read | orchestrator-group
triggers.{agent} | Write |
system.health | Write |
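Before a replica can read the three streams above, the consumer group has to exist on each of them. A possible startup step is sketched below, assuming a redis-py-style async `client` with `xgroup_create`; the `ensure_groups` helper and the choice of start id `"0"` (replay entries already in the stream) are assumptions, not taken from the source.

```python
READ_STREAMS = ("ge:work:incoming", "ge:work:completed", "ge:work:failed")
GROUP = "orchestrator-group"


async def ensure_groups(client) -> list[str]:
    """Create the consumer group on each read stream if it is missing.

    Returns the streams on which the group was newly created.
    """
    created = []
    for stream in READ_STREAMS:
        try:
            # mkstream=True also creates the stream when it does not exist yet.
            await client.xgroup_create(stream, GROUP, id="0", mkstream=True)
            created.append(stream)
        except Exception as exc:
            # Redis answers BUSYGROUP when the group already exists,
            # for example because another replica created it first.
            if "BUSYGROUP" not in str(exc):
                raise
    return created
```

Treating BUSYGROUP as success keeps the step idempotent, so both replicas can run it at startup without coordinating.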

K8s Deployment

  • Namespace: ge-agents
  • Replicas: 2 (for HA; the consumer group delivers each new message to only one replica, which avoids double-processing)
  • PDB: PodDisruptionBudget ensures at least 1 replica during updates
  • Manifest: k8s/base/agents/ge-orchestrator.yaml
  • Image: Built alongside executor via build script
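Failover between the two replicas depends on the surviving replica taking over messages that a dead one had read but never acknowledged, which is the job of the recover_pending loop. The sketch below uses XAUTOCLAIM (Redis 6.2+) through a redis-py-style async `client`; whether the orchestrator uses XAUTOCLAIM or XPENDING plus XCLAIM is not stated in the source, and the helper name, the idle threshold, and the page size are assumptions.

```python
GROUP = "orchestrator-group"


async def claim_orphans(
    client, stream: str, consumer: str, min_idle_ms: int = 60_000
) -> list:
    """Claim every pending message that has been idle for at least
    `min_idle_ms` and return the claimed (id, fields) pairs."""
    claimed = []
    cursor = "0-0"
    while True:
        reply = await client.xautoclaim(
            stream, GROUP, consumer,
            min_idle_time=min_idle_ms, start_id=cursor, count=100,
        )
        # The reply starts with the next cursor, followed by the messages.
        cursor, messages = reply[0], reply[1]
        claimed.extend(messages)
        if cursor == "0-0":  # the scan wrapped around: nothing left
            return claimed
```

Claimed messages then go through the normal routing path. Because a slow but living consumer may also finish a message after it was claimed, handlers should tolerate an occasional duplicate.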

Key Files

File | Purpose
ge-orchestrator/main.py | Entry point, 5 async loops, Orchestrator class
ge-orchestrator/router.py | Routing resolution (dolly-routing.yaml + registry)
ge-orchestrator/dag.py | Work package DAG + swimming lanes
ge-orchestrator/Dockerfile | Container build
ge-orchestrator/requirements.txt | Python dependencies

Relation to ge_orchestrator/ (underscore)

The ge_orchestrator/ (underscore) Python package contains the completion scanner, hooks engine, and discussion handler — these are older components that run inside the executor, not inside the ge-orchestrator pod. Do not confuse the two:

Directory | Runs in | Contains
ge-orchestrator/ (kebab) | ge-orchestrator pod | HA routing + DAG
ge_orchestrator/ (underscore) | ge-executor pod | completion scanner, hooks, discussions