07 - Event Bus (Swappable) - EOAM Demonstration
Goal: Demonstrate the Event-Oriented Agent Mesh (EOAM) pattern from dir_core Topologies §2, showing how multiple agents reactively collaborate through an event bus.
DIR Alignment: Topologies §2 (EOAM), §2.1 (Scope-Based Choreography), §2.3 (Economic Guardrails), §2.4 (Priority-Based Preemption)
Concepts Demonstrated
| Concept | DIR Section | Implementation |
|---|---|---|
| EOAM Choreography | §2.1 | Agents subscribe to topics, react to events autonomously |
| Scope-Based Routing | §2.1 | scope parameter filters events to relevant agents |
| Wake-up Predicates | §2.3 | Low-cost heuristics prevent Token Burn (expensive LLM calls) |
| Priority-Based Preemption | §2.4 | Priority matrix selects winning proposal (Risk > Strategy) |
| DFID Correlation | §5.4 | Every event carries dfid in metadata for traceability |
| Swappable Backend | §10.2 | Factory pattern: create_event_bus(backend="memory") |
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ Event-Oriented Agent Mesh (EOAM) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ OBSERVATION Event (with DFID) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ EventBus (swappable) │ │
│ │ Scope: BTC-USD │ Scope: ETH-USD │ Scope: * │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Wake-up │ │ Wake-up │ │ (no │ │
│ │ Predicate│ │ Predicate│ │ predicate│ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ PASS │ SKIP │ PASS │
│ ▼ ✗ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Technical│ │ Risk │ │
│ │ Agent │ │ Monitor │ │
│ └────┬─────┘ └────┬─────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PolicyProposal Collection │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Priority-Based Arbitration │ │
│ │ RISK_ALERT(1) > CLOSE(2) > OPEN(3) > HOLD(6) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Winning Proposal │
│ │
└─────────────────────────────────────────────────────────────────────┘
How to run
From repo root:
pip install -e .
python samples/07_event_bus_swappable/run.py
Scenarios Demonstrated
Scenario A: Multi-Agent Reactive Activation
- Three agents (Risk, Technical, Sentiment) subscribe to OBSERVATION events
- All agents activated in parallel when observation is published
- Proposals collected and arbitrated
Scenario B: Risk Preemption
- High volatility triggers RISK_ALERT from risk monitor
- Priority matrix ensures RISK_ALERT (priority=1) wins over other proposals
- Demonstrates: "Safety always overrides strategy"
Scenario C: Wake-up Predicates (Token Burn Prevention)
- Expensive LLM agent has wake-up predicates:
significant_move: only wake if price change > 1%btc_only: only wake for BTC-USD- Small moves (0.2%) → agent NOT activated (token saved)
- Large moves (2.5%) → agent activated
Scenario D: Scope-Based Routing
btc_specialistsubscribes withscope="BTC-USD"eth_specialistsubscribes withscope="ETH-USD"global_risksubscribes withscope="*"(broadcast)- BTC event only reaches btc_specialist + global_risk
Scenario E: Swappable Backend Pattern
LoggingEventBuswrapsEventBusfor audit- All events logged for compliance/debugging
- Same interface, different behavior
Key Classes
# EventMetadata - routing and tracing info
metadata = EventMetadata(
dfid="abc-123", # DecisionFlow correlation
priority=1, # Event priority
source_agent="risk_mon", # Who emitted
target_scope="BTC-USD", # Semantic routing filter
)
# Event with metadata
event = Event(
type=EventType.OBSERVATION,
payload={"price": 67500, "volatility": 0.03},
metadata=metadata,
)
# Wake-up Predicate
predicate = WakeupPredicate(
name="significant_move",
condition=lambda p: abs(p.get("price_delta_pct", 0)) > 0.01
)
# Swappable backend factory
bus = create_event_bus(backend="memory", with_logging=True)
Swapping for Kafka/PubSub
The EventBusProtocol defines the interface:
class EventBusProtocol(Protocol):
def subscribe(self, event_type, callback, scope=None) -> None: ...
def unsubscribe(self, event_type, callback) -> None: ...
def dispatch(self, event: Event) -> int: ...
def publish(self, event_type, payload, metadata=None) -> int: ...
To implement Kafka backend:
1. Create KafkaEventBus implementing the protocol
2. Add to factory: create_event_bus(backend="kafka")
3. Set env: EVENT_BUS_BACKEND=kafka
Expected Output
======================================================================
Event Bus Sample - Event-Oriented Agent Mesh (EOAM)
======================================================================
[SCENARIO A] Multi-agent reactive activation
DFID: abc123...
Proposals collected: 3
Winner: SENTIMENT_BULLISH from sentiment_agent
----------------------------------------------------------------------
[SCENARIO B] Risk preemption - high volatility overrides strategy
DFID: def456...
Proposals: 3
Winner: RISK_ALERT (risk alert preempts other proposals)
----------------------------------------------------------------------
[SCENARIO C] Wake-up predicates - preventing Token Burn
Small move (0.2%):
Proposals: 1
Expensive agent stats: {'events_received': 1, 'activated': 0, 'suppressed': 1}
Large move (2.5%):
Proposals: 2
Expensive agent stats: {'events_received': 2, 'activated': 1, 'suppressed': 1}
→ Token Burn prevented: 1 activations skipped
...
Why EOAM Matters
from dir_core Topologies §2:
"EOAM is a decentralized architectural pattern where autonomous agents collaborate through a reactive event substrate. It defines a system that is 'Decentralized in activation, centralized in authority.'"
Key benefits: - Parallelism: Agents reason concurrently - Resilience: One agent failure doesn't stop others - Scalability: Add agents without changing orchestration - Auditability: DFID traces every decision flow