Skip to content

Core Framework

Decision Runtime

dir_core.runtime

DecisionRuntime — facade wiring StorageBundle to kernel services (DIR DX).

Single entry point for AgentRegistry, ContextStore, EscalationManager, AuditStore, plus orchestrated DIM validation with optional decision-audit rows.

DecisionRuntime

Wire :class:~dir_core.storage.StorageBundle to kernel services.

Source code in src/dir_core/runtime.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class DecisionRuntime:
    """Wire :class:`~dir_core.storage.StorageBundle` to kernel services."""

    def __init__(
        self,
        storage_bundle: StorageBundle,
        *,
        supported_versions: str = "1.x",
        max_escalations_per_hour: int = 3,
        escalation_refill_interval_sec: int = 3600,
    ) -> None:
        self.registry = AgentRegistry(
            storage=storage_bundle.agent_registry,
            supported_versions=supported_versions,
        )
        self.context_store = ContextStore(storage=storage_bundle.context)
        self.escalation = EscalationManager(
            storage=storage_bundle.escalation,
            max_escalations_per_hour=max_escalations_per_hour,
            refill_interval_sec=escalation_refill_interval_sec,
        )
        self.audit = AuditStore(
            storage_bundle.decision_audit,
            storage_bundle.idempotency,
        )

    def register_agent(
        self,
        agent_id: str,
        contract: dict[str, Any],
        agent_version: str,
        *,
        priority: int = 0,
    ) -> HandshakeResult:
        return self.registry.handshake(
            agent_id, contract, agent_version, priority=priority
        )

    def evaluate_proposal(
        self,
        proposal: PolicyProposal,
        raw_web_context: dict[str, Any],
        *,
        dim_context: dict[str, Any] | None = None,
        allowed_agents: list[str] | None = None,
        contract: dict[str, Any] | None = None,
        use_registry_contract: bool = True,
        retry_governor: IntentRetryGovernor | None = None,
        custom_validators: Optional[list[CustomValidator]] = None,
        now: datetime | None = None,
        record_audit: bool = True,
    ) -> ValidationResult:
        self.context_store.update_session(proposal.dfid, dict(raw_web_context))

        if dim_context is not None:
            context: dict[str, Any] = dict(dim_context)
            meta = dict(context.get("meta") or {})
            meta.setdefault("dfid", proposal.dfid)
            meta.setdefault("agent_id", proposal.agent_id)
            context["meta"] = meta
        else:
            ctx = self.context_store.compile_working_context(
                proposal.agent_id, proposal.dfid
            )
            ctx = dict(ctx)
            ctx["web"] = dict(raw_web_context)
            meta = dict(ctx.get("meta") or {})
            meta["dfid"] = proposal.dfid
            meta["agent_id"] = proposal.agent_id
            schema = self.registry.get_schema(proposal.agent_id)
            if schema is not None:
                meta["schema"] = schema
            ctx["meta"] = meta
            context = ctx

        resolved_contract = contract
        if resolved_contract is None and use_registry_contract:
            resolved_contract = self.registry.get_agent_contract(proposal.agent_id)

        verdict, reason = validate_proposal(
            proposal,
            context,
            allowed_agents=allowed_agents,
            now=now,
            retry_governor=retry_governor,
            contract=resolved_contract,
            custom_validators=custom_validators,
        )

        if record_audit and verdict in (
            ValidationVerdict.ACCEPT,
            ValidationVerdict.REJECT,
        ):
            event = (
                "PROPOSAL_ACCEPT"
                if verdict == ValidationVerdict.ACCEPT
                else "PROPOSAL_REJECT"
            )
            details: dict[str, Any] = {
                "agent_id": proposal.agent_id,
                "policy_kind": proposal.policy_kind,
                "reason": str(reason),
                "verdict": str(verdict),
                "confidence": proposal.confidence,
            }
            self.audit.record(proposal.dfid, event, details=details)

        result: ValidationResult = (verdict, reason)
        return result

Decision Integrity Module (DIM)

dir_core.dim

Decision Integrity Module (DIM): schema + RBAC + state consistency.

DIR §6. Validates PolicyProposal; returns ValidationVerdict with a reason (str or DimReasonCode). Ensures that only authorized agents can execute specific policies within safe bounds.

validate_proposal(proposal, context, allowed_agents=None, now=None, retry_governor=None, contract=None, custom_validators=None)

Validate a PolicyProposal against schema, RBAC, TTL, generic contract boundaries, and custom domain-specific validators.

