Skip to content

Topology A (EOAM)

Event Bus

dir_core.event_bus

In-memory Event Bus for agent/runtime communication.

Swappable: same subscribe/dispatch interface can be backed by Kafka/PubSub later. See docs/00-do-not-publish/event_bus.py for original reference.

DIR Topologies §2: Event-Oriented Agent Mesh (EOAM) uses event bus for decentralized agent activation. Agents subscribe to topics matching their Responsibility Contract scope.

Event dataclass

Event with type, payload, and metadata (DIR Topologies §2.2).

Source code in src/dir_core/event_bus.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass
class Event:
    """Event with type, payload, and metadata (DIR Topologies §2.2)."""

    type: Union[str, EventType]
    payload: Dict[str, Any]
    metadata: EventMetadata = field(default_factory=EventMetadata)

    def __post_init__(self):
        if isinstance(self.type, str):
            try:
                self.type = EventType(self.type)
            except ValueError:
                pass  # Allow custom string types

    @property
    def type_key(self) -> str:
        return self.type.value if isinstance(self.type, EventType) else self.type

EventBus

Synchronous in-memory EventBus implementing EventBusProtocol. Replace with a Kafka/PubSub-backed implementation using the same interface.

Features: - Scope-based filtering (EOAM semantic routing) - Priority ordering - DFID correlation logging

Source code in src/dir_core/event_bus.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
class EventBus:
    """
    Synchronous in-memory EventBus implementing EventBusProtocol.
    Replace with a Kafka/PubSub-backed implementation using the same interface.

    Features:
    - Scope-based filtering (EOAM semantic routing)
    - Priority ordering
    - DFID correlation logging
    """

    def __init__(self, name: str = "InMemory") -> None:
        self._name = name
        self._listeners: Dict[str, List[Subscription]] = {}
        self._lock = threading.Lock()
        self._event_count = 0

    def subscribe(
        self, 
        event_type: Union[EventType, str], 
        callback: Callable[[Dict[str, Any]], None],
        scope: Optional[str] = None,
    ) -> None:
        key = event_type.value if isinstance(event_type, EventType) else event_type
        with self._lock:
            if key not in self._listeners:
                self._listeners[key] = []
            self._listeners[key].append(Subscription(callback=callback, scope=scope))
            logger.debug("[%s] Subscribed to %s (scope=%s)", self._name, key, scope)

    def unsubscribe(
        self, 
        event_type: Union[EventType, str], 
        callback: Callable[[Dict[str, Any]], None],
    ) -> None:
        key = event_type.value if isinstance(event_type, EventType) else event_type
        with self._lock:
            if key in self._listeners:
                self._listeners[key] = [s for s in self._listeners[key] if s.callback != callback]
                if not self._listeners[key]:
                    del self._listeners[key]

    def dispatch(self, event: Event) -> int:
        key = event.type_key
        target_scope = event.metadata.target_scope
        dfid = event.metadata.dfid

        with self._lock:
            all_subs = list(self._listeners.get(key, []))

        # Filter by scope
        matching_subs = []
        for sub in all_subs:
            if sub.scope is None or sub.scope == "*":
                matching_subs.append(sub)
            elif target_scope and sub.scope == target_scope:
                matching_subs.append(sub)

        dfid_prefix = f"[DFID={dfid}] " if dfid else ""
        logger.info("%s[%s] Dispatching %s to %d/%d listeners (scope=%s)", 
                   dfid_prefix, self._name, key, len(matching_subs), len(all_subs), target_scope)

        notified = 0
        for sub in matching_subs:
            try:
                sub.callback(event.payload)
                notified += 1
            except Exception:
                logger.exception("%sEventBus listener error for %s", dfid_prefix, key)

        self._event_count += 1
        return notified

    def publish(
        self, 
        event_type: Union[EventType, str], 
        payload: Dict[str, Any],
        metadata: Optional[EventMetadata] = None,
    ) -> int:
        """Convenience: create Event and dispatch."""
        event = Event(type=event_type, payload=payload, metadata=metadata or EventMetadata())
        return self.dispatch(event)

    @property
    def event_count(self) -> int:
        return self._event_count

    @property
    def subscription_count(self) -> int:
        with self._lock:
            return sum(len(subs) for subs in self._listeners.values())

publish(event_type, payload, metadata=None)

Convenience: create Event and dispatch.

