ge-orchestrator
Event-driven HA orchestrator. It replaces `dolly-orchestrator.py` (a 2130-line monolith) with a pure Redis Streams architecture. Multiple replicas share the same consumer group, which gives automatic failover.

Source: `ge-orchestrator/` (kebab-case directory, 4 Python files).
Architecture

- `ge:work:incoming` → Orchestrator (consumer group) → `triggers.{agent}`
- `ge:work:completed` → Orchestrator → PostgreSQL update + next-step routing

No filesystem polling. No disk I/O. All state in PostgreSQL. All routing via Redis.
5 Async Loops

| Loop | Purpose | Interval |
|---|---|---|
| `route_incoming` | Reads `ge:work:incoming`, routes to `triggers.{agent}` | Continuous (`XREADGROUP`, 5s block) |
| `process_completions` | Reads `ge:work:completed`, updates DB, routes next steps | Continuous (`XREADGROUP`, 5s block) |
| `health_reporter` | Publishes heartbeat to `system.health` | Every 30s |
| `recover_pending` | Claims orphaned messages from dead consumers | Every 60s |
| `config_watcher` | Listens for config reload signals (pub/sub) | Continuous |
Note: the `ge:work:completed` stream has no publisher yet; the executor writes COMP files instead. `process_completions` is therefore ready but idle until the executor's `XADD` to this stream is implemented.
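A minimal sketch of how five long-running loops like the ones in the table can share one asyncio event loop and shut down together. This is not the code in `main.py`: the class `OrchestratorSketch`, the helper `run_every`, and the call counters are hypothetical, and every Redis and PostgreSQL call is replaced by a stub so the example runs without a server. The read loops use a fixed wait to stand in for the 5s `XREADGROUP` block.

```python
import asyncio
import time
from collections import Counter
from typing import Awaitable, Callable

Tick = Callable[[], Awaitable[None]]


async def run_every(interval: float, stop: asyncio.Event, tick: Tick) -> None:
    """Call `tick`, then wait up to `interval` seconds; exit once `stop` is set."""
    while not stop.is_set():
        await tick()
        try:
            # Wakes early if stop is set, otherwise times out after `interval`.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


class OrchestratorSketch:
    """Hypothetical skeleton of the five loops; Redis and PostgreSQL calls are stubbed."""

    def __init__(self, read_block: float = 5.0, health_every: float = 30.0,
                 recover_every: float = 60.0) -> None:
        self.read_block = read_block      # stands in for the XREADGROUP 5s block
        self.health_every = health_every
        self.recover_every = recover_every
        self.calls: Counter = Counter()
        self.heartbeats: list = []
        self.stop = asyncio.Event()

    async def route_incoming(self) -> None:
        # Stub for: read ge:work:incoming, route to triggers.{agent}.
        self.calls["route_incoming"] += 1

    async def process_completions(self) -> None:
        # Stub for: read ge:work:completed, update PostgreSQL, route next steps.
        self.calls["process_completions"] += 1

    async def health_reporter(self) -> None:
        # Stub for: publish a heartbeat to system.health.
        self.calls["health_reporter"] += 1
        self.heartbeats.append(time.time())

    async def recover_pending(self) -> None:
        # Stub for: claim messages left pending by dead consumers.
        self.calls["recover_pending"] += 1

    async def config_watcher(self) -> None:
        # Stub for: await a pub/sub reload signal (the sketch polls instead).
        self.calls["config_watcher"] += 1

    async def run(self) -> None:
        await asyncio.gather(
            run_every(self.read_block, self.stop, self.route_incoming),
            run_every(self.read_block, self.stop, self.process_completions),
            run_every(self.health_every, self.stop, self.health_reporter),
            run_every(self.recover_every, self.stop, self.recover_pending),
            run_every(self.read_block, self.stop, self.config_watcher),
        )


async def demo() -> OrchestratorSketch:
    # Shortened intervals so the demo finishes in well under a second.
    orch = OrchestratorSketch(read_block=0.05, health_every=0.1, recover_every=0.2)
    task = asyncio.create_task(orch.run())
    await asyncio.sleep(0.35)
    orch.stop.set()
    await task
    return orch
```

`asyncio.run(demo())` runs all five loops for about 0.35 s and returns the instance, whose `calls` counter shows how often each loop ran. Waiting on the stop event instead of calling `asyncio.sleep` lets every loop exit promptly once shutdown is requested, for example by a SIGTERM handler (not shown) when the pod is evicted.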
Routing Logic

`ge-orchestrator/router.py` resolves incoming messages to agents, in this priority order:

- Explicit `agent_id`: the message specifies its target agent directly
- Work type match: maps work types to agent roles from `dolly-routing.yaml`
- Trigger pattern match: `fnmatch` patterns in the routing config

Config: `config/dolly-routing.yaml` (hot-reloadable via pub/sub, no file watcher)
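A minimal sketch of the three-step priority order, assuming the YAML has already been loaded into a dict. The keys `work_types` and `triggers`, the `event` field that trigger patterns are matched against, the role-to-agent `REGISTRY` dict, and all agent names are hypothetical; `router.py` may structure these differently. The sketch uses `fnmatchcase`, which has the same pattern syntax as `fnmatch` but never case-folds, so matching does not vary by OS.

```python
from fnmatch import fnmatchcase
from typing import Optional

# Hypothetical in-memory form of dolly-routing.yaml (real keys may differ).
ROUTING = {
    "work_types": {"code_review": "reviewer", "deploy": "devops"},
    "triggers": [
        {"pattern": "pr.*", "agent": "agent-ci"},
        {"pattern": "docs.*", "agent": "agent-docs"},
    ],
}
# Hypothetical agent registry: role -> agent id.
REGISTRY = {"reviewer": "agent-review", "devops": "agent-ops"}


def resolve_agent(message: dict, routing: dict, registry: dict) -> Optional[str]:
    """Return the target agent for a message, or None when nothing matches."""
    # 1. An explicit agent_id always wins.
    if message.get("agent_id"):
        return message["agent_id"]
    # 2. Work type -> role (from the routing config) -> agent (from the registry).
    role = routing.get("work_types", {}).get(message.get("work_type"))
    if role is not None and role in registry:
        return registry[role]
    # 3. The first trigger pattern matching the message's event name wins.
    event = message.get("event", "")
    for rule in routing.get("triggers", []):
        if fnmatchcase(event, rule["pattern"]):
            return rule["agent"]
    return None


def trigger_stream(agent: str) -> str:
    """Name of the per-agent stream the orchestrator writes to."""
    return f"triggers.{agent}"
```

For example, `resolve_agent({"work_type": "deploy"}, ROUTING, REGISTRY)` returns `"agent-ops"`, so the message would go to `triggers.agent-ops`. Returning `None` leaves the handling of unroutable messages to the caller.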
DAG Enforcement

`ge-orchestrator/dag.py` manages work package (WP) dependencies:

- When a WP completes, it checks which blocked WPs are now unblocked
- Swimming lanes: WPs in the same lane run sequentially, while different lanes run in parallel
- All state lives in the `work_package_deps` PostgreSQL table
- Prevents out-of-order execution in dependent workflows
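A minimal in-memory sketch of the unblock step, with a dict standing in for the `work_package_deps` table. The `WorkPackage` fields (`lane`, `order`, `deps`, `status`), the status values, and the function name `complete_and_unblock` are hypothetical. Lane sequencing is modelled as: only the earliest pending WP in a lane is eligible, and only when nothing else in that lane is running; WPs in different lanes are checked independently.

```python
from dataclasses import dataclass, field


@dataclass
class WorkPackage:
    wp_id: str
    lane: str
    order: int                                  # position within its lane
    deps: set[str] = field(default_factory=set)
    status: str = "pending"                     # pending | running | done


def complete_and_unblock(wps: dict[str, WorkPackage], finished: str) -> list[str]:
    """Mark `finished` as done and return the ids of WPs that may start now."""
    wps[finished].status = "done"
    # Lanes with a running WP are busy: same-lane work is sequential.
    seen_lanes = {wp.lane for wp in wps.values() if wp.status == "running"}
    started: list[str] = []
    for wp in sorted(wps.values(), key=lambda w: (w.lane, w.order)):
        if wp.status != "pending" or wp.lane in seen_lanes:
            continue
        # Only the head of each lane is eligible, even if it turns out blocked.
        seen_lanes.add(wp.lane)
        if all(wps[dep].status == "done" for dep in wp.deps):
            wp.status = "running"
            started.append(wp.wp_id)
    return started
```

With lanes `api` (A1 then A2) and `ui` (U1 then U2), where U1 depends on A1, completing A1 starts A2 and U1 in parallel, while U2 waits behind U1 in its lane. With two orchestrator replicas, the real check would need to run atomically in PostgreSQL (for example inside one transaction) so that both replicas cannot start the same WP; this sketch ignores concurrency.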
Streams

| Stream | Direction | Consumer Group |
|---|---|---|
| `ge:work:incoming` | Read | `orchestrator-group` |
| `ge:work:completed` | Read | `orchestrator-group` |
| `ge:work:failed` | Read | `orchestrator-group` |
| `triggers.{agent}` | Write | n/a |
| `system.health` | Write | n/a |
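A minimal sketch of the orphan-selection rule behind `recover_pending`, operating on entries shaped like a subset of the extended `XPENDING` output (message id, owning consumer, idle time). Deriving the set of live consumers from `system.health` heartbeats is an assumption, as are the 60 s default idle threshold and the names `PendingEntry` and `select_orphans`. The selected ids would then be claimed with `XCLAIM`; on Redis 6.2+, `XAUTOCLAIM` can apply the idle-time filter server-side, though it does not check whether the owner is alive.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingEntry:
    """Subset of one extended-form XPENDING row."""
    message_id: str
    consumer: str
    idle_ms: int


def select_orphans(pending: list[PendingEntry], live_consumers: set[str],
                   min_idle_ms: int = 60_000) -> list[str]:
    """Return ids of pending messages that a live replica should claim."""
    return [
        entry.message_id
        for entry in pending
        # Skipping live owners lets a slow but healthy replica keep its
        # in-flight work; the idle threshold avoids racing a consumer that
        # just received the message.
        if entry.consumer not in live_consumers and entry.idle_ms >= min_idle_ms
    ]
```

For two replicas `orch-a` and `orch-b`, if only `orch-b` is still heartbeating, its next `recover_pending` pass would claim the messages `orch-a` left idle for at least a minute.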
K8s Deployment

- Namespace: `ge-agents`
- Replicas: 2 for HA; the consumer group delivers each message to only one replica, preventing double-processing
- PDB: a PodDisruptionBudget keeps at least 1 replica available during voluntary disruptions such as node drains for cluster updates
- Manifest: `k8s/base/agents/ge-orchestrator.yaml`
- Image: built alongside the executor via the build script
Key Files

| File | Purpose |
|---|---|
| `ge-orchestrator/main.py` | Entry point, 5 async loops, `Orchestrator` class |
| `ge-orchestrator/router.py` | Routing resolution (`dolly-routing.yaml` + registry) |
| `ge-orchestrator/dag.py` | Work package DAG + swimming lanes |
| `ge-orchestrator/Dockerfile` | Container build |
| `ge-orchestrator/requirements.txt` | Python dependencies |
Relation to ge_orchestrator/ (underscore)

The `ge_orchestrator/` (underscore) Python package contains the completion scanner, hooks engine, and discussion handler. These are older components that run inside the executor, not inside the ge-orchestrator pod. Do not confuse the two:
| Directory | Runs in | Contains |
|---|---|---|
| `ge-orchestrator/` (kebab) | ge-orchestrator pod | HA routing + DAG |
| `ge_orchestrator/` (underscore) | ge-executor pod | completion scanner, hooks, discussions |