Source code in src/dir_core/dim.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def validate_proposal(
    proposal: PolicyProposal,
    context: Dict[str, Any],
    allowed_agents: Optional[List[str]] = None,
    now: Optional[datetime] = None,
    retry_governor: Optional["IntentRetryGovernor"] = None,
    contract: Optional[Dict[str, Any]] = None,
    custom_validators: Optional[List[Callable[[PolicyProposal, Dict[str, Any], Dict[str, Any]], Optional[str]]]] = None,
) -> ValidationResult:
    """
    Validate a PolicyProposal against schema, RBAC, TTL, generic contract boundaries,
    and custom domain-specific validators.
    """
    now = now or datetime.now(timezone.utc)

    # 0. Intent Retry Governor
    if retry_governor is not None and retry_governor.should_abort(proposal.dfid):
        return ValidationVerdict.REJECT, DimReasonCode.REASONING_EXHAUSTION

    def _reject(reason: str | DimReasonCode) -> ValidationResult:
        if retry_governor is not None:
            retry_governor.record_rejection(proposal.dfid)
        return ValidationVerdict.REJECT, reason

    # 1. Schema Validation
    if not proposal.policy_kind:
        return _reject("Missing policy_kind")
    if not proposal.agent_id:
        return _reject("Missing agent_id")

    # 2. TTL / Decision Validity Window
    valid_until = _resolve_valid_until(proposal)
    if valid_until is not None and now > valid_until:
        return _reject(DimReasonCode.TTL_EXPIRED)

    # 3. RBAC (Role-Based Access Control)
    if allowed_agents is not None:
        if proposal.agent_id not in allowed_agents:
            return _reject(f"Agent '{proposal.agent_id}' not authorized (RBAC)")

    # 4. Generic Contract Boundaries (if contract provided)
    if contract:
        # Handle nested variants (e.g. from samples/00_quick_start) vs flat
        permissions = contract.get("permissions", contract)
        safety_rules = contract.get("safety_rules", contract)

        # 4a. Validate allowed policies
        allowed_policies = permissions.get("allowed_policy_types")
        if allowed_policies is not None and proposal.policy_kind not in allowed_policies:
            return _reject(
                f"Policy '{proposal.policy_kind}' is not in allowed_policy_types: {allowed_policies}"
            )

        # 4b. Validate minimum confidence
        min_conf = safety_rules.get("min_confidence_threshold")
        if min_conf is not None and proposal.confidence < float(min_conf):
            return _reject(
                f"Proposal confidence ({proposal.confidence}) is below threshold ({min_conf})"
            )

    # 4.1. Context/State Consistency (Legacy stub)
    state = context.get("state", {})
    risk_score = state.get("risk_score", 0.0)
    if proposal.policy_kind == "deploy_to_production" and risk_score > 0.8:
        return _reject(f"Risk score {risk_score} too high for deployment")

    # 5. Domain-Specific Validators (Edge cases & business logic)
    if custom_validators:
        for validator in custom_validators:
            # Custom validator returns a string reason if rejected, else None
            reason = validator(proposal, context, contract or {})
            if reason is not None:
                return _reject(f"Custom validation failed: {reason}")

    return ValidationVerdict.ACCEPT, DimReasonCode.VALIDATION_PASSED

Lifecycle

dir_core.lifecycle

DecisionFlow lifecycle: CREATED -> ACTIVE -> VALIDATING -> ACCEPTED|ABORTED|ESCALATED -> EXECUTING -> CLOSED.

DIR §4.3, §9. Persists transitions; resets IntentRetryGovernor on terminal states.

transition(dfid, from_status, to_status, retry_governor=None, db_path=None, *, storage=None)

Record a flow status transition.

On CLOSED/ABORTED, resets the retry governor for dfid.

Parameters:

Name Type Description Default
dfid str

DecisionFlow identifier.

required
from_status FlowStatus

Current status.

required
to_status FlowStatus

Target status.

required
retry_governor Optional[IntentRetryGovernor]

Optional governor to reset on terminal transitions.

None
db_path Optional[str]

SQLite path for persistence (legacy kwarg). When storage is also provided, storage takes precedence.

None
storage Optional[LifecycleStorage]

Custom :class:~dir_core.storage.LifecycleStorage backend. Pass None to skip persistence entirely.

None
Source code in src/dir_core/lifecycle.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def transition(
    dfid: str,
    from_status: FlowStatus,
    to_status: FlowStatus,
    retry_governor: Optional["IntentRetryGovernor"] = None,
    db_path: Optional[str] = None,
    *,
    storage: Optional["LifecycleStorage"] = None,
) -> None:
    """Record a flow status transition.

    On CLOSED/ABORTED, resets the retry governor for dfid.

    Args:
        dfid: DecisionFlow identifier.
        from_status: Current status.
        to_status: Target status.
        retry_governor: Optional governor to reset on terminal transitions.
        db_path: SQLite path for persistence (legacy kwarg). When ``storage``
            is also provided, ``storage`` takes precedence.
        storage: Custom :class:`~dir_core.storage.LifecycleStorage` backend.
            Pass ``None`` to skip persistence entirely.
    """
    if retry_governor is not None and to_status in (FlowStatus.CLOSED, FlowStatus.ABORTED):
        retry_governor.reset(dfid)

    _storage = storage
    if _storage is None and db_path is not None:
        from .storage.sqlite import SqliteLifecycleStorage
        _storage = SqliteLifecycleStorage(db_path)

    if _storage is not None:
        _storage.record_transition(dfid, from_status.value, to_status.value)

DFID

dir_core.dfid

DecisionFlow ID (DFID) – correlation identifier for the full decision lifecycle.

See DIR Architectural Pattern §4. All operations (observation, reasoning, validation, execution) are tagged with the same DFID for audit and traceability.

new_dfid()

Generate a new DecisionFlow ID (UUID v4).

Source code in src/dir_core/dfid.py
14
15
16
def new_dfid() -> str:
    """Generate a new DecisionFlow ID (UUID v4)."""
    return str(uuid.uuid4())

new_dfid_with_parent(parent_dfid)

Generate a child DFID for hierarchical flows.

Parent is not encoded in the ID; relationship is stored in context/ledger. Parent DFID is logged for traceability.

Source code in src/dir_core/dfid.py
19
20
21
22
23
24
25
26
27
def new_dfid_with_parent(parent_dfid: str) -> str:
    """Generate a child DFID for hierarchical flows.

    Parent is not encoded in the ID; relationship is stored in context/ledger.
    Parent DFID is logged for traceability.
    """
    child_id = str(uuid.uuid4())
    logger.debug("Child DFID %s created with parent %s", child_id[:8], parent_dfid[:8])
    return child_id