Source code in src/dir_core/event_bus.py
181
182
183
184
185
186
187
188
189
def publish(
    self, 
    event_type: Union[EventType, str], 
    payload: Dict[str, Any],
    metadata: Optional[EventMetadata] = None,
) -> int:
    """Convenience: create Event and dispatch."""
    event = Event(type=event_type, payload=payload, metadata=metadata or EventMetadata())
    return self.dispatch(event)

EventBusProtocol

Bases: Protocol

Protocol for swappable EventBus implementations (DIR §10.2).

Source code in src/dir_core/event_bus.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class EventBusProtocol(Protocol):
    """Protocol for swappable EventBus implementations (DIR §10.2)."""

    def subscribe(
        self, 
        event_type: Union[EventType, str], 
        callback: Callable[[Dict[str, Any]], None],
        scope: Optional[str] = None,
    ) -> None:
        """Subscribe to events of given type, optionally filtered by scope."""
        ...

    def unsubscribe(
        self, 
        event_type: Union[EventType, str], 
        callback: Callable[[Dict[str, Any]], None],
    ) -> None:
        """Remove subscription."""
        ...

    def dispatch(self, event: Event) -> int:
        """Dispatch event to subscribers. Returns number of listeners notified."""
        ...

    def publish(
        self, 
        event_type: Union[EventType, str], 
        payload: Dict[str, Any],
        metadata: Optional[EventMetadata] = None,
    ) -> int:
        """Convenience: create Event and dispatch."""
        ...

dispatch(event)

Dispatch event to subscribers. Returns number of listeners notified.

Source code in src/dir_core/event_bus.py
82
83
84
def dispatch(self, event: Event) -> int:
    """Dispatch event to subscribers. Returns number of listeners notified."""
    ...

publish(event_type, payload, metadata=None)

Convenience: create Event and dispatch.

Source code in src/dir_core/event_bus.py
86
87
88
89
90
91
92
93
def publish(
    self, 
    event_type: Union[EventType, str], 
    payload: Dict[str, Any],
    metadata: Optional[EventMetadata] = None,
) -> int:
    """Convenience: create Event and dispatch."""
    ...

subscribe(event_type, callback, scope=None)

Subscribe to events of given type, optionally filtered by scope.

Source code in src/dir_core/event_bus.py
65
66
67
68
69
70
71
72
def subscribe(
    self, 
    event_type: Union[EventType, str], 
    callback: Callable[[Dict[str, Any]], None],
    scope: Optional[str] = None,
) -> None:
    """Subscribe to events of given type, optionally filtered by scope."""
    ...

unsubscribe(event_type, callback)

Remove subscription.

Source code in src/dir_core/event_bus.py
74
75
76
77
78
79
80
def unsubscribe(
    self, 
    event_type: Union[EventType, str], 
    callback: Callable[[Dict[str, Any]], None],
) -> None:
    """Remove subscription."""
    ...

EventMetadata dataclass

Metadata for event routing and tracing (DIR Topologies §2.2).

Source code in src/dir_core/event_bus.py
25
26
27
28
29
30
31
32
33
34
@dataclass
class EventMetadata:
    """Metadata for event routing and tracing (DIR Topologies §2.2)."""

    dfid: Optional[str] = None  # DecisionFlow ID for correlation
    priority: int = 5  # 1=highest, 10=lowest
    source_agent: Optional[str] = None
    target_scope: Optional[str] = None  # e.g., "BTC-USD", "*" for broadcast
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    context_snapshot_id: Optional[str] = None  # For JIT verification

LoggingEventBus

Wrapper that logs all events for debugging/audit.

Source code in src/dir_core/event_bus.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class LoggingEventBus:
    """Wrapper that logs all events for debugging/audit."""

    def __init__(self, wrapped: EventBus):
        self._wrapped = wrapped
        self._event_log: List[Event] = []

    def subscribe(self, *args, **kwargs) -> None:
        return self._wrapped.subscribe(*args, **kwargs)

    def unsubscribe(self, *args, **kwargs) -> None:
        return self._wrapped.unsubscribe(*args, **kwargs)

    def dispatch(self, event: Event) -> int:
        self._event_log.append(event)
        logger.info("[AUDIT] Event logged: type=%s dfid=%s", 
                   event.type_key, event.metadata.dfid)
        return self._wrapped.dispatch(event)

    def publish(self, event_type, payload, metadata=None) -> int:
        event = Event(type=event_type, payload=payload, metadata=metadata or EventMetadata())
        return self.dispatch(event)

    def get_event_log(self) -> List[Event]:
        return self._event_log.copy()

    @property
    def event_count(self) -> int:
        return self._wrapped.event_count

