Skip to content

Kernel Services

Agent Registry

dir_core.agent_registry

Agent Registry: contract, handshake, lookup by agent_id.

DIR §2.3. Maintains a registry of active agents, their capability contracts, and metadata. Handshake with SemVer alignment; schema serving for Context compilation.

AgentRegistry

Registry of active agents with SemVer handshake (DIR §2.3).

Storage backend is pluggable. Pass storage= for a custom backend, or db_path= to use the built-in SQLite backend (default behaviour).

Parameters:

Name Type Description Default
db_path Optional[str]

Path to SQLite database. Used when storage is not provided.

None
supported_versions str

SemVer constraint for handshake (e.g. "1.x").

'1.x'
storage Optional[AgentRegistryStorage]

Custom `dir_core.storage.AgentRegistryStorage` backend. When provided, db_path is ignored.

None

Raises:

Type Description
ValueError

When neither db_path nor storage is supplied.

Source code in src/dir_core/agent_registry.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class AgentRegistry:
    """Track active agents and their capability contracts (DIR §2.3).

    Persistence is delegated to a storage backend. Supply ``storage=`` to
    plug in a custom backend; otherwise supply ``db_path=`` and the built-in
    SQLite backend is created for that path.

    Args:
        db_path: SQLite database path. Only consulted when ``storage`` is
            not given.
        supported_versions: SemVer constraint applied during handshake
            (e.g. ``"1.x"``).
        storage: Custom :class:`~dir_core.storage.AgentRegistryStorage`
            backend. Takes precedence over ``db_path``.

    Raises:
        ValueError: If both ``db_path`` and ``storage`` are omitted.
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        supported_versions: str = "1.x",
        *,
        storage: Optional[AgentRegistryStorage] = None,
    ):
        self.supported_versions = supported_versions
        if storage is None and db_path is None:
            raise ValueError(
                "Provide either 'db_path' (SQLite) or 'storage' (custom backend)."
            )
        if storage is None:
            # ``db_path`` is only exposed when the SQLite backend is in use.
            self.db_path = db_path  # kept for backward compatibility
            storage = SqliteAgentRegistryStorage(db_path)
        self._storage: AgentRegistryStorage = storage

    def handshake(
        self,
        agent_id: str,
        contract: Dict[str, Any],
        agent_version: str,
        priority: int = 0,
    ) -> HandshakeResult:
        """Version-checked registration of an agent.

        When ``agent_version`` satisfies ``self.supported_versions`` the agent
        is upserted as ACTIVE with a freshly generated UUID4 session token and
        an accepted result carrying that token is returned. Otherwise nothing
        is stored and a rejected result with reason VERSION_MISMATCH is
        returned.
        """
        if _version_compatible(agent_version, self.supported_versions):
            session_token = str(uuid.uuid4())
            self._storage.upsert_agent(
                agent_id=agent_id,
                contract_json=json.dumps(contract),
                priority=priority,
                status=AgentRegistryStatus.ACTIVE,
                agent_version=agent_version,
                session_token=session_token,
            )
            logger.info(
                "Handshake: agent_id=%s ver=%s accepted", agent_id, agent_version
            )
            return HandshakeResult(accepted=True, session_token=session_token)

        return HandshakeResult(
            accepted=False,
            reason=HandshakeRejectionReason.VERSION_MISMATCH.value,
        )

    def get_schema(
        self, agent_id: str, schema_kind: Optional[str] = None
    ) -> Optional[dict]:
        """Look up a schema inside the agent's contract.

        Returns ``contract['schemas'][schema_kind]`` when ``schema_kind`` is
        given and present there; in every other case falls back to
        ``contract.get('schema')``. Returns None when the agent has no
        (or an empty) contract.
        """
        contract = self.get_agent_contract(agent_id)
        if not contract:
            return None
        if schema_kind is not None and "schemas" in contract:
            per_kind = contract["schemas"]
            if schema_kind in per_kind:
                return per_kind[schema_kind]
        return contract.get("schema")

    def register_agent(
        self,
        agent_id: str,
        contract: Dict[str, Any],
        priority: int = 0,
    ) -> None:
        """Upsert an agent as ACTIVE without any version check.

        Deprecated: emits a ``DeprecationWarning``; prefer handshake(), which
        checks SemVer compatibility and issues a session token (DIR §2.3).
        The stored record has neither agent version nor session token.
        """
        warnings.warn(
            "register_agent is deprecated; use handshake() for version checking (DIR §2.3)",
            DeprecationWarning,
            stacklevel=2,
        )
        record = {
            "agent_id": agent_id,
            "contract_json": json.dumps(contract),
            "priority": priority,
            "status": AgentRegistryStatus.ACTIVE,
            "agent_version": None,
            "session_token": None,
        }
        self._storage.upsert_agent(**record)
        logger.info("Registered agent: %s (priority=%d)", agent_id, priority)

    def get_agent_contract(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored capability contract, or None if no record exists."""
        record = self._storage.get_agent(agent_id)
        if not record:
            return None
        return record["contract"]

    def get_agent_manifest(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """Deprecated alias that forwards to get_agent_contract."""
        contract = self.get_agent_contract(agent_id)
        return contract

    def get_agent_priority(self, agent_id: str) -> int:
        """Return the stored priority, or 0 when no record exists."""
        record = self._storage.get_agent(agent_id)
        if not record:
            return 0
        return record["priority"]

    def list_agents(self) -> List[str]:
        """Return the IDs of all agents the storage reports as active."""
        active_ids = self._storage.list_active_agents()
        return active_ids

    def set_agent_status(
        self,
        agent_id: str,
        status: str,
        suspension_reason: Optional[str] = None,
    ) -> bool:
        """Change an agent's lifecycle status (e.g. ACTIVE -> SUSPENDED).

        Args:
            agent_id: Registered agent identifier.
            status: New status value (e.g. 'SUSPENDED', 'ACTIVE').
            suspension_reason: Optional machine-oriented reason (audit / ops).

        Returns:
            Whatever the storage backend reports for the update; a truthy
            value (a row was updated) is also logged.
        """
        changed = self._storage.update_status(agent_id, status, suspension_reason)
        if not changed:
            return changed
        logger.info(
            "Agent status: agent_id=%s status=%s reason=%s",
            agent_id,
            status,
            suspension_reason,
        )
        return changed

    def get_agent_status(self, agent_id: str) -> Optional[tuple]:
        """Return (status, suspension_reason) for a known agent, else None."""
        status_row = self._storage.get_status(agent_id)
        return status_row

get_agent_contract(agent_id)

Retrieve agent capability contract.

Source code in src/dir_core/agent_registry.py
162
163
164
165
def get_agent_contract(self, agent_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored capability contract, or None if no record exists."""
    record = self._storage.get_agent(agent_id)
    if not record:
        return None
    return record["contract"]

get_agent_manifest(agent_id)

Retrieve agent capability contract. Deprecated: use get_agent_contract.

Source code in src/dir_core/agent_registry.py
167
168
169
def get_agent_manifest(self, agent_id: str) -> Optional[Dict[str, Any]]:
    """Deprecated alias that forwards to get_agent_contract."""
    contract = self.get_agent_contract(agent_id)
    return contract

get_agent_priority(agent_id)

Retrieve agent priority (default 0).

Source code in src/dir_core/agent_registry.py
171
172
173
174
def get_agent_priority(self, agent_id: str) -> int:
    """Return the stored priority, or 0 when no record exists."""
    record = self._storage.get_agent(agent_id)
    if not record:
        return 0
    return record["priority"]

get_agent_status(agent_id)

Return (status, suspension_reason) if the agent exists, else None.

Source code in src/dir_core/agent_registry.py
207
208
209
def get_agent_status(self, agent_id: str) -> Optional[tuple]:
    """Return (status, suspension_reason) for a known agent, else None."""
    status_row = self._storage.get_status(agent_id)
    return status_row

get_schema(agent_id, schema_kind=None)

Return schema from contract: schema_kind=None -> contract['schema']; else -> contract['schemas'][kind] or contract['schema'].

Source code in src/dir_core/agent_registry.py
124
125
126
127
128
129
130
131
132
133
134
def get_schema(
    self, agent_id: str, schema_kind: Optional[str] = None
) -> Optional[dict]:
    """Look up a schema inside the agent's contract.

    Returns ``contract['schemas'][schema_kind]`` when ``schema_kind`` is
    given and present there; in every other case falls back to
    ``contract.get('schema')``. Returns None when the agent has no
    (or an empty) contract.
    """
    contract = self.get_agent_contract(agent_id)
    if not contract:
        return None
    if schema_kind is not None and "schemas" in contract:
        per_kind = contract["schemas"]
        if schema_kind in per_kind:
            return per_kind[schema_kind]
    return contract.get("schema")

handshake(agent_id, contract, agent_version, priority=0)

Handshake with version check. REJECT on VERSION_MISMATCH. Returns ACCEPTED with session_token or REJECTED with reason.

Source code in src/dir_core/agent_registry.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def handshake(
    self,
    agent_id: str,
    contract: Dict[str, Any],
    agent_version: str,
    priority: int = 0,
) -> HandshakeResult:
    """Version-checked registration of an agent.

    When ``agent_version`` satisfies ``self.supported_versions`` the agent
    is upserted as ACTIVE with a freshly generated UUID4 session token and
    an accepted result carrying that token is returned. Otherwise nothing
    is stored and a rejected result with reason VERSION_MISMATCH is
    returned.
    """
    if _version_compatible(agent_version, self.supported_versions):
        session_token = str(uuid.uuid4())
        self._storage.upsert_agent(
            agent_id=agent_id,
            contract_json=json.dumps(contract),
            priority=priority,
            status=AgentRegistryStatus.ACTIVE,
            agent_version=agent_version,
            session_token=session_token,
        )
        logger.info(
            "Handshake: agent_id=%s ver=%s accepted", agent_id, agent_version
        )
        return HandshakeResult(accepted=True, session_token=session_token)

    return HandshakeResult(
        accepted=False,
        reason=HandshakeRejectionReason.VERSION_MISMATCH.value,
    )

list_agents()

List all active agent IDs.

Source code in src/dir_core/agent_registry.py
176
177
178
def list_agents(self) -> List[str]:
    """Return the IDs of all agents the storage reports as active."""
    active_ids = self._storage.list_active_agents()
    return active_ids

register_agent(agent_id, contract, priority=0)

Register or update an agent with capability contract.

Deprecated: Use handshake() for version checking and session tokens (DIR §2.3). register_agent does not enforce SemVer compatibility.

Source code in src/dir_core/agent_registry.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def register_agent(
    self,
    agent_id: str,
    contract: Dict[str, Any],
    priority: int = 0,
) -> None:
    """Upsert an agent as ACTIVE without any version check.

    Deprecated: emits a ``DeprecationWarning``; prefer handshake(), which
    checks SemVer compatibility and issues a session token (DIR §2.3).
    The stored record has neither agent version nor session token.
    """
    warnings.warn(
        "register_agent is deprecated; use handshake() for version checking (DIR §2.3)",
        DeprecationWarning,
        stacklevel=2,
    )
    record = {
        "agent_id": agent_id,
        "contract_json": json.dumps(contract),
        "priority": priority,
        "status": AgentRegistryStatus.ACTIVE,
        "agent_version": None,
        "session_token": None,
    }
    self._storage.upsert_agent(**record)
    logger.info("Registered agent: %s (priority=%d)", agent_id, priority)

set_agent_status(agent_id, status, suspension_reason=None)

Transition agent lifecycle status (e.g. ACTIVE -> SUSPENDED).

Parameters:

Name Type Description Default
agent_id str

Registered agent identifier.

required
status str

New status value (e.g. 'SUSPENDED', 'ACTIVE').

required
suspension_reason Optional[str]

Optional machine-oriented reason (audit / ops).

None

Returns:

Type Description
bool

True if a row was updated.

Source code in src/dir_core/agent_registry.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def set_agent_status(
    self,
    agent_id: str,
    status: str,
    suspension_reason: Optional[str] = None,
) -> bool:
    """Change an agent's lifecycle status (e.g. ACTIVE -> SUSPENDED).

    Args:
        agent_id: Registered agent identifier.
        status: New status value (e.g. 'SUSPENDED', 'ACTIVE').
        suspension_reason: Optional machine-oriented reason (audit / ops).

    Returns:
        Whatever the storage backend reports for the update; a truthy
        value (a row was updated) is also logged.
    """
    changed = self._storage.update_status(agent_id, status, suspension_reason)
    if not changed:
        return changed
    logger.info(
        "Agent status: agent_id=%s status=%s reason=%s",
        agent_id,
        status,
        suspension_reason,
    )
    return changed

HandshakeResult dataclass

Result of agent handshake (DIR §2.3).

Source code in src/dir_core/agent_registry.py
25
26
27
28
29
30
31
@dataclass
class HandshakeResult:
    """Result of agent handshake (DIR §2.3).

    ``AgentRegistry.handshake`` builds an accepted result carrying a
    ``session_token`` and a rejected result carrying a ``reason``.
    """

    # True when the handshake was accepted, False when it was rejected.
    accepted: bool
    # Session token issued on acceptance; stays None when not supplied.
    session_token: Optional[str] = None
    # Rejection reason string (e.g. the VERSION_MISMATCH value); stays None
    # when not supplied.
    reason: Optional[str] = None

Context Store

dir_core.context_store

Context Store (DIR §8) - Manages multi-layered context for agents.

Layers:
1. Session (Ephemeral): Context specific to the current DecisionFlow (dfid).
2. State (Authoritative): Long-lived agent state (policy versions, trajectory).
3. Memory (Long-term): Vector DB or archival storage (stub for MVP).
4. Artifacts (Reference): Static docs/rules (stub).

Provides compile_working_context to assemble a frozen view for decision making.

Implementation note: Memory and Artifacts layers are stubs (return {}). Full implementation requires: Memory (vector DB / archival), Artifacts (RAG / static docs). See DIR §8.1, ROA §7.2 for layer definitions.

ContextStore

Multi-layered context store for agent state (DIR §8).

Storage backend is pluggable. Pass storage= for a custom backend, or db_path= to use the built-in SQLite backend (default behaviour).

Parameters:

Name Type Description Default
db_path Optional[str]

Path to SQLite database. Used when storage is not provided.

None
storage Optional[ContextStorage]

Custom `dir_core.storage.ContextStorage` backend. When provided, db_path is ignored.

None

Raises:

Type Description
ValueError

When neither db_path nor storage is supplied.

Source code in src/dir_core/context_store.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class ContextStore:
    """Layered context store for agents (DIR §8).

    Persistence is delegated to a storage backend. Supply ``storage=`` to
    plug in a custom backend; otherwise supply ``db_path=`` and the built-in
    SQLite backend is created for that path. Session and state payloads are
    exchanged with the backend as JSON strings.

    Args:
        db_path: SQLite database path. Only consulted when ``storage`` is
            not given.
        storage: Custom :class:`~dir_core.storage.ContextStorage` backend.
            Takes precedence over ``db_path``.

    Raises:
        ValueError: If both ``db_path`` and ``storage`` are omitted.
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        *,
        storage: Optional[ContextStorage] = None,
    ):
        if storage is None and db_path is None:
            raise ValueError(
                "Provide either 'db_path' (SQLite) or 'storage' (custom backend)."
            )
        if storage is None:
            # ``db_path`` is only exposed when the SQLite backend is in use.
            self.db_path = db_path  # kept for backward compatibility
            storage = SqliteContextStorage(db_path)
        self._storage: ContextStorage = storage

    # -------------------------------------------------------------------------
    # Layer 1: Session (Ephemeral)
    # -------------------------------------------------------------------------

    def get_session(self, dfid: str) -> Dict[str, Any]:
        """Return the decoded session for ``dfid``; ``{}`` if nothing is stored."""
        serialized = self._storage.get_session(dfid)
        if not serialized:
            return {}
        return json.loads(serialized)

    def update_session(self, dfid: str, updates: Dict[str, Any]) -> None:
        """Overlay ``updates`` on the stored session and write it back.

        The merge is shallow (``dict.update``): top-level keys in ``updates``
        replace existing ones.
        """
        merged = self.get_session(dfid)
        merged.update(updates)
        self._storage.set_session(dfid, json.dumps(merged))

    # -------------------------------------------------------------------------
    # Layer 2: State (Authoritative)
    # -------------------------------------------------------------------------

    def get_state(self, agent_id: str) -> Dict[str, Any]:
        """Return the decoded state for ``agent_id``; ``{}`` if nothing is stored."""
        serialized = self._storage.get_state(agent_id)
        if not serialized:
            return {}
        return json.loads(serialized)

    def update_state(self, agent_id: str, updates: Dict[str, Any]) -> None:
        """Overlay ``updates`` on the stored state and write it back.

        The merge is shallow (``dict.update``): top-level keys in ``updates``
        replace existing ones.
        """
        merged = self.get_state(agent_id)
        merged.update(updates)
        self._storage.set_state(agent_id, json.dumps(merged))

    # -------------------------------------------------------------------------
    # Compiler
    # -------------------------------------------------------------------------

    def compile_working_context(self, agent_id: str, dfid: str) -> Dict[str, Any]:
        """Assemble all layers into a single Working Context snapshot.

        The result is a newly built plain ``dict`` with the keys ``meta``,
        ``session``, ``state``, ``memory`` and ``artifacts``. Nothing in this
        method freezes it, so callers must not rely on immutability.
        ``memory`` and ``artifacts`` are stubs and always empty.
        """
        working_context: Dict[str, Any] = {
            "meta": {
                "agent_id": agent_id,
                "dfid": dfid,
                "source": "ContextStore",
            },
        }
        working_context["session"] = self.get_session(dfid)
        working_context["state"] = self.get_state(agent_id)
        # In a real system, Memory and Artifacts would be fetched here too.
        working_context["memory"] = {}  # Stub
        working_context["artifacts"] = {}  # Stub
        return working_context

compile_working_context(agent_id, dfid)

Assemble all layers into a single Working Context. Returns a newly built dictionary (snapshot); note that it is a plain dict and is not actually frozen.

Source code in src/dir_core/context_store.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def compile_working_context(self, agent_id: str, dfid: str) -> Dict[str, Any]:
    """Assemble all layers into a single Working Context snapshot.

    The result is a newly built plain ``dict`` with the keys ``meta``,
    ``session``, ``state``, ``memory`` and ``artifacts``. Nothing in this
    method freezes it, so callers must not rely on immutability.
    ``memory`` and ``artifacts`` are stubs and always empty.
    """
    working_context: Dict[str, Any] = {
        "meta": {
            "agent_id": agent_id,
            "dfid": dfid,
            "source": "ContextStore",
        },
    }
    working_context["session"] = self.get_session(dfid)
    working_context["state"] = self.get_state(agent_id)
    # In a real system, Memory and Artifacts would be fetched here too.
    working_context["memory"] = {}  # Stub
    working_context["artifacts"] = {}  # Stub
    return working_context

update_session(dfid, updates)

Merge updates into existing session.

Source code in src/dir_core/context_store.py
66
67
68
69
70
def update_session(self, dfid: str, updates: Dict[str, Any]) -> None:
    """Overlay ``updates`` on the stored session and write it back.

    The merge is shallow (``dict.update``): top-level keys in ``updates``
    replace existing ones. The result is stored as a JSON string.
    """
    merged = self.get_session(dfid)
    merged.update(updates)
    serialized = json.dumps(merged)
    self._storage.set_session(dfid, serialized)

update_state(agent_id, updates)

Merge updates into existing state.

Source code in src/dir_core/context_store.py
80
81
82
83
84
def update_state(self, agent_id: str, updates: Dict[str, Any]) -> None:
    """Overlay ``updates`` on the stored state and write it back.

    The merge is shallow (``dict.update``): top-level keys in ``updates``
    replace existing ones. The result is stored as a JSON string.
    """
    merged = self.get_state(agent_id)
    merged.update(updates)
    serialized = json.dumps(merged)
    self._storage.set_state(agent_id, serialized)

Idempotency

dir_core.idempotency

Idempotency Guard (DIR §7) - ensures operations are executed exactly once.

Key is formed from:
- DFID (decision flow ID)
- Step ID (unique within flow)
- Canonical parameters (sorted JSON)

Backend can be in-memory (testing) or persistent (production). The built-in backends are `MemoryBackend` and `SQLiteBackend`; custom backends must satisfy the `IdempotencyBackend` protocol.

IdempotencyBackend

Bases: Protocol

Protocol satisfied by all idempotency storage backends.

Source code in src/dir_core/idempotency.py
32
33
34
35
36
class IdempotencyBackend(Protocol):
    """Protocol satisfied by all idempotency storage backends.

    Any object providing ``get`` and ``set`` with these signatures qualifies.
    """

    # Look up the stored result for ``key``. The Optional return type allows
    # None; IdempotencyGuard.run treats a None return as a cache miss.
    def get(self, key: str) -> Optional[Dict[str, Any]]: ...
    # Store ``result`` under ``key``.
    def set(self, key: str, result: Dict[str, Any]) -> None: ...

IdempotencyGuard

Guard that checks cache before execution and records result after.

Parameters:

Name Type Description Default
backend IdempotencyBackend

Any object satisfying `IdempotencyBackend` (e.g. MemoryBackend(), SQLiteBackend("cache.db"), or a custom implementation).

required
Source code in src/dir_core/idempotency.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class IdempotencyGuard:
    """Guard that checks cache before execution and records result after.

    Args:
        backend: Any object satisfying :class:`IdempotencyBackend`
            (e.g. ``MemoryBackend()``, ``SQLiteBackend("cache.db")``,
            or a custom implementation).
    """

    def __init__(self, backend: IdempotencyBackend):
        self.backend = backend

    def run(self, dfid: str, step_id: str, params: Dict[str, Any], func: Callable[..., Any]) -> Any:
        """Run ``func(**params)`` with idempotency protection.

        The cache key is derived from ``dfid``, ``step_id`` and ``params`` via
        ``idempotency_key``. On a hit the cached value is returned and ``func``
        is not called; on a miss ``func`` is called with ``params`` unpacked
        as keyword arguments and its result is stored before being returned.

        Args:
            dfid: Decision flow ID.
            step_id: Step identifier within the flow.
            params: Keyword arguments for ``func``; also part of the key.
            func: Callable to execute on a cache miss.

        Returns:
            The cached result on a hit, otherwise the fresh result of ``func``.

        Note:
            A backend ``get`` returning None is treated as a miss, so a
            ``func`` whose result is stored as None is executed again on the
            next call with the same key. If ``func`` raises, nothing is stored.
        """
        key = idempotency_key(dfid, step_id, params)

        cached = self.backend.get(key)
        if cached is not None:
            # Lazy %-style args: formatting is skipped when INFO is disabled.
            logger.info("[Idempotency] HIT key=%s...", key[:8])
            return cached

        logger.info("[Idempotency] MISS key=%s... Executing.", key[:8])
        result = func(**params)

        self.backend.set(key, result)
        return result

run(dfid, step_id, params, func)

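Run func(**params) with idempotency protection: the cached result is returned on a hit; otherwise func is called with params unpacked as keyword arguments and its result is recorded.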

Source code in src/dir_core/idempotency.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def run(self, dfid: str, step_id: str, params: Dict[str, Any], func: Callable[..., Any]) -> Any:
    """Run ``func(**params)`` with idempotency protection.

    The cache key is derived from ``dfid``, ``step_id`` and ``params`` via
    ``idempotency_key``. On a hit the cached value is returned and ``func``
    is not called; on a miss ``func`` is called with ``params`` unpacked
    as keyword arguments and its result is stored before being returned.

    Args:
        dfid: Decision flow ID.
        step_id: Step identifier within the flow.
        params: Keyword arguments for ``func``; also part of the key.
        func: Callable to execute on a cache miss.

    Returns:
        The cached result on a hit, otherwise the fresh result of ``func``.

    Note:
        A backend ``get`` returning None is treated as a miss, so a
        ``func`` whose result is stored as None is executed again on the
        next call with the same key. If ``func`` raises, nothing is stored.
    """
    key = idempotency_key(dfid, step_id, params)

    cached = self.backend.get(key)
    if cached is not None:
        # Lazy %-style args: formatting is skipped when INFO is disabled.
        logger.info("[Idempotency] HIT key=%s...", key[:8])
        return cached

    logger.info("[Idempotency] MISS key=%s... Executing.", key[:8])
    result = func(**params)

    self.backend.set(key, result)
    return result

idempotency_key(dfid, step_id, params)

Compute deterministic key: the SHA-256 hex digest of "dfid:step_id:canonical_params", where canonical_params is the parameters serialized as JSON with sorted keys.

Source code in src/dir_core/idempotency.py
25
26
27
28
29
def idempotency_key(dfid: str, step_id: str, params: Dict[str, Any]) -> str:
    """Compute a deterministic key for one step of a decision flow.

    The key is the SHA-256 hex digest of ``"<dfid>:<step_id>:<params>"``,
    where ``<params>`` is ``params`` serialized as JSON with sorted keys so
    that key order in the dict does not affect the result.

    NOTE(review): the parts are joined with a bare ``:`` and not escaped, so
    IDs that themselves contain ``:`` can produce the same raw string.
    """
    canonical_params = json.dumps(params, sort_keys=True)
    raw_key = "{}:{}:{}".format(dfid, step_id, canonical_params)
    digest = hashlib.sha256(raw_key.encode())
    return digest.hexdigest()

Escalation

dir_core.escalation

Escalation Manager (DIR §9).

Human-in-the-Loop: request escalation, budget (token bucket), resolve.

EscalationManager

Manages escalation requests, budget, and resolution (DIR §9).

Storage backend is pluggable. Pass storage= for a custom backend, or db_path= to use the built-in SQLite backend (default behaviour).

Parameters:

Name Type Description Default
db_path Optional[str]

Path to SQLite database. Used when storage is not provided.

None
max_escalations_per_hour int

Token-bucket capacity per agent per window.

3
refill_interval_sec int

Window length in seconds (default 3600 = 1 hour).

3600
storage Optional[EscalationStorage]

Custom `dir_core.storage.EscalationStorage` backend. When provided, db_path is ignored.

None

Raises:

Type Description
ValueError

When neither db_path nor storage is supplied.

Source code in src/dir_core/escalation.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class EscalationManager:
    """Human-in-the-loop escalation: requests, budget, resolution (DIR §9).

    Persistence is delegated to a storage backend. Supply ``storage=`` to
    plug in a custom backend; otherwise supply ``db_path=`` and the built-in
    SQLite backend is created for that path.

    Args:
        db_path: SQLite database path. Only consulted when ``storage`` is
            not given.
        max_escalations_per_hour: Maximum escalations per agent within one
            window of ``refill_interval_sec`` seconds.
        refill_interval_sec: Window length in seconds (default 3600 = 1 hour).
        storage: Custom :class:`~dir_core.storage.EscalationStorage` backend.
            Takes precedence over ``db_path``.

    Raises:
        ValueError: If both ``db_path`` and ``storage`` are omitted.
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        max_escalations_per_hour: int = 3,
        refill_interval_sec: int = 3600,
        *,
        storage: Optional[EscalationStorage] = None,
    ):
        self.max_escalations_per_hour = max_escalations_per_hour
        self.refill_interval_sec = refill_interval_sec

        if storage is None and db_path is None:
            raise ValueError(
                "Provide either 'db_path' (SQLite) or 'storage' (custom backend)."
            )
        if storage is None:
            # ``db_path`` is only exposed when the SQLite backend is in use.
            self.db_path = db_path  # kept for backward compatibility
            storage = SqliteEscalationStorage(db_path)
        self._storage: EscalationStorage = storage

    def _get_window_count(self, agent_id: str, now: datetime) -> int:
        """Ask storage how many escalations the agent has made in the window.

        The window starts ``refill_interval_sec`` seconds before ``now`` and
        is passed to storage as a ``YYYY-MM-DD HH:MM:SS`` string.
        """
        window_start = now - timedelta(seconds=self.refill_interval_sec)
        return self._storage.get_window_count(
            agent_id, window_start.strftime("%Y-%m-%d %H:%M:%S")
        )

    def request_escalation(
        self,
        dfid: str,
        agent_id: str,
        reason: str,
        context: Dict[str, Any],
        proposal: PolicyProposal,
        impact: ImpactCategory,
    ) -> EscalationOutcome:
        """Request an escalation, subject to the per-agent budget.

        Returns BUDGET_EXHAUSTED (and stores nothing) when the agent already
        has ``max_escalations_per_hour`` or more escalations in the current
        window; the caller is then expected to demote the agent to PASSIVE
        and abort the flow. Otherwise a budget token and the request are
        recorded and GRANTED is returned.
        """
        used = self._get_window_count(agent_id, datetime.now(timezone.utc))
        if used >= self.max_escalations_per_hour:
            logger.warning(
                "Escalation budget exhausted: agent_id=%s count=%d",
                agent_id,
                used,
            )
            return EscalationOutcome.BUDGET_EXHAUSTED

        # The token is consumed before the request row is written.
        self._storage.record_budget_token(agent_id)
        request_fields = {
            "dfid": dfid,
            "agent_id": agent_id,
            "reason": reason,
            "context_json": json.dumps(context, default=str),
            "proposal_json": proposal.model_dump_json(),
            "impact": impact.value,
        }
        self._storage.insert_request(**request_fields)
        return EscalationOutcome.GRANTED

    def request_from_model(
        self, escalation: EscalationRequest
    ) -> EscalationOutcome:
        """Request an escalation described by an EscalationRequest (DIR §5.3).

        Severity HIGH/CRITICAL maps to HIGH_IMPACT, anything else to
        LOW_IMPACT. The proposal is chosen as follows: no ``original_policy``
        -> a synthetic "escalation" proposal carrying the trigger; a
        ``Policy`` -> converted to a PolicyProposal (with empty params); any
        other value -> passed through unchanged. The call is then forwarded
        to request_escalation.
        """
        high_severities = (EscalationSeverity.HIGH, EscalationSeverity.CRITICAL)
        if escalation.severity in high_severities:
            impact = ImpactCategory.HIGH_IMPACT
        else:
            impact = ImpactCategory.LOW_IMPACT

        original = escalation.original_policy
        if original is None:
            proposal = PolicyProposal(
                dfid=escalation.dfid,
                agent_id=escalation.from_agent_id,
                policy_kind="escalation",
                params={"trigger": escalation.trigger},
            )
        elif isinstance(original, Policy):
            proposal = PolicyProposal(
                dfid=original.dfid,
                agent_id=original.agent_id,
                policy_kind=original.proposed_action,
                params={},
                justification=original.justification,
                confidence=original.confidence,
            )
        else:
            proposal = original  # PolicyProposal or compatible

        return self.request_escalation(
            dfid=escalation.dfid,
            agent_id=escalation.from_agent_id,
            reason=escalation.trigger,
            context=escalation.context,
            proposal=proposal,
            impact=impact,
        )

    def resolve_escalation(
        self,
        dfid: str,
        decision: Union[HumanDecision, str],
        modified_proposal: Optional[PolicyProposal] = None,
    ) -> None:
        """Record the human decision (OVERRIDE, MODIFY, or ABORT) for ``dfid``.

        A string ``decision`` is converted with ``HumanDecision(decision)``.
        The resolution time is the current UTC time in ISO-8601 form; the
        modified proposal, when given, is stored as JSON.
        """
        if isinstance(decision, str):
            decision = HumanDecision(decision)
        resolved_at = datetime.now(timezone.utc).isoformat()
        proposal_json = None
        if modified_proposal:
            proposal_json = modified_proposal.model_dump_json()
        self._storage.resolve_request(
            dfid=dfid,
            resolved_at=resolved_at,
            decision=decision.value,
            proposal_json=proposal_json,
        )

    def get_pending(self) -> List[Dict[str, Any]]:
        """Return the pending escalation requests reported by storage."""
        pending = self._storage.get_pending_requests()
        return pending

get_pending()

Return list of pending escalation requests.

Source code in src/dir_core/escalation.py
176
177
178
def get_pending(self) -> List[Dict[str, Any]]:
    """Return the pending escalation requests reported by storage."""
    pending = self._storage.get_pending_requests()
    return pending

request_escalation(dfid, agent_id, reason, context, proposal, impact)

Request escalation. Returns GRANTED or BUDGET_EXHAUSTED. On BUDGET_EXHAUSTED, agent should be demoted to PASSIVE and flow aborted.

Source code in src/dir_core/escalation.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def request_escalation(
    self,
    dfid: str,
    agent_id: str,
    reason: str,
    context: Dict[str, Any],
    proposal: PolicyProposal,
    impact: ImpactCategory,
) -> EscalationOutcome:
    """
    Spend one escalation-budget token for ``agent_id`` and store the request.

    Returns GRANTED when the request was recorded, or BUDGET_EXHAUSTED when
    the agent's count for the current window has already reached
    ``max_escalations_per_hour`` (nothing is stored in that case).
    On BUDGET_EXHAUSTED, agent should be demoted to PASSIVE and flow aborted.
    """
    checked_at = datetime.now(timezone.utc)
    used = self._get_window_count(agent_id, checked_at)
    exhausted = used >= self.max_escalations_per_hour

    if not exhausted:
        store = self._storage
        # The token is recorded first, then the request row is written.
        store.record_budget_token(agent_id)
        row = {
            "dfid": dfid,
            "agent_id": agent_id,
            "reason": reason,
            # Non-JSON-native context values are stringified via default=str.
            "context_json": json.dumps(context, default=str),
            "proposal_json": proposal.model_dump_json(),
            "impact": impact.value,
        }
        store.insert_request(**row)
        return EscalationOutcome.GRANTED

    logger.warning(
        "Escalation budget exhausted: agent_id=%s count=%d",
        agent_id,
        used,
    )
    return EscalationOutcome.BUDGET_EXHAUSTED

request_from_model(escalation)

Request escalation from EscalationRequest model (DIR §5.3).

Maps EscalationRequest to request_escalation API. Converts severity (LOW/MEDIUM/HIGH/CRITICAL) to ImpactCategory (LOW_IMPACT/HIGH_IMPACT). If original_policy is Policy, converts to PolicyProposal for storage.

Source code in src/dir_core/escalation.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def request_from_model(
    self, escalation: EscalationRequest
) -> EscalationOutcome:
    """
    Submit an EscalationRequest model through request_escalation (DIR §5.3).

    Severity HIGH or CRITICAL becomes ImpactCategory.HIGH_IMPACT; any other
    severity becomes LOW_IMPACT. The stored proposal is chosen as follows:

    - no original_policy: a placeholder PolicyProposal of kind "escalation"
      carrying the trigger in its params;
    - original_policy is a Policy: converted to a PolicyProposal (params are
      left empty);
    - anything else: passed through unchanged (PolicyProposal or compatible).
    """
    severe_levels = (EscalationSeverity.HIGH, EscalationSeverity.CRITICAL)
    if escalation.severity in severe_levels:
        impact = ImpactCategory.HIGH_IMPACT
    else:
        impact = ImpactCategory.LOW_IMPACT

    source_policy = escalation.original_policy
    if source_policy is None:
        proposal = PolicyProposal(
            dfid=escalation.dfid,
            agent_id=escalation.from_agent_id,
            policy_kind="escalation",
            params={"trigger": escalation.trigger},
        )
    elif isinstance(source_policy, Policy):
        proposal = PolicyProposal(
            dfid=source_policy.dfid,
            agent_id=source_policy.agent_id,
            policy_kind=source_policy.proposed_action,
            params={},
            justification=source_policy.justification,
            confidence=source_policy.confidence,
        )
    else:
        proposal = source_policy  # PolicyProposal or compatible

    return self.request_escalation(
        dfid=escalation.dfid,
        agent_id=escalation.from_agent_id,
        reason=escalation.trigger,
        context=escalation.context,
        proposal=proposal,
        impact=impact,
    )

resolve_escalation(dfid, decision, modified_proposal=None)

Record human decision: OVERRIDE, MODIFY, or ABORT.

Source code in src/dir_core/escalation.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def resolve_escalation(
    self,
    dfid: str,
    decision: Union[HumanDecision, str],
    modified_proposal: Optional[PolicyProposal] = None,
) -> None:
    """Persist the human verdict (OVERRIDE, MODIFY, or ABORT) for a flow.

    Args:
        dfid: Identifier of the flow whose escalation is being resolved.
        decision: A ``HumanDecision`` or its string form; strings are
            coerced through the ``HumanDecision`` constructor.
        modified_proposal: Optional replacement proposal. When truthy it
            is serialized to JSON and stored alongside the decision.
    """
    if isinstance(decision, str):
        verdict = HumanDecision(decision)
    else:
        verdict = decision

    # Resolution time is taken in UTC and stored as an ISO-8601 string.
    resolved_at = datetime.now(timezone.utc)

    serialized = None
    if modified_proposal:
        serialized = modified_proposal.model_dump_json()

    self._storage.resolve_request(
        dfid=dfid,
        resolved_at=resolved_at.isoformat(),
        decision=verdict.value,
        proposal_json=serialized,
    )

EscalationOutcome

Bases: StrEnum

Result of request_escalation.

Source code in src/dir_core/escalation.py
28
29
30
31
32
class EscalationOutcome(StrEnum):
    """Result of request_escalation."""

    # Budget was available: a budget token and the request were recorded.
    GRANTED = "GRANTED"
    # The agent's window count had already reached max_escalations_per_hour;
    # nothing was recorded.
    BUDGET_EXHAUSTED = "BUDGET_EXHAUSTED"

ImpactCategory

Bases: StrEnum

Impact level for escalation (DIR §9.4).

Source code in src/dir_core/escalation.py
21
22
23
24
25
class ImpactCategory(StrEnum):
    """Impact level for escalation (DIR §9.4).

    ``request_from_model`` maps HIGH and CRITICAL severities to
    ``HIGH_IMPACT`` and every other severity to ``LOW_IMPACT``.
    """

    LOW_IMPACT = "LOW_IMPACT"
    HIGH_IMPACT = "HIGH_IMPACT"

Resource Lock

dir_core.resource_lock

Resource Locking / Semantic Locking (DIR §6.2, §2.3).

Reservation locks for shared resources (capital, API throughput). Linear Lock Acquisition (alphabetical order) to prevent deadlocks.

LockResult

Bases: StrEnum

Result of acquire attempt.

Source code in src/dir_core/resource_lock.py
19
20
21
22
23
24
class LockResult(StrEnum):
    """Result of acquire attempt."""

    # The storage backend wrote the locks for the requested resources.
    ACQUIRED = "ACQUIRED"
    # For some resource, capacity minus the already-locked amount was below
    # the requested amount.
    INSUFFICIENT_LIQUIDITY = "INSUFFICIENT_LIQUIDITY"
    # The backend did not complete the batch write before the deadline.
    RESOURCE_CONTENTION_TIMEOUT = "RESOURCE_CONTENTION_TIMEOUT"

ResourceLockManager

Manages reservation locks for shared resources (DIR §6.2).

Responsibility split:

  • Manager: checks domain availability (via availability_provider), enforces alphabetical lock ordering to prevent deadlocks, retries on contention.
  • Storage backend: provides atomic batch-write of acquired locks and returns the currently locked amount per resource.

Storage backend is pluggable. Pass storage= for a custom backend, or db_path= to use the built-in SQLite backend (default behaviour).

Parameters:

Name Type Description Default
db_path Optional[str]

Path to SQLite database. Used when storage is not provided.

None
availability_provider Optional[Callable[[str], float]]

Callable(resource_id) -> float returning the total capacity for a given resource.

None
storage Optional[ResourceLockStorage]

Custom :class:~dir_core.storage.ResourceLockStorage backend. When provided, db_path is ignored.

None

Raises:

Type Description
ValueError

When availability_provider is missing, or when neither db_path nor storage is supplied.

Source code in src/dir_core/resource_lock.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class ResourceLockManager:
    """Manages reservation locks for shared resources (DIR §6.2).

    Responsibility split:

    - **Manager**: checks domain availability (via ``availability_provider``),
      enforces alphabetical lock ordering to prevent deadlocks, retries on
      contention.
    - **Storage backend**: provides atomic batch-write of acquired locks and
      returns the currently locked amount per resource.

    Storage backend is pluggable. Pass ``storage=`` for a custom backend, or
    ``db_path=`` to use the built-in SQLite backend (default behaviour).

    Args:
        db_path: Path to SQLite database. Used when ``storage`` is not provided.
        availability_provider: ``Callable(resource_id) -> float`` returning the
            total capacity for a given resource.
        storage: Custom :class:`~dir_core.storage.ResourceLockStorage` backend.
            When provided, ``db_path`` is ignored.

    Raises:
        ValueError: When ``availability_provider`` is missing, or when neither
            ``db_path`` nor ``storage`` is supplied.
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        availability_provider: Optional[Callable[[str], float]] = None,
        *,
        storage: Optional[ResourceLockStorage] = None,
    ):
        if availability_provider is None:
            raise ValueError("availability_provider is required.")

        self.availability_provider = availability_provider

        # An explicit storage backend takes precedence over db_path.
        if storage is not None:
            self._storage: ResourceLockStorage = storage
        elif db_path is not None:
            self.db_path = db_path  # kept for backward compatibility
            self._storage = SqliteResourceLockStorage(db_path)
        else:
            raise ValueError(
                "Provide either 'db_path' (SQLite) or 'storage' (custom backend)."
            )

    def acquire(
        self,
        dfid: str,
        resources: Dict[str, float],
        timeout_sec: float = 5.0,
    ) -> LockResult:
        """Acquire locks for all resources in sorted order (DIR Topologies §6.4).

        Flow:
        1. Check domain availability for each resource via
           ``availability_provider``.  Return ``INSUFFICIENT_LIQUIDITY``
           immediately if any resource is over-allocated.
        2. Ask the storage backend to write the locks atomically.  The backend
           may retry internally (SQLite uses ``BEGIN IMMEDIATE``); if it cannot
           obtain exclusive write access within *timeout_sec*, return
           ``RESOURCE_CONTENTION_TIMEOUT``.

        At least one attempt is always made, so ``timeout_sec <= 0`` means
        "try once without waiting" rather than an unconditional timeout.

        Args:
            dfid: Identifier of the flow requesting the locks.
            resources: Mapping of resource id to the amount to reserve.
            timeout_sec: Overall time budget for retries, in seconds.

        Returns:
            ``ACQUIRED``, ``INSUFFICIENT_LIQUIDITY`` or
            ``RESOURCE_CONTENTION_TIMEOUT``.
        """
        # Sorted copy: the availability check and the backend write both see
        # the resources in the same alphabetical order, regardless of the
        # order in which the caller built the mapping.
        ordered = {rid: resources[rid] for rid in sorted(resources)}
        deadline = time.monotonic() + timeout_sec

        attempted = False
        while not attempted or time.monotonic() < deadline:
            attempted = True

            # Step 1 — availability check (domain logic, outside transaction)
            for rid, amount in ordered.items():
                available = self.availability_provider(rid)
                locked = self._storage.get_locked_amount(rid, exclude_dfid=dfid)
                if available - locked < amount:
                    return LockResult.INSUFFICIENT_LIQUIDITY

            # Step 2 — atomic write (backend handles its own concurrency)
            remaining = max(0.0, deadline - time.monotonic())
            if self._storage.acquire_batch(dfid, ordered, remaining):
                return LockResult.ACQUIRED

            # Backend timed out getting exclusive write access — back off
            # briefly, but never sleep past the deadline.
            time.sleep(max(0.0, min(0.05, deadline - time.monotonic())))

        return LockResult.RESOURCE_CONTENTION_TIMEOUT

    def release(self, dfid: str) -> None:
        """Release all locks held by dfid."""
        self._storage.release(dfid)

acquire(dfid, resources, timeout_sec=5.0)

Acquire locks for all resources in sorted order (DIR Topologies §6.4).

Flow: 1. Check domain availability for each resource via availability_provider. Return INSUFFICIENT_LIQUIDITY immediately if any resource is over-allocated. 2. Ask the storage backend to write the locks atomically. The backend may retry internally (SQLite uses BEGIN IMMEDIATE); if it cannot obtain exclusive write access within timeout_sec, return RESOURCE_CONTENTION_TIMEOUT.

Source code in src/dir_core/resource_lock.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def acquire(
    self,
    dfid: str,
    resources: Dict[str, float],
    timeout_sec: float = 5.0,
) -> LockResult:
    """Acquire locks for all resources in sorted order (DIR Topologies §6.4).

    Flow:
    1. Check domain availability for each resource via
       ``availability_provider``.  Return ``INSUFFICIENT_LIQUIDITY``
       immediately if any resource is over-allocated.
    2. Ask the storage backend to write the locks atomically.  The backend
       may retry internally (SQLite uses ``BEGIN IMMEDIATE``); if it cannot
       obtain exclusive write access within *timeout_sec*, return
       ``RESOURCE_CONTENTION_TIMEOUT``.

    At least one attempt is always made, so ``timeout_sec <= 0`` means
    "try once without waiting" rather than an unconditional timeout.

    Args:
        dfid: Identifier of the flow requesting the locks.
        resources: Mapping of resource id to the amount to reserve.
        timeout_sec: Overall time budget for retries, in seconds.

    Returns:
        ``ACQUIRED``, ``INSUFFICIENT_LIQUIDITY`` or
        ``RESOURCE_CONTENTION_TIMEOUT``.
    """
    # Sorted copy: the availability check and the backend write both see
    # the resources in the same alphabetical order, regardless of the
    # order in which the caller built the mapping.
    ordered = {rid: resources[rid] for rid in sorted(resources)}
    deadline = time.monotonic() + timeout_sec

    attempted = False
    while not attempted or time.monotonic() < deadline:
        attempted = True

        # Step 1 — availability check (domain logic, outside transaction)
        for rid, amount in ordered.items():
            available = self.availability_provider(rid)
            locked = self._storage.get_locked_amount(rid, exclude_dfid=dfid)
            if available - locked < amount:
                return LockResult.INSUFFICIENT_LIQUIDITY

        # Step 2 — atomic write (backend handles its own concurrency)
        remaining = max(0.0, deadline - time.monotonic())
        if self._storage.acquire_batch(dfid, ordered, remaining):
            return LockResult.ACQUIRED

        # Backend timed out getting exclusive write access — back off
        # briefly, but never sleep past the deadline.
        time.sleep(max(0.0, min(0.05, deadline - time.monotonic())))

    return LockResult.RESOURCE_CONTENTION_TIMEOUT

release(dfid)

Release all locks held by dfid.

Source code in src/dir_core/resource_lock.py
114
115
116
def release(self, dfid: str) -> None:
    """Ask the storage backend to drop every lock held by ``dfid``."""
    backend = self._storage
    backend.release(dfid)

JIT State Verifier

dir_core.jit

Just-In-Time (JIT) State Verification (DIR §6.5, Topologies §2.4, §3.2).

Fast-pass checks before execution: verify state has not drifted since snapshot. Does NOT re-evaluate reasoning — only compares snapshot vs live state.

JITStateVerifier

JIT State Verifier for Topology B (SDS).

Validates that the environment has not drifted since the agent's snapshot. Domain-specific choices (which keys to compare, per-key tolerances) are passed as arguments to verify().

Source code in src/dir_core/jit.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class JITStateVerifier:
    """
    Drift gate for Topology B (SDS).

    Compares the snapshot the agent reasoned over with the live state and
    turns the outcome into a validation verdict. Which keys are compared and
    the per-key tolerances are chosen by the caller on each ``verify`` call.
    """

    def verify(
        self,
        snapshot: Dict[str, Any],
        live: Dict[str, Any],
        keys_to_compare: Optional[List[str]] = None,
        tolerance: Optional[Dict[str, float]] = None,
    ) -> ValidationResult:
        """
        Check ``live`` against ``snapshot`` by delegating to ``verify_drift``.

        Args:
            snapshot: State captured when the agent reasoned.
            live: Current state.
            keys_to_compare: Forwarded unchanged to ``verify_drift``.
            tolerance: Forwarded unchanged to ``verify_drift``.

        Returns:
            ``(ValidationVerdict.ACCEPT, "no state drift")`` when no drift is
            reported, otherwise ``(ValidationVerdict.REJECT, reason)`` with
            the reason produced by ``verify_drift``.
        """
        drift_free, detail = verify_drift(
            snapshot,
            live,
            keys_to_compare=keys_to_compare,
            tolerance=tolerance,
        )
        if not drift_free:
            return ValidationVerdict.REJECT, detail
        return ValidationVerdict.ACCEPT, "no state drift"

verify(snapshot, live, keys_to_compare=None, tolerance=None)

Run drift verification.

Returns:

Type Description
ValidationResult

("ACCEPT", reason) if no drift, else ("REJECT", reason).

Source code in src/dir_core/jit.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def verify(
    self,
    snapshot: Dict[str, Any],
    live: Dict[str, Any],
    keys_to_compare: Optional[List[str]] = None,
    tolerance: Optional[Dict[str, float]] = None,
) -> ValidationResult:
    """
    Check ``live`` against ``snapshot`` by delegating to ``verify_drift``.

    Args:
        snapshot: State captured when the agent reasoned.
        live: Current state.
        keys_to_compare: Forwarded unchanged to ``verify_drift``.
        tolerance: Forwarded unchanged to ``verify_drift``.

    Returns:
        ``(ValidationVerdict.ACCEPT, "no state drift")`` when no drift is
        reported, otherwise ``(ValidationVerdict.REJECT, reason)`` with
        the reason produced by ``verify_drift``.
    """
    drift_free, detail = verify_drift(
        snapshot,
        live,
        keys_to_compare=keys_to_compare,
        tolerance=tolerance,
    )
    if not drift_free:
        return ValidationVerdict.REJECT, detail
    return ValidationVerdict.ACCEPT, "no state drift"

verify_drift(snapshot, live, keys_to_compare=None, tolerance=None)

Verify that live state has not drifted beyond tolerance since snapshot.

Parameters:

Name Type Description Default
snapshot Dict[str, Any]

State at snapshot time (when agent reasoned).

required
live Dict[str, Any]

Current live state.

required
keys_to_compare Optional[List[str]]

Keys to check. If None, compare all keys in snapshot.

None
tolerance Optional[Dict[str, float]]

Optional dict of key -> max allowed delta for numeric values. If key not in tolerance, exact match is required.

None

Returns:

Type Description
Tuple[bool, str]

(True, "") if no drift, else (False, reason).

Source code in src/dir_core/jit.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def verify_drift(
    snapshot: Dict[str, Any],
    live: Dict[str, Any],
    keys_to_compare: Optional[List[str]] = None,
    tolerance: Optional[Dict[str, float]] = None,
) -> Tuple[bool, str]:
    """
    Verify that live state has not drifted beyond tolerance since snapshot.

    Args:
        snapshot: State at snapshot time (when agent reasoned).
        live: Current live state.
        keys_to_compare: Keys to check. If None, compare all keys in snapshot.
            An explicit empty list checks nothing.
        tolerance: Optional dict of key -> max allowed delta for numeric values.
            If key not in tolerance, exact match is required. For a key with a
            tolerance, a missing/None value is treated as 0.0, values that
            cannot be converted to float fall back to exact comparison, and a
            NaN on either side is reported as drift.

    Returns:
        (True, "") if no drift, else (False, reason).
    """
    tolerance = tolerance or {}
    # Only None means "all snapshot keys"; an empty list must stay empty.
    keys = list(snapshot.keys()) if keys_to_compare is None else keys_to_compare

    for key in keys:
        snap_val = snapshot.get(key)
        live_val = live.get(key)

        if key not in tolerance:
            if snap_val != live_val:
                return (
                    False,
                    f"STATE_DRIFT: {key} changed (snapshot={snap_val}, live={live_val})",
                )
            continue

        try:
            snap_num = float(snap_val) if snap_val is not None else 0.0
            live_num = float(live_val) if live_val is not None else 0.0
            # Equal values (including equal infinities, whose difference is
            # NaN) have zero delta.
            delta = 0.0 if snap_num == live_num else abs(snap_num - live_num)
            # "<=" is False for NaN, so a NaN delta fails closed instead of
            # slipping through a ">" comparison.
            within = delta <= tolerance[key]
        except (TypeError, ValueError):
            # Non-numeric value (or tolerance): require an exact match.
            if snap_val != live_val:
                return (
                    False,
                    f"STATE_DRIFT: {key} mismatch (snapshot={snap_val}, live={live_val})",
                )
            continue

        if not within:
            return (
                False,
                f"STATE_DRIFT: {key} changed beyond tolerance "
                f"(snapshot={snap_val}, live={live_val})",
            )

    return True, ""

Arbitration

dir_core.arbitration

Priority-Based Arbitration (DIR Topologies §2.4).

Selects the winning proposal from parallel agents using a priority matrix. Lower priority number = higher precedence (e.g. Risk > Strategy).

select_winner(proposals, priority_matrix=None)

Select winning proposal using Priority Matrix (Topologies §2.4).

Lower priority number = higher precedence. If no matrix provided, uses DEFAULT_PRIORITY_MATRIX. Unknown policy_kind gets priority 10.

Parameters:

Name Type Description Default
proposals List[PolicyProposal]

List of policy proposals from parallel agents.

required
priority_matrix Optional[Dict[str, int]]

Optional mapping policy_kind -> priority (lower = higher).

None

Returns:

Type Description
Optional[PolicyProposal]

The winning proposal, or None if proposals list is empty.

Source code in src/dir_core/arbitration.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def select_winner(
    proposals: List[PolicyProposal],
    priority_matrix: Optional[Dict[str, int]] = None,
) -> Optional[PolicyProposal]:
    """Pick the proposal with the highest precedence (Topologies §2.4).

    Precedence comes from a priority matrix in which a lower number wins.
    DEFAULT_PRIORITY_MATRIX is used when no (or an empty) matrix is given,
    and a policy_kind missing from the matrix is ranked 10. Among equally
    ranked proposals the earliest one in the list is returned.

    Args:
        proposals: List of policy proposals from parallel agents.
        priority_matrix: Optional mapping policy_kind -> priority (lower = higher).

    Returns:
        The winning proposal, or None if proposals list is empty.
    """
    if not proposals:
        return None

    ranks = priority_matrix or DEFAULT_PRIORITY_MATRIX
    return min(proposals, key=lambda candidate: ranks.get(candidate.policy_kind, 10))