Models

dir_core.models

Shared Pydantic models: ResponsibilityContract, PolicyProposal, ExecutionIntent, etc.

Aligned with ROA Manifesto §3 and DIR Architectural Pattern §5.

Extended with: - ExplainResult (§4.1): Structured reasoning output from context interpretation - Policy (§4.2): Structured recommendation with justification and confidence - SelfCheckResult (§4.3): Introspection result for boundary validation - AgentState (§3.4): Long-lived state with decision trajectory and memory - EscalationRequest (§5.3): Structure for authority escalation

AgentState

Bases: BaseModel

ROA: Long-lived agent state with memory (Manifesto §3.4).

Provides continuity, self-awareness, and trajectory for reasoning.

Source code in src/dir_core/models.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class AgentState(BaseModel):
    """ROA: Long-lived agent state with memory (Manifesto §3.4).

    Provides continuity, self-awareness, and trajectory for reasoning.
    """

    agent_id: str
    created_at: datetime = Field(default_factory=_utcnow)
    last_active: datetime = Field(default_factory=_utcnow)
    decision_trajectory: List[DecisionRecord] = Field(
        default_factory=list, description="History of past decisions and rationales"
    )
    policy_version: int = Field(default=1, description="Version of agent's strategy/policy")
    current_context: Dict[str, Any] = Field(default_factory=dict, description="Current operational context")
    is_active: bool = Field(default=True, description="Whether agent is still active in lifecycle")

CompensationAction

Bases: StrEnum

Deterministic Compensation Menu (DIR Topologies §6.4).

Source code in src/dir_core/models.py
29
30
31
32
33
34
35
class CompensationAction(StrEnum):
    """Deterministic Compensation Menu (DIR Topologies §6.4)."""

    REVERT = "REVERT"
    CLOSE_ALL = "CLOSE_ALL"
    ALERT_HUMAN = "ALERT_HUMAN"
    NOOP = "NOOP"

ContextSnapshot

Bases: BaseModel

Frozen state of relevant context at a point in time (DIR Topologies §2.2).

ContextSnapshotID is the hash binding, ensuring every PolicyProposal is linked to the exact version of the world the agent 'saw'.

Source code in src/dir_core/models.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class ContextSnapshot(BaseModel):
    """Frozen state of relevant context at a point in time (DIR Topologies §2.2).

    ContextSnapshotID is the hash binding, ensuring every PolicyProposal
    is linked to the exact version of the world the agent 'saw'.
    """

    snapshot_id: str = Field(description="Unique hash/ID for this snapshot")
    dfid: str = Field(description="Associated DecisionFlow ID")
    timestamp: datetime = Field(default_factory=_utcnow)
    data: Dict[str, Any] = Field(default_factory=dict, description="Frozen context data")
    source: str = Field(default="context_store", description="Origin of context")

    @classmethod
    def create(cls, dfid: str, data: Dict[str, Any], source: str = "context_store") -> "ContextSnapshot":
        """Factory method that generates snapshot_id from content hash."""
        import hashlib
        import json
        content = json.dumps(data, sort_keys=True, default=str)
        snapshot_id = hashlib.sha256(content.encode()).hexdigest()[:16]
        return cls(snapshot_id=snapshot_id, dfid=dfid, data=data, source=source)

create(dfid, data, source='context_store') classmethod

Factory method that generates snapshot_id from content hash.

Source code in src/dir_core/models.py
272
273
274
275
276
277
278
279
@classmethod
def create(cls, dfid: str, data: Dict[str, Any], source: str = "context_store") -> "ContextSnapshot":
    """Factory method that generates snapshot_id from content hash."""
    import hashlib
    import json
    content = json.dumps(data, sort_keys=True, default=str)
    snapshot_id = hashlib.sha256(content.encode()).hexdigest()[:16]
    return cls(snapshot_id=snapshot_id, dfid=dfid, data=data, source=source)

DecisionAtom

Bases: BaseModel

Decision Atom for Topology B (SDS) — snapshot-bound decision.

DIR Topologies §3.1.2: The DecisionAtom MUST include snapshot_id hash-binding so the JIT Validator can verify state has not drifted since the snapshot.

Source code in src/dir_core/models.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
class DecisionAtom(BaseModel):
    """Decision Atom for Topology B (SDS) — snapshot-bound decision.

    DIR Topologies §3.1.2: The DecisionAtom MUST include snapshot_id hash-binding
    so the JIT Validator can verify state has not drifted since the snapshot.
    """

    dfid: str = Field(description="DecisionFlow ID for correlation")
    snapshot_id: str = Field(
        description="Context snapshot hash for JIT drift check"
    )
    params: Dict[str, Any] = Field(
        default_factory=dict,
        description="Decision payload (action, amount, user_id, etc.)",
    )

DecisionFlow

Bases: BaseModel

DIR: Container for the entire decision lifecycle (Manifesto §5.4).

A DecisionFlow aggregates: - Initial context (ContextSnapshot) - All policy proposals - Validation results - Escalations - Execution events - Final outcomes

DFID allows auditability, debugging, compliance reporting, causal reasoning, replaying decisions, and clean separation of concurrent decision processes.