Subscription dataclass

Subscription with optional scope filter.

Source code in src/dir_core/event_bus.py
101
102
103
104
105
@dataclass
class Subscription:
    """Subscription with optional scope filter."""
    callback: Callable[[Dict[str, Any]], None]
    scope: Optional[str] = None  # None = all, "*" = broadcast, "BTC-USD" = specific

create_event_bus(backend=None, with_logging=False)

Factory for creating EventBus instances.

Parameters:

Name Type Description Default
backend Optional[str]

"memory" (default), or future: "kafka", "pubsub"

None
with_logging bool

Wrap in LoggingEventBus for audit

False
Environment

EVENT_BUS_BACKEND: Override backend selection

Source code in src/dir_core/event_bus.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def create_event_bus(backend: Optional[str] = None, with_logging: bool = False) -> EventBus:
    """Factory for creating EventBus instances.

    Args:
        backend: "memory" (default), or future: "kafka", "pubsub"
        with_logging: Wrap in LoggingEventBus for audit

    Environment:
        EVENT_BUS_BACKEND: Override backend selection
    """
    raw = (backend or os.environ.get("EVENT_BUS_BACKEND", EventBusBackend.MEMORY)).strip().lower()
    try:
        backend_e = EventBusBackend(raw)
    except ValueError as e:
        raise ValueError(f"Unknown backend: {raw!r}") from e

    if backend_e == EventBusBackend.MEMORY:
        bus = EventBus(name="InMemory")
    elif backend_e == EventBusBackend.KAFKA:
        # Future: return KafkaEventBus()
        raise NotImplementedError("Kafka backend not yet implemented")
    elif backend_e == EventBusBackend.PUBSUB:
        # Future: return PubSubEventBus()
        raise NotImplementedError("PubSub backend not yet implemented")
    else:
        raise ValueError(f"Unknown backend: {raw!r}")

    if with_logging:
        return LoggingEventBus(bus)

    return bus

Wakeup Predicates

dir_core.wakeup

Wake-up Predicates (DIR Topologies §2.3) — Signal Suppression.

Low-cost heuristics evaluated BEFORE activating expensive LLM agents. If any predicate returns False, the agent is not woken up (Token Burn prevention).

WakeupPredicate dataclass

Low-cost heuristic to prevent Token Burn.

Evaluated BEFORE activating expensive LLM agent. If predicate returns False, agent is not woken up.

Source code in src/dir_core/wakeup.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@dataclass
class WakeupPredicate:
    """Low-cost heuristic to prevent Token Burn.

    Evaluated BEFORE activating expensive LLM agent.
    If predicate returns False, agent is not woken up.
    """

    name: str
    condition: Callable[[Dict[str, Any]], bool]

    def evaluate(self, payload: Dict[str, Any]) -> bool:
        result = self.condition(payload)
        logger.debug(
            "  Predicate '%s': %s", self.name, "PASS" if result else "SKIP"
        )
        return result

is_relevant_instrument(payload, instruments)

Wake up only for specific instruments.

Source code in src/dir_core/wakeup.py
49
50
51
52
53
def is_relevant_instrument(
    payload: Dict[str, Any], instruments: List[str]
) -> bool:
    """Wake up only for specific instruments."""
    return payload.get("instrument") in instruments

price_change_significant(payload, threshold=0.005)

Wake up only if price change > threshold (0.5% default).

Source code in src/dir_core/wakeup.py
34
35
36
37
38
39
def price_change_significant(
    payload: Dict[str, Any], threshold: float = 0.005
) -> bool:
    """Wake up only if price change > threshold (0.5% default)."""
    delta = abs(payload.get("price_delta_pct", 0))
    return delta > threshold

should_wake(payload, predicates)

Evaluate all wake-up predicates. All must pass for agent to wake.

Source code in src/dir_core/wakeup.py
56
57
58
59
60
61
62
63
def should_wake(
    payload: Dict[str, Any], predicates: List[WakeupPredicate]
) -> bool:
    """Evaluate all wake-up predicates. All must pass for agent to wake."""
    for predicate in predicates:
        if not predicate.evaluate(payload):
            return False
    return True

volatility_elevated(payload, threshold=0.03)

Wake up only if volatility is elevated.

Source code in src/dir_core/wakeup.py
42
43
44
45
46
def volatility_elevated(
    payload: Dict[str, Any], threshold: float = 0.03
) -> bool:
    """Wake up only if volatility is elevated."""
    return payload.get("volatility", 0) > threshold