Source code in src/dir_core/models.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
class DecisionFlow(BaseModel):
    """DIR: Container for the entire decision lifecycle (Manifesto §5.4).

    A DecisionFlow aggregates:
    - Initial context (ContextSnapshot)
    - All policy proposals
    - Validation results
    - Escalations
    - Execution events
    - Final outcomes

    DFID allows auditability, debugging, compliance reporting, causal reasoning,
    replaying decisions, and clean separation of concurrent decision processes.
    """

    dfid: str = Field(description="The immutable DecisionFlow ID")
    parent_dfid: Optional[str] = Field(default=None, description="Parent flow for hierarchical decisions")
    created_at: datetime = Field(default_factory=_utcnow)
    completed_at: Optional[datetime] = None

    # Lifecycle artifacts
    context_snapshot: Optional[ContextSnapshot] = None
    explain_results: List[ExplainResult] = Field(default_factory=list)
    policies: List[Policy] = Field(default_factory=list)
    proposals: List[PolicyProposal] = Field(default_factory=list)
    escalations: List[EscalationRequest] = Field(default_factory=list)
    execution_intents: List[ExecutionIntent] = Field(default_factory=list)

    # Timeline for reconstruction
    timeline: List[FlowEvent] = Field(default_factory=list)

    # Outcome
    status: DecisionFlowStatus = DecisionFlowStatus.IN_PROGRESS
    outcome_summary: Optional[str] = None

    # Participating agents
    participating_agents: List[str] = Field(default_factory=list)

    # Child flows (for hierarchical decisions)
    child_dfids: List[str] = Field(default_factory=list)

    def add_event(
        self,
        event_type: Union[str, FlowTimelineEventType],
        summary: str,
        agent_id: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Add an event to the timeline."""
        event = FlowEvent(
            event_type=FlowTimelineEventType(event_type)
            if isinstance(event_type, str)
            else event_type,
            agent_id=agent_id,
            summary=summary,
            details=details or {}
        )
        self.timeline.append(event)
        if agent_id and agent_id not in self.participating_agents:
            self.participating_agents.append(agent_id)

    def set_context(self, snapshot: ContextSnapshot) -> None:
        """Bind context snapshot to this flow."""
        self.context_snapshot = snapshot
        self.add_event(
            FlowTimelineEventType.CONTEXT_SNAPSHOT,
            f"Context bound: {snapshot.snapshot_id}",
            details={"snapshot_id": snapshot.snapshot_id},
        )

    def record_explain(self, result: ExplainResult) -> None:
        """Record an Explain stage result."""
        self.explain_results.append(result)
        self.add_event(
            FlowTimelineEventType.EXPLAIN,
            f"Explain: {len(result.identified_signals)} signals, {len(result.risks)} risks",
            agent_id=result.agent_id,
            details={
                "narrative": result.narrative[:100] + "..."
                if len(result.narrative) > 100
                else result.narrative
            },
        )

    def record_policy(self, policy: Policy) -> None:
        """Record a Policy formation."""
        self.policies.append(policy)
        self.add_event(
            FlowTimelineEventType.POLICY,
            f"Policy: {policy.proposed_action} (conf={policy.confidence:.2f})",
            agent_id=policy.agent_id,
            details={"action": policy.proposed_action, "confidence": policy.confidence},
        )

    def record_proposal(self, proposal: PolicyProposal) -> None:
        """Record a PolicyProposal emission."""
        self.proposals.append(proposal)
        self.add_event(
            FlowTimelineEventType.PROPOSAL,
            f"Proposal: {proposal.policy_kind}",
            agent_id=proposal.agent_id,
            details={"policy_kind": proposal.policy_kind, "confidence": proposal.confidence},
        )

    def record_escalation(self, escalation: EscalationRequest) -> None:
        """Record an escalation event."""
        self.escalations.append(escalation)
        self.add_event(
            FlowTimelineEventType.ESCALATION,
            f"Escalated: {escalation.trigger} ({escalation.severity})",
            agent_id=escalation.from_agent_id,
            details={"trigger": escalation.trigger, "severity": str(escalation.severity)},
        )
        self.status = DecisionFlowStatus.ESCALATED

    def record_execution(self, intent: ExecutionIntent) -> None:
        """Record an execution intent."""
        self.execution_intents.append(intent)
        self.add_event(
            FlowTimelineEventType.EXECUTION,
            f"Executed: {intent.policy_kind}",
            details={"idempotency_key": intent.idempotency_key},
        )

    def complete(self, summary: str) -> None:
        """Mark flow as completed."""
        self.completed_at = _utcnow()
        self.status = DecisionFlowStatus.COMPLETED
        self.outcome_summary = summary
        self.add_event(FlowTimelineEventType.FLOW_COMPLETED, summary)

    def abort(self, reason: str) -> None:
        """Mark flow as aborted."""
        self.completed_at = _utcnow()
        self.status = DecisionFlowStatus.ABORTED
        self.outcome_summary = reason
        self.add_event(FlowTimelineEventType.FLOW_ABORTED, reason)

    def create_child_flow(self, child_dfid: str) -> None:
        """Record creation of a child flow."""
        self.child_dfids.append(child_dfid)
        self.add_event(
            FlowTimelineEventType.CHILD_FLOW_CREATED,
            f"Child flow: {child_dfid}",
            details={"child_dfid": child_dfid},
        )

    def get_timeline_report(self) -> str:
        """Generate human-readable timeline report."""
        lines = [
            f"=== DecisionFlow Report ===",
            f"DFID: {self.dfid}",
            f"Status: {self.status}",
            f"Created: {self.created_at.isoformat()}",
        ]
        if self.parent_dfid:
            lines.append(f"Parent: {self.parent_dfid}")
        if self.context_snapshot:
            lines.append(f"Context: {self.context_snapshot.snapshot_id}")
        lines.append(f"Agents: {', '.join(self.participating_agents) or 'none'}")
        lines.append(f"\n--- Timeline ({len(self.timeline)} events) ---")

        for i, event in enumerate(self.timeline, 1):
            time_str = event.timestamp.strftime("%H:%M:%S.%f")[:-3]
            agent_str = f" [{event.agent_id}]" if event.agent_id else ""
            lines.append(
                f"  {i:2}. [{time_str}] {event.event_type.value}{agent_str}: {event.summary}"
            )

        if self.outcome_summary:
            lines.append(f"\n--- Outcome ---")
            lines.append(f"  {self.outcome_summary}")

        return "\n".join(lines)

abort(reason)

Mark flow as aborted.

Source code in src/dir_core/models.py
423
424
425
426
427
428
def abort(self, reason: str) -> None:
    """Mark flow as aborted."""
    self.completed_at = _utcnow()
    self.status = DecisionFlowStatus.ABORTED
    self.outcome_summary = reason
    self.add_event(FlowTimelineEventType.FLOW_ABORTED, reason)

add_event(event_type, summary, agent_id=None, details=None)

Add an event to the timeline.

Source code in src/dir_core/models.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def add_event(
    self,
    event_type: Union[str, FlowTimelineEventType],
    summary: str,
    agent_id: Optional[str] = None,
    details: Optional[Dict[str, Any]] = None,
) -> None:
    """Add an event to the timeline."""
    event = FlowEvent(
        event_type=FlowTimelineEventType(event_type)
        if isinstance(event_type, str)
        else event_type,
        agent_id=agent_id,
        summary=summary,
        details=details or {}
    )
    self.timeline.append(event)
    if agent_id and agent_id not in self.participating_agents:
        self.participating_agents.append(agent_id)

complete(summary)

Mark flow as completed.

Source code in src/dir_core/models.py
416
417
418
419
420
421
def complete(self, summary: str) -> None:
    """Mark flow as completed."""
    self.completed_at = _utcnow()
    self.status = DecisionFlowStatus.COMPLETED
    self.outcome_summary = summary
    self.add_event(FlowTimelineEventType.FLOW_COMPLETED, summary)

create_child_flow(child_dfid)

Record creation of a child flow.

Source code in src/dir_core/models.py
430
431
432
433
434
435
436
437
def create_child_flow(self, child_dfid: str) -> None:
    """Record creation of a child flow."""
    self.child_dfids.append(child_dfid)
    self.add_event(
        FlowTimelineEventType.CHILD_FLOW_CREATED,
        f"Child flow: {child_dfid}",
        details={"child_dfid": child_dfid},
    )

get_timeline_report()

Generate human-readable timeline report.

Source code in src/dir_core/models.py
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def get_timeline_report(self) -> str:
    """Generate human-readable timeline report."""
    lines = [
        f"=== DecisionFlow Report ===",
        f"DFID: {self.dfid}",
        f"Status: {self.status}",
        f"Created: {self.created_at.isoformat()}",
    ]
    if self.parent_dfid:
        lines.append(f"Parent: {self.parent_dfid}")
    if self.context_snapshot:
        lines.append(f"Context: {self.context_snapshot.snapshot_id}")
    lines.append(f"Agents: {', '.join(self.participating_agents) or 'none'}")
    lines.append(f"\n--- Timeline ({len(self.timeline)} events) ---")

    for i, event in enumerate(self.timeline, 1):
        time_str = event.timestamp.strftime("%H:%M:%S.%f")[:-3]
        agent_str = f" [{event.agent_id}]" if event.agent_id else ""
        lines.append(
            f"  {i:2}. [{time_str}] {event.event_type.value}{agent_str}: {event.summary}"
        )

    if self.outcome_summary:
        lines.append(f"\n--- Outcome ---")
        lines.append(f"  {self.outcome_summary}")

    return "\n".join(lines)

record_escalation(escalation)

Record an escalation event.

Source code in src/dir_core/models.py
396
397
398
399
400
401
402
403
404
405
def record_escalation(self, escalation: EscalationRequest) -> None:
    """Record an escalation event."""
    self.escalations.append(escalation)
    self.add_event(
        FlowTimelineEventType.ESCALATION,
        f"Escalated: {escalation.trigger} ({escalation.severity})",
        agent_id=escalation.from_agent_id,
        details={"trigger": escalation.trigger, "severity": str(escalation.severity)},
    )
    self.status = DecisionFlowStatus.ESCALATED

record_execution(intent)

Record an execution intent.

Source code in src/dir_core/models.py
407
408
409
410
411
412
413
414
def record_execution(self, intent: ExecutionIntent) -> None:
    """Record an execution intent."""
    self.execution_intents.append(intent)
    self.add_event(
        FlowTimelineEventType.EXECUTION,
        f"Executed: {intent.policy_kind}",
        details={"idempotency_key": intent.idempotency_key},
    )

record_explain(result)

Record an Explain stage result.

Source code in src/dir_core/models.py
362
363
364
365
366
367
368
369
370
371
372
373
374
def record_explain(self, result: ExplainResult) -> None:
    """Record an Explain stage result."""
    self.explain_results.append(result)
    self.add_event(
        FlowTimelineEventType.EXPLAIN,
        f"Explain: {len(result.identified_signals)} signals, {len(result.risks)} risks",
        agent_id=result.agent_id,
        details={
            "narrative": result.narrative[:100] + "..."
            if len(result.narrative) > 100
            else result.narrative
        },
    )

record_policy(policy)

Record a Policy formation.

Source code in src/dir_core/models.py
376
377
378
379
380
381
382
383
384
def record_policy(self, policy: Policy) -> None:
    """Record a Policy formation."""
    self.policies.append(policy)
    self.add_event(
        FlowTimelineEventType.POLICY,
        f"Policy: {policy.proposed_action} (conf={policy.confidence:.2f})",
        agent_id=policy.agent_id,
        details={"action": policy.proposed_action, "confidence": policy.confidence},
    )

record_proposal(proposal)

Record a PolicyProposal emission.

Source code in src/dir_core/models.py
386
387
388
389
390
391
392
393
394
def record_proposal(self, proposal: PolicyProposal) -> None:
    """Record a PolicyProposal emission."""
    self.proposals.append(proposal)
    self.add_event(
        FlowTimelineEventType.PROPOSAL,
        f"Proposal: {proposal.policy_kind}",
        agent_id=proposal.agent_id,
        details={"policy_kind": proposal.policy_kind, "confidence": proposal.confidence},
    )

set_context(snapshot)

Bind context snapshot to this flow.

Source code in src/dir_core/models.py
353
354
355
356
357
358
359
360
def set_context(self, snapshot: ContextSnapshot) -> None:
    """Bind context snapshot to this flow."""
    self.context_snapshot = snapshot
    self.add_event(
        FlowTimelineEventType.CONTEXT_SNAPSHOT,
        f"Context bound: {snapshot.snapshot_id}",
        details={"snapshot_id": snapshot.snapshot_id},
    )

DecisionRecord

Bases: BaseModel

Single decision in agent's trajectory history.

Source code in src/dir_core/models.py
126
127
128
129
130
131
132
133
134
135
class DecisionRecord(BaseModel):
    """Single decision in agent's trajectory history."""

    dfid: str
    timestamp: datetime = Field(default_factory=_utcnow)
    explain_summary: str = ""
    policy_action: str = ""
    policy_confidence: float = 0.0
    outcome: DecisionRecordOutcome = DecisionRecordOutcome.PENDING
    outcome_reason: Optional[str] = None

EscalationRequest

Bases: BaseModel

ROA: Request for escalation to higher-level agent (Manifesto §5.3).

Escalation is not failure - it's essential for bounded responsibility.

Source code in src/dir_core/models.py
160
161
162
163
164
165
166
167
168
169
170
171
172
class EscalationRequest(BaseModel):
    """ROA: Request for escalation to higher-level agent (Manifesto §5.3).

    Escalation is not failure - it's essential for bounded responsibility.
    """

    dfid: str
    from_agent_id: str
    to_agent_id: Optional[str] = Field(default=None, description="Target parent agent, if known")
    trigger: str = Field(description="What triggered escalation")
    context: Dict[str, Any] = Field(default_factory=dict)
    original_policy: Optional[Policy] = Field(default=None, description="Policy that couldn't be executed")
    severity: EscalationSeverity = EscalationSeverity.MEDIUM

ExecutionIntent

Bases: BaseModel

Validated proposal; only this type is authorized to trigger side effects (DIR §7).

Source code in src/dir_core/models.py
201
202
203
204
205
206
207
class ExecutionIntent(BaseModel):
    """Validated proposal; only this type is authorized to trigger side effects (DIR §7)."""

    dfid: str
    idempotency_key: str
    policy_kind: str
    params: Dict[str, Any] = Field(default_factory=dict)

ExplainResult

Bases: BaseModel

ROA: Output of Explain stage - context interpretation (Manifesto §4.1).

Answers: 'What is happening, and why does it matter for my mission?'

Source code in src/dir_core/models.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class ExplainResult(BaseModel):
    """ROA: Output of Explain stage - context interpretation (Manifesto §4.1).

    Answers: 'What is happening, and why does it matter for my mission?'
    """

    dfid: str
    agent_id: str
    timestamp: datetime = Field(default_factory=_utcnow)
    narrative: str = Field(description="Natural language interpretation of the situation")
    identified_signals: List[str] = Field(default_factory=list, description="Relevant patterns detected")
    risks: List[str] = Field(default_factory=list, description="Identified risks")
    opportunities: List[str] = Field(default_factory=list, description="Identified opportunities")
    context_summary: Dict[str, Any] = Field(default_factory=dict, description="Key context facts used")

FlowEvent

Bases: BaseModel

Single event in a DecisionFlow timeline.

Source code in src/dir_core/models.py
282
283
284
285
286
287
288
289
class FlowEvent(BaseModel):
    """Single event in a DecisionFlow timeline."""

    timestamp: datetime = Field(default_factory=_utcnow)
    event_type: FlowTimelineEventType
    agent_id: Optional[str] = None
    summary: str = ""
    details: Dict[str, Any] = Field(default_factory=dict)

Policy

Bases: BaseModel

ROA: Structured recommendation from Policy stage (Manifesto §4.2).

A Policy is NOT an action - it's an interpretable recommendation with justification.

Source code in src/dir_core/models.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class Policy(BaseModel):
    """ROA: Structured recommendation from Policy stage (Manifesto §4.2).

    A Policy is NOT an action - it's an interpretable recommendation with justification.
    """

    dfid: str
    agent_id: str
    timestamp: datetime = Field(default_factory=_utcnow)
    proposed_action: str = Field(description="The recommended course of action")
    justification: str = Field(description="Reasoning rooted in Explain stage")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence/uncertainty indicator")
    assumptions: List[str] = Field(default_factory=list, description="Required assumptions for this policy")
    expected_outcomes: List[str] = Field(default_factory=list, description="Expected results or risks")
    explain_ref: Optional[str] = Field(default=None, description="Reference to ExplainResult")

PolicyProposal

Bases: BaseModel

Structured intent from agent; validated by DIM before execution (DIR §5).

Source code in src/dir_core/models.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
class PolicyProposal(BaseModel):
    """Structured intent from agent; validated by DIM before execution (DIR §5)."""

    dfid: str
    agent_id: str
    policy_kind: str
    params: Dict[str, Any] = Field(default_factory=dict)
    context_ref: Optional[str] = None
    execution_constraints: Dict[str, Any] = Field(default_factory=dict)
    # TTL / Decision Validity Window (DIR §6.4)
    valid_until: Optional[datetime] = Field(
        default=None,
        description="If set, proposal is rejected when current_time > valid_until",
    )
    created_at: datetime = Field(default_factory=_utcnow, description="Proposal creation time for validity_window_sec")
    # Extended fields linking to ROA lifecycle
    confidence: float = Field(default=1.0, ge=0.0, le=1.0)
    justification: Optional[str] = Field(default=None, description="From Policy stage")
    explain_ref: Optional[str] = Field(default=None, description="Reference to ExplainResult")

ProofCarryingIntent

Bases: BaseModel

Proof-Carrying Intent (PCI) for Topology C (DL+PCI).

The agent submits this to the DIM. The evidence_hash is a CLAIM; the DIM independently recalculates it using authoritative Context and Contract. Mismatch = reject (Zero Trust).

intent_payload: Domain-specific proposal as dict for flexibility. Sample 33 uses policy_proposal.model_dump(); DIM uses PolicyProposal.model_validate().

Source code in src/dir_core/models.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class ProofCarryingIntent(BaseModel):
    """Proof-Carrying Intent (PCI) for Topology C (DL+PCI).

    The agent submits this to the DIM. The evidence_hash is a CLAIM; the DIM
    independently recalculates it using authoritative Context and Contract.
    Mismatch = reject (Zero Trust).

    intent_payload: Domain-specific proposal as dict for flexibility.
    Sample 33 uses policy_proposal.model_dump(); DIM uses PolicyProposal.model_validate().
    """

    dfid: str = Field(description="DecisionFlow ID for traceability")
    intent_payload: Dict[str, Any] = Field(
        description="Structured decision (e.g. coverage, premium); domain-specific"
    )
    context_ref: str = Field(
        description="ContextSnapshotID / hash for Evidence Hash binding"
    )
    evidence_hash: str = Field(
        description="SHA256(DFID || Context_Hash || Contract_Hash || Proposal_Params)"
    )
    signature: str = Field(
        default="",
        description="Cryptographic signature (HMAC or placeholder)",
    )

ResponsibilityContract

Bases: BaseModel

ROA: scope, authority, mission, escalation (Manifesto §3.1).

Source code in src/dir_core/models.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class ResponsibilityContract(BaseModel):
    """ROA: scope, authority, mission, escalation (Manifesto §3.1)."""

    agent_id: str
    role: ContractRole = ContractRole.EXECUTOR
    mission: str = ""
    authorized_instruments: List[str] = Field(default_factory=list)
    allowed_policy_types: List[str] = Field(default_factory=list)
    escalate_on_uncertainty: float = 0.7
    # Extended fields for ROA compliance
    max_drawdown_limit: float = Field(default=0.05, description="Maximum drawdown limit (5%)")
    escalation_triggers: List[str] = Field(
        default_factory=lambda: ["uncertainty_threshold", "authority_breach", "risk_limit_exceeded"]
    )
    parent_agent_id: Optional[str] = Field(default=None, description="Parent agent for hierarchy")
    # Wake-up Predicates (DIR Topologies §2.3)
    wake_up_threshold_pct: float = Field(
        default=0.5,
        description="Minimum price change (%) to wake up agent - prevents Token Burn on minor signals"
    )

SelfCheckResult

Bases: BaseModel

ROA: Result of agent self-check before emitting proposal (Manifesto §4.3).

Self-Check is a cost-optimization heuristic - it has no security value. Catches obvious issues before reaching the Runtime.

Source code in src/dir_core/models.py
108
109
110
111
112
113
114
115
116
117
118
class SelfCheckResult(BaseModel):
    """ROA: Result of agent self-check before emitting proposal (Manifesto §4.3).

    Self-Check is a cost-optimization heuristic - it has no security value.
    Catches obvious issues before reaching the Runtime.
    """

    passed: bool = Field(description="Whether policy passes self-check")
    reason: Optional[str] = Field(default=None, description="Reason if failed")
    should_escalate: bool = Field(default=False, description="Whether to escalate to parent agent")
    escalation_trigger: Optional[str] = Field(default=None, description="Which trigger caused escalation")

Data Types

dir_core.data_types

Shared data types: validation verdicts, DIM reasons, models, registry, event bus.

AgentRegistryStatus

Bases: StrEnum

Agent row status in registry storage.

Source code in src/dir_core/data_types.py
87
88
89
90
class AgentRegistryStatus(StrEnum):
    """Agent row status in registry storage."""

    ACTIVE = "ACTIVE"

ContractRole

Bases: StrEnum

ROA responsibility role (Manifesto §3.1).

Source code in src/dir_core/data_types.py
27
28
29
30
31
32
class ContractRole(StrEnum):
    """ROA responsibility role (Manifesto §3.1)."""

    STRATEGIST = "STRATEGIST"
    EXECUTOR = "EXECUTOR"
    MONITOR = "MONITOR"

DecisionFlowStatus

Bases: StrEnum

High-level status on DecisionFlow aggregate (distinct from lifecycle.FlowStatus).

Source code in src/dir_core/data_types.py
70
71
72
73
74
75
76
class DecisionFlowStatus(StrEnum):
    """High-level status on DecisionFlow aggregate (distinct from lifecycle.FlowStatus)."""

    IN_PROGRESS = "IN_PROGRESS"
    COMPLETED = "COMPLETED"
    ESCALATED = "ESCALATED"
    ABORTED = "ABORTED"

DecisionRecordOutcome

Bases: StrEnum

Outcome of a single decision in agent trajectory.

Source code in src/dir_core/data_types.py
35
36
37
38
39
40
41
class DecisionRecordOutcome(StrEnum):
    """Outcome of a single decision in agent trajectory."""

    ACCEPTED = "ACCEPTED"
    REJECTED = "REJECTED"
    ESCALATED = "ESCALATED"
    PENDING = "PENDING"

DimReasonCode

Bases: StrEnum

Stable machine-readable DIM reasons (subset; dynamic detail remains plain str).

Source code in src/dir_core/data_types.py
15
16
17
18
19
20
class DimReasonCode(StrEnum):
    """Stable machine-readable DIM reasons (subset; dynamic detail remains plain str)."""

    REASONING_EXHAUSTION = "REASONING_EXHAUSTION"
    TTL_EXPIRED = "TTL_EXPIRED"
    VALIDATION_PASSED = "VALIDATION_PASSED"

EscalationSeverity

Bases: StrEnum

Severity on EscalationRequest (Manifesto §5.3).

Source code in src/dir_core/data_types.py
44
45
46
47
48
49
50
class EscalationSeverity(StrEnum):
    """Severity on EscalationRequest (Manifesto §5.3)."""

    LOW = "LOW"
    MEDIUM = "MEDIUM"
    HIGH = "HIGH"
    CRITICAL = "CRITICAL"

EventBusBackend

Bases: StrEnum

Pluggable event bus implementation (factory).

Source code in src/dir_core/data_types.py
 99
100
101
102
103
104
class EventBusBackend(StrEnum):
    """Pluggable event bus implementation (factory)."""

    MEMORY = "memory"
    KAFKA = "kafka"
    PUBSUB = "pubsub"

EventType

Bases: StrEnum

Canonical event types for EOAM (DIR Topologies §2.4).

Source code in src/dir_core/data_types.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class EventType(StrEnum):
    """Canonical event types for EOAM (DIR Topologies §2.4)."""

    OBSERVATION = "OBSERVATION"
    MARKET_SIGNAL = "MARKET_SIGNAL"
    NEWS = "NEWS"
    RISK_ALERT = "RISK_ALERT"
    POLICY_PROPOSAL = "POLICY_PROPOSAL"
    ESCALATION = "ESCALATION"
    VALIDATION_RESULT = "VALIDATION_RESULT"
    VALIDATION_REJECTED = "VALIDATION_REJECTED"
    EXECUTION_INTENT = "EXECUTION_INTENT"
    EXECUTION_RESULT = "EXECUTION_RESULT"
    AGENT_ACTIVATED = "AGENT_ACTIVATED"
    FLOW_COMPLETED = "FLOW_COMPLETED"

FlowTimelineEventType

Bases: StrEnum

Event type on DecisionFlow.timeline (DIR §5.4).

Source code in src/dir_core/data_types.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class FlowTimelineEventType(StrEnum):
    """Event type on DecisionFlow.timeline (DIR §5.4)."""

    FLOW_STARTED = "FLOW_STARTED"
    CONTEXT_SNAPSHOT = "CONTEXT_SNAPSHOT"
    EXPLAIN = "EXPLAIN"
    POLICY = "POLICY"
    SELF_CHECK = "SELF_CHECK"
    PROPOSAL = "PROPOSAL"
    VALIDATION = "VALIDATION"
    EXECUTION = "EXECUTION"
    ESCALATION = "ESCALATION"
    CHILD_FLOW_CREATED = "CHILD_FLOW_CREATED"
    FLOW_COMPLETED = "FLOW_COMPLETED"
    FLOW_ABORTED = "FLOW_ABORTED"

HandshakeRejectionReason

Bases: StrEnum

Handshake failure codes (DIR §2.3).

Source code in src/dir_core/data_types.py
93
94
95
96
class HandshakeRejectionReason(StrEnum):
    """Handshake failure codes (DIR §2.3)."""

    VERSION_MISMATCH = "VERSION_MISMATCH"

HumanDecision

Bases: StrEnum

Human resolution of an escalation (DIR §9).

Source code in src/dir_core/data_types.py
79
80
81
82
83
84
class HumanDecision(StrEnum):
    """Human resolution of an escalation (DIR §9)."""

    OVERRIDE = "OVERRIDE"
    MODIFY = "MODIFY"
    ABORT = "ABORT"

ValidationVerdict

Bases: StrEnum

Outcome of deterministic validation (DIM, JIT, domain gates).

Source code in src/dir_core/data_types.py
 7
 8
 9
10
11
12
class ValidationVerdict(StrEnum):
    """Outcome of deterministic validation (DIM, JIT, domain gates)."""

    ACCEPT = "ACCEPT"
    REJECT = "REJECT"
    ESCALATE = "ESCALATE"