Skip to content

Storage Layer

dir_core.storage

dir_core.storage — pluggable persistence layer for DIR modules.

Quick start::

from dir_core.storage import sqlite_storage, memory_storage

# All modules backed by a single SQLite file
stores = sqlite_storage("data/my_app.db")
registry = AgentRegistry(storage=stores.agent_registry)
context  = ContextStore(storage=stores.context)

# Fully in-memory (great for unit tests)
stores = memory_storage()

Custom backend example::

from dir_core.storage.base import AgentRegistryStorage

class MyPostgresAgentStorage:
    def init_schema(self) -> None: ...
    def upsert_agent(self, agent_id, contract_json, priority,
                     status, agent_version, session_token) -> None: ...
    # ... implement all methods of AgentRegistryStorage

registry = AgentRegistry(storage=MyPostgresAgentStorage())

AgentRegistryStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@runtime_checkable
class AgentRegistryStorage(Protocol):
    """Structural interface for agent-registry persistence.

    One record per ``agent_id``. Implementations (SQLite, in-memory, or a
    custom backend as in the module docstring's example) are injected into
    ``AgentRegistry`` via its ``storage=`` parameter.
    """

    def init_schema(self) -> None:
        """Create or migrate the underlying schema (called once on construction)."""
        ...

    def upsert_agent(
        self,
        agent_id: str,
        contract_json: str,
        priority: int,
        status: str,
        agent_version: Optional[str],
        session_token: Optional[str],
    ) -> None:
        """Insert or replace an agent record."""
        ...

    def get_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """Return a dict with keys agent_id/contract/priority/status/
        agent_version/session_token, or None if not found."""
        ...

    def update_status(
        self, agent_id: str, status: str, suspension_reason: Optional[str]
    ) -> bool:
        """Update status and suspension_reason. Return True if a row was changed."""
        ...

    def get_status(self, agent_id: str) -> Optional[Tuple[str, Optional[str]]]:
        """Return (status, suspension_reason) or None if agent does not exist."""
        ...

    def list_active_agents(self) -> List[str]:
        """Return agent_ids where status == 'ACTIVE'."""
        ...

get_agent(agent_id)

Return a dict with keys agent_id/contract/priority/status/agent_version/session_token, or None if not found.

Source code in src/dir_core/storage/base.py
59
60
61
62
def get_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
    """Return a dict with keys agent_id/contract/priority/status/
    agent_version/session_token, or None if not found."""
    ...

get_status(agent_id)

Return (status, suspension_reason) or None if agent does not exist.

Source code in src/dir_core/storage/base.py
70
71
72
def get_status(self, agent_id: str) -> Optional[Tuple[str, Optional[str]]]:
    """Return (status, suspension_reason) or None if agent does not exist."""
    ...

init_schema()

Create or migrate the underlying schema (called once on construction).

Source code in src/dir_core/storage/base.py
43
44
45
def init_schema(self) -> None:
    """Create or migrate the underlying schema (called once on construction)."""
    ...

list_active_agents()

Return agent_ids where status == 'ACTIVE'.

Source code in src/dir_core/storage/base.py
74
75
76
def list_active_agents(self) -> List[str]:
    """Return agent_ids where status == 'ACTIVE'."""
    ...

update_status(agent_id, status, suspension_reason)

Update status and suspension_reason. Return True if a row was changed.

Source code in src/dir_core/storage/base.py
64
65
66
67
68
def update_status(
    self, agent_id: str, status: str, suspension_reason: Optional[str]
) -> bool:
    """Update status and suspension_reason. Return True if a row was changed."""
    ...

upsert_agent(agent_id, contract_json, priority, status, agent_version, session_token)

Insert or replace an agent record.

Source code in src/dir_core/storage/base.py
47
48
49
50
51
52
53
54
55
56
57
def upsert_agent(
    self,
    agent_id: str,
    contract_json: str,
    priority: int,
    status: str,
    agent_version: Optional[str],
    session_token: Optional[str],
) -> None:
    """Insert or replace an agent record."""
    ...

AuditStore

Repository helper: append-only audit rows plus idempotent replay (DIR §7).

Combines :class:DecisionAuditStorage (decision_audit_events) and :class:IdempotencyStorage (idempotency_cache). Construct from a :class:~dir_core.storage.StorageBundle as AuditStore(bundle.decision_audit, bundle.idempotency).

Source code in src/dir_core/storage/base.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
class AuditStore:
    """Repository helper: append-only audit rows plus idempotent replay (DIR §7).

    Combines :class:`DecisionAuditStorage` (``decision_audit_events``) and
    :class:`IdempotencyStorage` (``idempotency_cache``). Construct from a
    :class:`~dir_core.storage.StorageBundle` as
    ``AuditStore(bundle.decision_audit, bundle.idempotency)``.
    """

    __slots__ = ("_decision_audit", "_idempotency")

    def __init__(
        self,
        decision_audit: DecisionAuditStorage,
        idempotency: IdempotencyStorage,
    ) -> None:
        # Pure facade: both backends are injected; nothing is owned here.
        self._decision_audit = decision_audit
        self._idempotency = idempotency

    def close(self) -> None:
        """No-op when backends use short-lived connections per call."""
        return None

    def record(
        self,
        dfid: str,
        event: str,
        *,
        step_id: str = "",
        state: str = "",
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Delegate one DFID-scoped audit row to the decision-audit backend."""
        self._decision_audit.record(
            dfid,
            event,
            step_id=step_id,
            state=state,
            details=details,
        )

    def get_idempotent_result(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the cached result for *key*, or None on a cache miss."""
        return self._idempotency.get(key)

    def save_idempotent_result(
        self, key: str, dfid: str, result: Dict[str, Any]
    ) -> None:
        """Cache *result* under *key*, tagged with the owning DFID."""
        # Copy-and-tag in one expression: the caller's dict is never mutated.
        self._idempotency.set(key, {**result, "_dfid": dfid})

    def events_for_dfid(self, dfid: str) -> List[Dict[str, Any]]:
        """Return all audit events for *dfid*, in insertion order."""
        return self._decision_audit.events_for_dfid(dfid)

    def all_events_chronological(self) -> List[Dict[str, Any]]:
        """Return every audit event across all DFIDs, in insertion order."""
        return self._decision_audit.all_events_chronological()

close()

No-op when backends use short-lived connections per call.

Source code in src/dir_core/storage/base.py
182
183
184
def close(self) -> None:
    """No-op when backends use short-lived connections per call."""
    return None

ContextStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@runtime_checkable
class ContextStorage(Protocol):
    """Structural interface for ContextStore persistence.

    Two key/value namespaces: per-DFID session blobs and per-agent state
    blobs. Values are opaque JSON strings — encoding and decoding are the
    caller's responsibility.
    """

    def init_schema(self) -> None: ...

    def get_session(self, dfid: str) -> Optional[str]:
        """Return JSON-encoded session data for dfid, or None."""
        ...

    def set_session(self, dfid: str, data_json: str) -> None:
        """Persist JSON-encoded session data for dfid."""
        ...

    def get_state(self, agent_id: str) -> Optional[str]:
        """Return JSON-encoded state for agent_id, or None."""
        ...

    def set_state(self, agent_id: str, data_json: str) -> None:
        """Persist JSON-encoded state for agent_id."""
        ...

get_session(dfid)

Return JSON-encoded session data for dfid, or None.

Source code in src/dir_core/storage/base.py
88
89
90
def get_session(self, dfid: str) -> Optional[str]:
    """Return JSON-encoded session data for dfid, or None."""
    ...

get_state(agent_id)

Return JSON-encoded state for agent_id, or None.

Source code in src/dir_core/storage/base.py
96
97
98
def get_state(self, agent_id: str) -> Optional[str]:
    """Return JSON-encoded state for agent_id, or None."""
    ...

set_session(dfid, data_json)

Persist JSON-encoded session data for dfid.

Source code in src/dir_core/storage/base.py
92
93
94
def set_session(self, dfid: str, data_json: str) -> None:
    """Persist JSON-encoded session data for dfid."""
    ...

set_state(agent_id, data_json)

Persist JSON-encoded state for agent_id.

Source code in src/dir_core/storage/base.py
100
101
102
def set_state(self, agent_id: str, data_json: str) -> None:
    """Persist JSON-encoded state for agent_id."""
    ...

DecisionAuditStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@runtime_checkable
class DecisionAuditStorage(Protocol):
    """Structural interface for the append-only, DFID-scoped audit log.

    Rows are returned in insertion order; there is no update or delete
    operation.
    """

    def init_schema(self) -> None: ...

    def record(
        self,
        dfid: str,
        event: str,
        *,
        step_id: str = "",
        state: str = "",
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Append one DFID-scoped audit row.

        *details* is persisted as JSON. Implementations MUST use the same
        encoding as :func:`~dir_core.storage.json_util.dumps_json_dict`
        (``sort_keys=True``, ``default=str``) so SQLite, PostgreSQL, and any
        other backend produce comparable ``detail_json`` column values and
        do not fail on non-JSON-native values.
        """
        ...

    def events_for_dfid(self, dfid: str) -> List[Dict[str, Any]]:
        """Return events for *dfid* in insertion order (details = dict per row)."""
        ...

    def all_events_chronological(self) -> List[Dict[str, Any]]:
        """Return all events in insertion order."""
        ...

all_events_chronological()

Return all events in insertion order.

Source code in src/dir_core/storage/base.py
158
159
160
def all_events_chronological(self) -> List[Dict[str, Any]]:
    """Return all events in insertion order."""
    ...

events_for_dfid(dfid)

Return events for dfid in insertion order (details = dict per row).

Source code in src/dir_core/storage/base.py
154
155
156
def events_for_dfid(self, dfid: str) -> List[Dict[str, Any]]:
    """Return events for *dfid* in insertion order (details = dict per row)."""
    ...

record(dfid, event, *, step_id='', state='', details=None)

Append one DFID-scoped audit row.

details is persisted as JSON. Implementations MUST use the same encoding as :func:~dir_core.storage.json_util.dumps_json_dict (sort_keys=True, default=str) so SQLite, PostgreSQL, and any other backend produce comparable detail_json column values and do not fail on non-JSON-native values.

Source code in src/dir_core/storage/base.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def record(
    self,
    dfid: str,
    event: str,
    *,
    step_id: str = "",
    state: str = "",
    details: Optional[Dict[str, Any]] = None,
) -> None:
    """Append one DFID-scoped audit row.

    *details* is persisted as JSON. Implementations MUST use the same
    encoding as :func:`~dir_core.storage.json_util.dumps_json_dict`
    (``sort_keys=True``, ``default=str``) so SQLite, PostgreSQL, and any
    other backend produce comparable ``detail_json`` / ``detail_json``-text
    column values and do not fail on non-JSON-native values.
    """
    ...

EscalationStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
@runtime_checkable
class EscalationStorage(Protocol):
    """Structural interface for human-escalation persistence.

    Two concerns: a token-budget log (``get_window_count`` /
    ``record_budget_token``) and an escalation-request table keyed by DFID
    (``insert_request`` / ``resolve_request`` / ``get_pending_requests``).
    """

    def init_schema(self) -> None: ...

    def get_window_count(self, agent_id: str, since_str: str) -> int:
        """Count budget tokens for agent_id after since_str (ISO/SQLite timestamp)."""
        ...

    def record_budget_token(self, agent_id: str) -> None:
        """Record one escalation token consumption for agent_id."""
        ...

    def insert_request(
        self,
        dfid: str,
        agent_id: str,
        reason: str,
        context_json: str,
        proposal_json: str,
        impact: str,
    ) -> None:
        """Persist a new escalation request with PENDING status."""
        ...

    def resolve_request(
        self,
        dfid: str,
        resolved_at: str,
        decision: str,
        proposal_json: Optional[str],
    ) -> None:
        """Mark escalation as RESOLVED with human decision."""
        ...

    def get_pending_requests(self) -> List[Dict[str, Any]]:
        """Return list of pending requests as dicts with
        dfid/agent_id/reason/context/proposal/impact keys."""
        ...

get_pending_requests()

Return list of pending requests as dicts with dfid/agent_id/reason/context/proposal/impact keys.

Source code in src/dir_core/storage/base.py
352
353
354
355
def get_pending_requests(self) -> List[Dict[str, Any]]:
    """Return list of pending requests as dicts with
    dfid/agent_id/reason/context/proposal/impact keys."""
    ...

get_window_count(agent_id, since_str)

Count budget tokens for agent_id after since_str (ISO/SQLite timestamp).

Source code in src/dir_core/storage/base.py
322
323
324
def get_window_count(self, agent_id: str, since_str: str) -> int:
    """Count budget tokens for agent_id after since_str (ISO/SQLite timestamp)."""
    ...

insert_request(dfid, agent_id, reason, context_json, proposal_json, impact)

Persist a new escalation request with PENDING status.

Source code in src/dir_core/storage/base.py
330
331
332
333
334
335
336
337
338
339
340
def insert_request(
    self,
    dfid: str,
    agent_id: str,
    reason: str,
    context_json: str,
    proposal_json: str,
    impact: str,
) -> None:
    """Persist a new escalation request with PENDING status."""
    ...

record_budget_token(agent_id)

Record one escalation token consumption for agent_id.

Source code in src/dir_core/storage/base.py
326
327
328
def record_budget_token(self, agent_id: str) -> None:
    """Record one escalation token consumption for agent_id."""
    ...

resolve_request(dfid, resolved_at, decision, proposal_json)

Mark escalation as RESOLVED with human decision.

Source code in src/dir_core/storage/base.py
342
343
344
345
346
347
348
349
350
def resolve_request(
    self,
    dfid: str,
    resolved_at: str,
    decision: str,
    proposal_json: Optional[str],
) -> None:
    """Mark escalation as RESOLVED with human decision."""
    ...

IdempotencyStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@runtime_checkable
class IdempotencyStorage(Protocol):
    """Structural interface for the idempotency result cache (keyed replay).

    NOTE(review): unlike the sibling storage protocols this one declares no
    ``init_schema`` — confirm backends create ``idempotency_cache`` elsewhere.
    """

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Return cached result for key, or None on cache miss."""
        ...

    def set(self, key: str, result: Dict[str, Any]) -> None:
        """Store result under key.

        Implementations SHOULD persist JSON with the same encoding rules as
        :func:`~dir_core.storage.json_util.dumps_json_dict` (stable key order,
        ``default=str``) so disk backends stay interchangeable.
        """
        ...

get(key)

Return cached result for key, or None on cache miss.

Source code in src/dir_core/storage/base.py
112
113
114
def get(self, key: str) -> Optional[Dict[str, Any]]:
    """Return cached result for key, or None on cache miss."""
    ...

set(key, result)

Store result under key.

Implementations SHOULD persist JSON with the same encoding rules as :func:~dir_core.storage.json_util.dumps_json_dict (stable key order, default=str) so disk backends stay interchangeable.

Source code in src/dir_core/storage/base.py
116
117
118
119
120
121
122
123
def set(self, key: str, result: Dict[str, Any]) -> None:
    """Store result under key.

    Implementations SHOULD persist JSON with the same encoding rules as
    :func:`~dir_core.storage.json_util.dumps_json_dict` (stable key order,
    ``default=str``) so disk backends stay interchangeable.
    """
    ...

IntentRetryStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
298
299
300
301
302
303
304
305
306
307
308
309
310
@runtime_checkable
class IntentRetryStorage(Protocol):
    """Structural interface for per-DFID rejection counters used by retry
    governance."""

    def get_count(self, dfid: str) -> int:
        """Return current rejection count for dfid (0 if unseen)."""
        ...

    def set_count(self, dfid: str, count: int) -> None:
        """Persist rejection count for dfid."""
        ...

    def delete(self, dfid: str) -> None:
        """Remove dfid record (called when flow reaches terminal state)."""
        ...

delete(dfid)

Remove dfid record (called when flow reaches terminal state).

Source code in src/dir_core/storage/base.py
308
309
310
def delete(self, dfid: str) -> None:
    """Remove dfid record (called when flow reaches terminal state)."""
    ...

get_count(dfid)

Return current rejection count for dfid (0 if unseen).

Source code in src/dir_core/storage/base.py
300
301
302
def get_count(self, dfid: str) -> int:
    """Return current rejection count for dfid (0 if unseen)."""
    ...

set_count(dfid, count)

Persist rejection count for dfid.

Source code in src/dir_core/storage/base.py
304
305
306
def set_count(self, dfid: str, count: int) -> None:
    """Persist rejection count for dfid."""
    ...

LifecycleStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
363
364
365
366
367
368
369
@runtime_checkable
class LifecycleStorage(Protocol):
    """Structural interface for the append-only flow-transition log."""

    def record_transition(
        self, dfid: str, from_status: str, to_status: str
    ) -> None:
        """Append a flow transition record."""
        ...

record_transition(dfid, from_status, to_status)

Append a flow transition record.

Source code in src/dir_core/storage/base.py
365
366
367
368
369
def record_transition(
    self, dfid: str, from_status: str, to_status: str
) -> None:
    """Append a flow transition record."""
    ...

MemoryAgentRegistryStorage

In-memory backend for AgentRegistry.

Source code in src/dir_core/storage/memory.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class MemoryAgentRegistryStorage:
    """In-memory backend for AgentRegistry.

    Records returned by :meth:`get_agent` are snapshots (shallow copies),
    matching disk-backed implementations that decode a fresh row per call,
    so callers cannot mutate the registry by editing the returned dict.
    """

    def __init__(self) -> None:
        # agent_id -> full record dict (shape defined in upsert_agent).
        self._store: Dict[str, Dict[str, Any]] = {}

    def init_schema(self) -> None:
        """Nothing to create for the in-memory backend."""

    def upsert_agent(
        self,
        agent_id: str,
        contract_json: str,
        priority: int,
        status: str,
        agent_version: Optional[str],
        session_token: Optional[str],
    ) -> None:
        """Insert or replace an agent record.

        Any existing suspension_reason survives the upsert; it is only
        changed via :meth:`update_status`.
        """
        suspension_reason = self._store.get(agent_id, {}).get("suspension_reason")
        self._store[agent_id] = {
            "agent_id": agent_id,
            "contract": json.loads(contract_json) if contract_json else {},
            "priority": priority,
            "status": status,
            "agent_version": agent_version,
            "session_token": session_token,
            "suspension_reason": suspension_reason,
        }

    def get_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """Return a snapshot of the record for *agent_id*, or None.

        Fix: previously returned the live internal dict, so a caller
        mutating the result silently corrupted the registry. The copy is
        shallow — the nested ``contract`` dict is still shared.
        """
        rec = self._store.get(agent_id)
        return None if rec is None else dict(rec)

    def update_status(
        self, agent_id: str, status: str, suspension_reason: Optional[str]
    ) -> bool:
        """Set status/suspension_reason; return False for unknown agents."""
        if agent_id not in self._store:
            return False
        self._store[agent_id]["status"] = status
        self._store[agent_id]["suspension_reason"] = suspension_reason
        return True

    def get_status(self, agent_id: str) -> Optional[Tuple[str, Optional[str]]]:
        """Return (status, suspension_reason), or None for unknown agents."""
        rec = self._store.get(agent_id)
        if rec is None:
            return None
        return (rec["status"], rec.get("suspension_reason"))

    def list_active_agents(self) -> List[str]:
        """Return agent_ids whose status equals AgentRegistryStatus.ACTIVE."""
        return [
            aid
            for aid, rec in self._store.items()
            if rec.get("status") == AgentRegistryStatus.ACTIVE
        ]

MemoryContextStorage

In-memory backend for ContextStore.

Source code in src/dir_core/storage/memory.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class MemoryContextStorage:
    """In-memory backend for ContextStore.

    Holds per-DFID session blobs and per-agent state blobs in two plain
    dicts; values are stored verbatim as the caller-supplied JSON strings.
    """

    def __init__(self) -> None:
        # dfid -> JSON session blob
        self._sessions: Dict[str, str] = {}
        # agent_id -> JSON state blob
        self._states: Dict[str, str] = {}

    def init_schema(self) -> None:
        """Nothing to create for the in-memory backend."""

    def get_session(self, dfid: str) -> Optional[str]:
        """Return the JSON-encoded session for *dfid*, or None if unseen."""
        return self._sessions.get(dfid)

    def set_session(self, dfid: str, data_json: str) -> None:
        """Store (or overwrite) the JSON-encoded session for *dfid*."""
        self._sessions[dfid] = data_json

    def get_state(self, agent_id: str) -> Optional[str]:
        """Return the JSON-encoded state for *agent_id*, or None if unseen."""
        return self._states.get(agent_id)

    def set_state(self, agent_id: str, data_json: str) -> None:
        """Store (or overwrite) the JSON-encoded state for *agent_id*."""
        self._states[agent_id] = data_json

MemoryDecisionAuditStorage

In-memory backend for DFID-scoped decision audit rows.

Source code in src/dir_core/storage/memory.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class MemoryDecisionAuditStorage:
    """In-memory backend for DFID-scoped decision audit rows."""

    def __init__(self) -> None:
        # Append-only log; list order doubles as chronological order.
        self._rows: List[Dict[str, Any]] = []

    def init_schema(self) -> None:
        """Nothing to create for the in-memory backend."""

    def record(
        self,
        dfid: str,
        event: str,
        *,
        step_id: str = "",
        state: str = "",
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Append one audit row stamped with the current UTC time."""
        row = {
            "dfid": dfid,
            "event": event,
            "timestamp": datetime.now(timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%S.%fZ"
            ),
            "step_id": step_id,
            "state": state,
            # Copy so later caller-side mutation cannot rewrite history.
            "details": dict(details) if details else {},
        }
        self._rows.append(row)

    def events_for_dfid(self, dfid: str) -> List[Dict[str, Any]]:
        """Return copies of the rows recorded for *dfid*, in insertion order."""
        return [dict(row) for row in self._rows if row["dfid"] == dfid]

    def all_events_chronological(self) -> List[Dict[str, Any]]:
        """Return copies of every row, in insertion order."""
        return [dict(row) for row in self._rows]

MemoryEscalationStorage

In-memory backend for EscalationManager.

Source code in src/dir_core/storage/memory.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
class MemoryEscalationStorage:
    """In-memory backend for EscalationManager."""

    def __init__(self) -> None:
        # One entry per consumed budget token, stamped with an aware UTC time.
        self._budget: List[Dict[str, Any]] = []
        # dfid -> escalation request record
        self._requests: Dict[str, Dict[str, Any]] = {}

    def init_schema(self) -> None:
        """Nothing to create for the in-memory backend."""

    def get_window_count(self, agent_id: str, since_str: str) -> int:
        """Count tokens consumed by *agent_id* at or after *since_str*."""
        # Accept both SQLite "YYYY-MM-DD HH:MM:SS" and ISO "T"-separated forms.
        cutoff = datetime.fromisoformat(since_str.replace(" ", "T"))
        if cutoff.tzinfo is None:
            # Naive timestamps are interpreted as UTC.
            cutoff = cutoff.replace(tzinfo=timezone.utc)
        hits = [
            entry
            for entry in self._budget
            if entry["agent_id"] == agent_id and entry["created_at"] >= cutoff
        ]
        return len(hits)

    def record_budget_token(self, agent_id: str) -> None:
        """Append one token-consumption entry stamped with UTC now."""
        entry = {"agent_id": agent_id, "created_at": datetime.now(timezone.utc)}
        self._budget.append(entry)

    def insert_request(
        self,
        dfid: str,
        agent_id: str,
        reason: str,
        context_json: str,
        proposal_json: str,
        impact: str,
    ) -> None:
        """Register a new PENDING escalation request keyed by *dfid*."""
        request = {
            "dfid": dfid,
            "agent_id": agent_id,
            "reason": reason,
            # Empty strings decode as empty objects.
            "context": json.loads(context_json or "{}"),
            "proposal": json.loads(proposal_json or "{}"),
            "impact": impact,
            "status": "PENDING",
            "resolved_at": None,
            "human_decision": None,
        }
        self._requests[dfid] = request

    def resolve_request(
        self,
        dfid: str,
        resolved_at: str,
        decision: str,
        proposal_json: Optional[str],
    ) -> None:
        """Mark the request for *dfid* RESOLVED; unknown dfids are ignored."""
        request = self._requests.get(dfid)
        if request is None:
            return
        request["status"] = "RESOLVED"
        request["resolved_at"] = resolved_at
        request["human_decision"] = decision
        if proposal_json:
            # A non-empty proposal from the human replaces the agent's one.
            request["proposal"] = json.loads(proposal_json)

    def get_pending_requests(self) -> List[Dict[str, Any]]:
        """Return the public view of every request still PENDING."""
        pending: List[Dict[str, Any]] = []
        for request in self._requests.values():
            if request["status"] != "PENDING":
                continue
            pending.append(
                {
                    "dfid": request["dfid"],
                    "agent_id": request["agent_id"],
                    "reason": request["reason"],
                    "context": request["context"],
                    "proposal": request["proposal"],
                    "impact": request["impact"],
                }
            )
        return pending

MemoryIdempotencyStorage

In-memory backend for IdempotencyGuard.

Source code in src/dir_core/storage/memory.py
111
112
113
114
115
116
117
118
119
120
121
class MemoryIdempotencyStorage:
    """In-memory backend for IdempotencyGuard.

    Stores snapshots rather than live references: both :meth:`set` and
    :meth:`get` shallow-copy the mapping, matching disk backends that
    round-trip through JSON, so callers cannot mutate a cached result in
    place. Nested containers inside the result are still shared.
    """

    def __init__(self) -> None:
        # key -> cached result snapshot
        self._cache: Dict[str, Dict[str, Any]] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the cached result for *key*, or None on miss."""
        cached = self._cache.get(key)
        return None if cached is None else dict(cached)

    def set(self, key: str, result: Dict[str, Any]) -> None:
        """Store a snapshot of *result* under *key* (last write wins).

        Fix: previously stored the caller's dict by reference, so later
        mutation by the caller silently altered the cached value.
        """
        self._cache[key] = dict(result)

MemoryIntentRetryStorage

In-memory backend for IntentRetryGovernor.

Source code in src/dir_core/storage/memory.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class MemoryIntentRetryStorage:
    """In-memory backend for IntentRetryGovernor."""

    def __init__(self) -> None:
        # dfid -> rejection count
        self._counts: Dict[str, int] = {}

    def get_count(self, dfid: str) -> int:
        """Return the rejection count for *dfid* (0 when unseen)."""
        try:
            return self._counts[dfid]
        except KeyError:
            return 0

    def set_count(self, dfid: str, count: int) -> None:
        """Overwrite the rejection count for *dfid*."""
        self._counts[dfid] = count

    def delete(self, dfid: str) -> None:
        """Forget *dfid*; unknown dfids are silently ignored."""
        if dfid in self._counts:
            del self._counts[dfid]

MemoryLifecycleStorage

In-memory backend for lifecycle.transition.

Source code in src/dir_core/storage/memory.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
class MemoryLifecycleStorage:
    """In-memory backend for lifecycle.transition."""

    def __init__(self) -> None:
        # Append-only transition log in arrival order.
        self._transitions: List[Dict[str, Any]] = []

    def record_transition(self, dfid: str, from_status: str, to_status: str) -> None:
        """Append one transition record stamped with UTC now (ISO 8601)."""
        record = {
            "dfid": dfid,
            "from_status": from_status,
            "to_status": to_status,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        self._transitions.append(record)

    def get_transitions(self, dfid: Optional[str] = None) -> List[Dict[str, Any]]:
        """Helper: all transitions, or only those for *dfid* when given."""
        if dfid is not None:
            return [rec for rec in self._transitions if rec["dfid"] == dfid]
        return list(self._transitions)

get_transitions(dfid=None)

Helper: return all transitions, optionally filtered by dfid.

Source code in src/dir_core/storage/memory.py
361
362
363
364
365
def get_transitions(self, dfid: Optional[str] = None) -> List[Dict[str, Any]]:
    """Helper: return all transitions, optionally filtered by dfid."""
    if dfid is None:
        return list(self._transitions)
    return [t for t in self._transitions if t["dfid"] == dfid]

MemoryResourceLockStorage

In-memory backend for ResourceLockManager.

Thread-safe via an internal mutex (threading.Lock).

Source code in src/dir_core/storage/memory.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class MemoryResourceLockStorage:
    """In-memory backend for ResourceLockManager.

    Thread-safe via an internal mutex (``threading.Lock``). The lock is
    non-reentrant; no method here calls another while holding it, so a
    plain lock is sufficient.
    """

    def __init__(self) -> None:
        # dfid -> {resource_id -> locked amount}
        self._locks: Dict[str, Dict[str, float]] = {}
        # Guards every access to self._locks.
        self._mutex = threading.Lock()

    def init_schema(self) -> None:
        pass

    def get_locked_amount(self, resource_id: str, exclude_dfid: str) -> float:
        """Return total locked amount for resource_id (excluding exclude_dfid)."""
        with self._mutex:
            return sum(
                locks.get(resource_id, 0.0)
                for d, locks in self._locks.items()
                if d != exclude_dfid
            )

    def acquire_batch(
        self,
        dfid: str,
        resources: Dict[str, float],
        timeout_sec: float,
    ) -> bool:
        """Atomically record all locks for *dfid*; always succeeds here.

        The availability check happens in ResourceLockManager before this
        call (see the ResourceLockStorage protocol); *timeout_sec* is unused
        because the mutex is only held for the duration of one dict write.
        Re-acquiring replaces any locks *dfid* already held.
        """
        with self._mutex:
            self._locks[dfid] = dict(resources)
            return True

    def release(self, dfid: str) -> None:
        """Drop every lock held by *dfid*; unknown dfids are ignored."""
        with self._mutex:
            self._locks.pop(dfid, None)

get_locked_amount(resource_id, exclude_dfid)

Return total locked amount for resource_id (excluding exclude_dfid).

Source code in src/dir_core/storage/memory.py
214
215
216
217
218
219
220
221
def get_locked_amount(self, resource_id: str, exclude_dfid: str) -> float:
    """Return total locked amount for resource_id (excluding exclude_dfid)."""
    with self._mutex:
        return sum(
            locks.get(resource_id, 0.0)
            for d, locks in self._locks.items()
            if d != exclude_dfid
        )

MemorySagaStorage

In-memory backend for SagaCompensation.

Source code in src/dir_core/storage/memory.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class MemorySagaStorage:
    """In-memory backend for SagaCompensation."""

    def __init__(self) -> None:
        # dfid -> {"failed_step": ..., "partial_state": ...}
        self._dirty: Dict[str, Dict[str, Any]] = {}

    def init_schema(self) -> None:
        """Nothing to create for the in-memory backend."""

    def mark_dirty(self, dfid: str, failed_step: str, partial_state_json: str) -> None:
        """Flag *dfid* as needing compensation; empty JSON means no state."""
        state = json.loads(partial_state_json) if partial_state_json else {}
        self._dirty[dfid] = {"failed_step": failed_step, "partial_state": state}

    def get_dirty_flows(self) -> List[str]:
        """Return the dfids currently flagged dirty."""
        return [dfid for dfid in self._dirty]

    def get_dirty_state(self, dfid: str) -> Optional[Dict[str, Any]]:
        """Return the recorded failure info for *dfid*, or None."""
        return self._dirty.get(dfid)

    def clear_dirty(self, dfid: str) -> None:
        """Unflag *dfid*; unknown dfids are silently ignored."""
        if dfid in self._dirty:
            del self._dirty[dfid]

ResourceContentionError

Bases: StorageError

Raised when an exclusive lock cannot be acquired within the timeout.

Source code in src/dir_core/storage/base.py
32
33
class ResourceContentionError(StorageError):
    """Raised when an exclusive lock cannot be acquired within the timeout.

    NOTE(review): corresponds to the ``timeout_sec`` path of
    ``ResourceLockStorage.acquire_batch`` — confirm which layer raises it.
    """

ResourceLockStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
@runtime_checkable
class ResourceLockStorage(Protocol):
    """Structural interface for DFID-scoped resource reservations.

    :class:`ResourceLockManager` checks availability (via
    :meth:`get_locked_amount`) and then calls :meth:`acquire_batch`;
    :meth:`release` drops all of a DFID's locks at once.
    """

    def init_schema(self) -> None: ...

    def get_locked_amount(self, resource_id: str, exclude_dfid: str) -> float:
        """Return the total reserved amount for resource_id, excluding
        any lock already held by exclude_dfid (so re-acquiring is idempotent).
        """
        ...

    def acquire_batch(
        self,
        dfid: str,
        resources: Dict[str, float],
        timeout_sec: float,
    ) -> bool:
        """Atomically write all locks for dfid.

        The availability check is performed by :class:`ResourceLockManager`
        *before* calling this method.  Implementations must ensure the write
        is atomic so that two concurrent callers cannot both see "enough room"
        and both succeed.

        Args:
            dfid: Flow identifier claiming the locks.
            resources: Mapping of resource_id -> requested amount.
            timeout_sec: Maximum time to wait for exclusive write access.

        Returns:
            ``True`` if all locks were written, ``False`` if exclusive access
            could not be obtained within *timeout_sec*
            (``RESOURCE_CONTENTION_TIMEOUT``).

        Note:
            Implementations that guarantee atomic check-and-set (e.g. a
            Postgres ``INSERT ... WHERE available - locked >= requested``)
            may perform the availability check themselves and raise
            :exc:`InsufficientCapacityError` when capacity is exceeded.
        """
        ...

    def release(self, dfid: str) -> None:
        """Release all locks held by dfid."""
        ...

acquire_batch(dfid, resources, timeout_sec)

Atomically write all locks for dfid.

The availability check is performed by :class:ResourceLockManager before calling this method. Implementations must ensure the write is atomic so that two concurrent callers cannot both see "enough room" and both succeed.

Parameters:

Name Type Description Default
dfid str

Flow identifier claiming the locks.

required
resources Dict[str, float]

Mapping of resource_id -> requested amount.

required
timeout_sec float

Maximum time to wait for exclusive write access.

required

Returns:

Type Description
bool

True if all locks were written, False if exclusive access could not be obtained within timeout_sec (RESOURCE_CONTENTION_TIMEOUT).

Note

Implementations that guarantee atomic check-and-set (e.g. a Postgres INSERT ... WHERE available - locked >= requested) may perform the availability check themselves and raise :exc:InsufficientCapacityError when capacity is exceeded.

Source code in src/dir_core/storage/base.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def acquire_batch(
    self,
    dfid: str,
    resources: Dict[str, float],
    timeout_sec: float,
) -> bool:
    """Atomically write all locks for dfid.

    The availability check is performed by :class:`ResourceLockManager`
    *before* calling this method.  Implementations must ensure the write
    is atomic so that two concurrent callers cannot both see "enough room"
    and both succeed.  Atomic here is all-or-nothing: either every entry
    in *resources* is written for *dfid*, or none are.

    Args:
        dfid: Flow identifier claiming the locks.
        resources: Mapping of resource_id -> requested amount.
        timeout_sec: Maximum time to wait for exclusive write access.

    Returns:
        ``True`` if all locks were written, ``False`` if exclusive access
        could not be obtained within *timeout_sec*
        (``RESOURCE_CONTENTION_TIMEOUT``).

    Note:
        Implementations that guarantee atomic check-and-set (e.g. a
        Postgres ``INSERT ... WHERE available - locked >= requested``)
        may perform the availability check themselves and raise
        :exc:`InsufficientCapacityError` when capacity is exceeded.
    """
    ...

get_locked_amount(resource_id, exclude_dfid)

Return the total reserved amount for resource_id, excluding any lock already held by exclude_dfid (so re-acquiring is idempotent).

Source code in src/dir_core/storage/base.py
251
252
253
254
255
def get_locked_amount(self, resource_id: str, exclude_dfid: str) -> float:
    """Return the total reserved amount for resource_id, excluding
    any lock already held by exclude_dfid (so re-acquiring is idempotent).

    The bundled backends return ``0.0`` when no other flow holds locks
    on the resource.
    """
    ...

release(dfid)

Release all locks held by dfid.

Source code in src/dir_core/storage/base.py
288
289
290
def release(self, dfid: str) -> None:
    """Release all locks held by dfid.

    The bundled backends treat a dfid holding no locks as a no-op.
    """
    ...

SagaStorage

Bases: Protocol

Source code in src/dir_core/storage/base.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
@runtime_checkable
class SagaStorage(Protocol):
    """Structural interface for saga dirty-flow persistence.

    A flow marked dirty records the step that failed plus a JSON snapshot
    of its partial state until compensation clears it.
    """

    def init_schema(self) -> None: ...

    def mark_dirty(self, dfid: str, failed_step: str, partial_state_json: str) -> None:
        """Record dfid as PARTIAL_SUCCESS_DIRTY with the given state snapshot."""
        ...

    def get_dirty_flows(self) -> List[str]:
        """Return all dfids currently in dirty state."""
        ...

    def get_dirty_state(self, dfid: str) -> Optional[Dict[str, Any]]:
        """Return dict with 'failed_step' and 'partial_state' keys, or None."""
        ...

    def clear_dirty(self, dfid: str) -> None:
        """Remove dfid from dirty state after successful compensation."""
        ...

clear_dirty(dfid)

Remove dfid from dirty state after successful compensation.

Source code in src/dir_core/storage/base.py
237
238
239
def clear_dirty(self, dfid: str) -> None:
    """Remove dfid from dirty state after successful compensation.

    The bundled backends treat an unknown dfid as a no-op.
    """
    ...

get_dirty_flows()

Return all dfids currently in dirty state.

Source code in src/dir_core/storage/base.py
229
230
231
def get_dirty_flows(self) -> List[str]:
    """Return all dfids currently in dirty state.

    Ordering is backend-defined.
    """
    ...

get_dirty_state(dfid)

Return dict with 'failed_step' and 'partial_state' keys, or None.

Source code in src/dir_core/storage/base.py
233
234
235
def get_dirty_state(self, dfid: str) -> Optional[Dict[str, Any]]:
    """Return dict with 'failed_step' and 'partial_state' keys, or None.

    'partial_state' is the decoded form of the JSON snapshot passed to
    ``mark_dirty``.
    """
    ...

mark_dirty(dfid, failed_step, partial_state_json)

Record dfid as PARTIAL_SUCCESS_DIRTY with the given state snapshot.

Source code in src/dir_core/storage/base.py
225
226
227
def mark_dirty(self, dfid: str, failed_step: str, partial_state_json: str) -> None:
    """Record dfid as PARTIAL_SUCCESS_DIRTY with the given state snapshot.

    The bundled backends store an empty ``partial_state_json`` as an empty
    snapshot (``{}``).
    """
    ...

SqliteAgentRegistryStorage

SQLite backend for AgentRegistry (DIR §2.3).

Source code in src/dir_core/storage/sqlite.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class SqliteAgentRegistryStorage:
    """SQLite backend for AgentRegistry (DIR §2.3)."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def upsert_agent(
        self,
        agent_id: str,
        contract_json: str,
        priority: int,
        status: str,
        agent_version: Optional[str],
        session_token: Optional[str],
    ) -> None:
        """Insert or replace the full record for agent_id."""
        values = (
            agent_id,
            contract_json,
            priority,
            status,
            agent_version,
            session_token,
        )
        with _connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO agent_registry
                (agent_id, contract, priority, status, agent_version, session_token)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                values,
            )
            conn.commit()

    def get_agent(self, agent_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored record (contract JSON decoded), or None."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT agent_id, contract, priority, status, agent_version, "
                "session_token FROM agent_registry WHERE agent_id = ?",
                (agent_id,),
            ).fetchone()
        if rec is None:
            return None
        keys = (
            "agent_id",
            "contract",
            "priority",
            "status",
            "agent_version",
            "session_token",
        )
        record = dict(zip(keys, rec))
        # A NULL/empty contract column decodes to an empty dict.
        record["contract"] = json.loads(rec[1]) if rec[1] else {}
        return record

    def update_status(
        self, agent_id: str, status: str, suspension_reason: Optional[str]
    ) -> bool:
        """Set status/suspension_reason; True when a row was changed."""
        with _connect(self.db_path) as conn:
            changed = conn.execute(
                """
                UPDATE agent_registry
                SET status = ?, suspension_reason = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE agent_id = ?
                """,
                (status, suspension_reason, agent_id),
            ).rowcount
            conn.commit()
        return changed > 0

    def get_status(self, agent_id: str) -> Optional[Tuple[str, Optional[str]]]:
        """Return (status, suspension_reason) for agent_id, or None."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT status, suspension_reason "
                "FROM agent_registry WHERE agent_id = ?",
                (agent_id,),
            ).fetchone()
        return None if rec is None else (rec[0], rec[1])

    def list_active_agents(self) -> List[str]:
        """Return the ids of all agents whose status is ACTIVE."""
        with _connect(self.db_path) as conn:
            rows = conn.execute(
                "SELECT agent_id FROM agent_registry WHERE status = ?",
                (AgentRegistryStatus.ACTIVE,),
            ).fetchall()
        return [rec[0] for rec in rows]

SqliteContextStorage

SQLite backend for ContextStore (DIR §8).

Source code in src/dir_core/storage/sqlite.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class SqliteContextStorage:
    """SQLite backend for ContextStore (DIR §8)."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def get_session(self, dfid: str) -> Optional[str]:
        """Return the raw session payload for dfid, or None."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT data FROM context_session WHERE dfid = ?", (dfid,)
            ).fetchone()
        return None if rec is None else rec[0]

    def set_session(self, dfid: str, data_json: str) -> None:
        """Insert or overwrite the session payload for dfid."""
        with _connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO context_session (dfid, data) "
                "VALUES (?, ?)",
                (dfid, data_json),
            )
            conn.commit()

    def get_state(self, agent_id: str) -> Optional[str]:
        """Return the raw state payload for agent_id, or None."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT data FROM context_state WHERE agent_id = ?",
                (agent_id,),
            ).fetchone()
        return None if rec is None else rec[0]

    def set_state(self, agent_id: str, data_json: str) -> None:
        """Insert or overwrite the state payload for agent_id."""
        with _connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO context_state (agent_id, data) "
                "VALUES (?, ?)",
                (agent_id, data_json),
            )
            conn.commit()

SqliteDecisionAuditStorage

SQLite backend for append-only decision_audit_events.

Source code in src/dir_core/storage/sqlite.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
class SqliteDecisionAuditStorage:
    """SQLite backend for append-only decision_audit_events.

    Rows are only ever inserted; chronological order comes from the table's
    ``id`` column (``ORDER BY id ASC``).
    """

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    @staticmethod
    def _row_to_event(r: Any) -> Dict[str, Any]:
        """Map a sqlite3.Row (or any mapping) to the public event dict.

        ``detail_json`` is decoded; an empty/NULL column yields ``{}``.
        """
        return {
            "dfid": r["dfid"],
            "event": r["event"],
            "timestamp": r["timestamp"],
            "step_id": r["step_id"],
            "state": r["state"],
            "details": json.loads(r["detail_json"] or "{}"),
        }

    def record(
        self,
        dfid: str,
        event: str,
        *,
        step_id: str = "",
        state: str = "",
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Append one audit event for dfid with a UTC ISO-8601 timestamp."""
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        payload = dumps_json_dict(details or {})
        with _connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT INTO decision_audit_events
                    (dfid, event, timestamp, step_id, state, detail_json)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (dfid, event, ts, step_id, state, payload),
            )
            conn.commit()

    def events_for_dfid(self, dfid: str) -> List[Dict[str, Any]]:
        """Return every event for dfid in insertion order."""
        with _connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                """
                SELECT dfid, event, timestamp, step_id, state, detail_json
                FROM decision_audit_events
                WHERE dfid = ?
                ORDER BY id ASC
                """,
                (dfid,),
            )
            rows = cursor.fetchall()
        return [self._row_to_event(r) for r in rows]

    def all_events_chronological(self) -> List[Dict[str, Any]]:
        """Return every event across all flows in insertion order."""
        with _connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                """
                SELECT dfid, event, timestamp, step_id, state, detail_json
                FROM decision_audit_events
                ORDER BY id ASC
                """
            )
            rows = cursor.fetchall()
        return [self._row_to_event(r) for r in rows]

SqliteEscalationStorage

SQLite backend for EscalationManager (DIR §9).

Source code in src/dir_core/storage/sqlite.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
class SqliteEscalationStorage:
    """SQLite backend for EscalationManager (DIR §9)."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def get_window_count(self, agent_id: str, since_str: str) -> int:
        """Count budget tokens recorded for agent_id at/after since_str."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT COUNT(*) FROM escalation_budget "
                "WHERE agent_id = ? AND created_at >= ?",
                (agent_id, since_str),
            ).fetchone()
        return int(rec[0]) if rec else 0

    def record_budget_token(self, agent_id: str) -> None:
        """Append one budget token for agent_id."""
        with _connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO escalation_budget (agent_id) VALUES (?)",
                (agent_id,),
            )
            conn.commit()

    def insert_request(
        self,
        dfid: str,
        agent_id: str,
        reason: str,
        context_json: str,
        proposal_json: str,
        impact: str,
    ) -> None:
        """Create (or overwrite) a PENDING escalation request for dfid."""
        params = (dfid, agent_id, reason, context_json, proposal_json, impact)
        with _connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO escalation_requests
                (dfid, agent_id, reason, context_json, proposal_json,
                 impact, status)
                VALUES (?, ?, ?, ?, ?, ?, 'PENDING')
                """,
                params,
            )
            conn.commit()

    def resolve_request(
        self,
        dfid: str,
        resolved_at: str,
        decision: str,
        proposal_json: Optional[str],
    ) -> None:
        """Mark the request RESOLVED; COALESCE keeps the old proposal when None."""
        with _connect(self.db_path) as conn:
            conn.execute(
                """
                UPDATE escalation_requests
                SET status = 'RESOLVED', resolved_at = ?,
                    human_decision = ?,
                    proposal_json = COALESCE(?, proposal_json)
                WHERE dfid = ?
                """,
                (resolved_at, decision, proposal_json, dfid),
            )
            conn.commit()

    def get_pending_requests(self) -> List[Dict[str, Any]]:
        """Return all PENDING requests with their JSON columns decoded."""
        with _connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            records = conn.execute(
                "SELECT dfid, agent_id, reason, context_json, "
                "proposal_json, impact "
                "FROM escalation_requests WHERE status = 'PENDING'"
            ).fetchall()
        pending: List[Dict[str, Any]] = []
        for rec in records:
            pending.append(
                {
                    "dfid": rec["dfid"],
                    "agent_id": rec["agent_id"],
                    "reason": rec["reason"],
                    "context": json.loads(rec["context_json"] or "{}"),
                    "proposal": json.loads(rec["proposal_json"] or "{}"),
                    "impact": rec["impact"],
                }
            )
        return pending

SqliteIdempotencyStorage

SQLite backend for IdempotencyGuard (DIR §7).

Source code in src/dir_core/storage/sqlite.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
class SqliteIdempotencyStorage:
    """SQLite backend for IdempotencyGuard (DIR §7)."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the cached result for key, JSON-decoded, or None."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT result FROM idempotency_cache WHERE key = ?", (key,)
            ).fetchone()
        return None if rec is None else json.loads(rec[0])

    def set(self, key: str, result: Dict[str, Any]) -> None:
        """Store (or overwrite) the cached result for key."""
        payload = dumps_json_dict(result)
        with _connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO idempotency_cache (key, result) "
                "VALUES (?, ?)",
                (key, payload),
            )
            conn.commit()

SqliteIntentRetryStorage

SQLite backend for IntentRetryGovernor (DIR §6.2).

Source code in src/dir_core/storage/sqlite.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
class SqliteIntentRetryStorage:
    """SQLite backend for IntentRetryGovernor (DIR §6.2)."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def get_count(self, dfid: str) -> int:
        """Return the rejection count for dfid (0 when unknown)."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT rejection_count FROM intent_retry WHERE dfid = ?",
                (dfid,),
            ).fetchone()
        return rec[0] if rec else 0

    def set_count(self, dfid: str, count: int) -> None:
        """Upsert the rejection count for dfid, stamping updated_at."""
        with _connect(self.db_path) as conn:
            conn.execute(
                "INSERT OR REPLACE INTO intent_retry "
                "(dfid, rejection_count, updated_at) "
                "VALUES (?, ?, CURRENT_TIMESTAMP)",
                (dfid, count),
            )
            conn.commit()

    def delete(self, dfid: str) -> None:
        """Drop the retry record for dfid (no-op when absent)."""
        with _connect(self.db_path) as conn:
            conn.execute("DELETE FROM intent_retry WHERE dfid = ?", (dfid,))
            conn.commit()

SqliteLifecycleStorage

SQLite backend for lifecycle.transition (DIR §4.3).

Source code in src/dir_core/storage/sqlite.py
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
class SqliteLifecycleStorage:
    """SQLite backend for lifecycle.transition (DIR §4.3)."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def record_transition(
        self, dfid: str, from_status: str, to_status: str
    ) -> None:
        """Append one status-transition row for dfid."""
        row = (dfid, from_status, to_status)
        with _connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO flow_transitions (dfid, from_status, to_status) "
                "VALUES (?, ?, ?)",
                row,
            )
            conn.commit()

SqliteResourceLockStorage

SQLite backend for ResourceLockManager (DIR §6.2).

acquire_batch uses BEGIN IMMEDIATE to guarantee that the check-and-insert performed by :class:ResourceLockManager is not interleaved with another concurrent acquire_batch.

Source code in src/dir_core/storage/sqlite.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
class SqliteResourceLockStorage:
    """SQLite backend for ResourceLockManager (DIR §6.2).

    ``acquire_batch`` uses ``BEGIN IMMEDIATE`` to guarantee that the
    check-and-insert performed by :class:`ResourceLockManager` is not
    interleaved with another concurrent ``acquire_batch``.
    """

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def get_locked_amount(self, resource_id: str, exclude_dfid: str) -> float:
        """Return total locked amount for resource_id (excluding exclude_dfid)."""
        with _connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT COALESCE(SUM(amount), 0) FROM resource_locks "
                "WHERE resource_id = ? AND dfid != ?",
                (resource_id, exclude_dfid),
            )
            row = cursor.fetchone()
            return float(row[0]) if row else 0.0

    def acquire_batch(
        self,
        dfid: str,
        resources: Dict[str, float],
        timeout_sec: float,
    ) -> bool:
        """Atomically write all locks using ``BEGIN IMMEDIATE``.

        Retries while the database stays locked, until *timeout_sec*
        elapses; at least one attempt is always made, even for a zero
        timeout.

        Args:
            dfid: Flow identifier claiming the locks.
            resources: Mapping of resource_id -> requested amount.
            timeout_sec: Maximum time to keep retrying on contention.

        Returns:
            True if all locks were written, False if contention persisted
            beyond the timeout.

        Note:
            The connection is now closed on every path; the previous
            implementation leaked it when ``BEGIN IMMEDIATE`` itself raised
            ``sqlite3.OperationalError``.
        """
        deadline = time.monotonic() + timeout_sec

        while True:
            # Short driver-level timeout; this method implements its own
            # retry loop against the deadline.
            conn = sqlite3.connect(self.db_path, timeout=0.1)
            try:
                conn.execute("BEGIN IMMEDIATE")
                for rid, amount in resources.items():
                    conn.execute(
                        "INSERT OR REPLACE INTO resource_locks "
                        "(dfid, resource_id, amount) VALUES (?, ?, ?)",
                        (dfid, rid, amount),
                    )
                conn.commit()
                return True
            except sqlite3.OperationalError:
                # Another writer holds the database; back off and retry.
                conn.rollback()
                if time.monotonic() >= deadline:
                    return False
            except Exception:
                conn.rollback()
                raise
            finally:
                conn.close()
            time.sleep(0.05)

    def release(self, dfid: str) -> None:
        """Release all locks held by dfid."""
        with _connect(self.db_path) as conn:
            conn.execute(
                "DELETE FROM resource_locks WHERE dfid = ?", (dfid,)
            )
            conn.commit()

acquire_batch(dfid, resources, timeout_sec)

Atomically write all locks using BEGIN IMMEDIATE.

Returns True if written, False if contention persisted beyond timeout.

Source code in src/dir_core/storage/sqlite.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
def acquire_batch(
    self,
    dfid: str,
    resources: Dict[str, float],
    timeout_sec: float,
) -> bool:
    """Atomically write all locks using ``BEGIN IMMEDIATE``.

    Retries while the database stays locked, until *timeout_sec* elapses;
    at least one attempt is always made, even for a zero timeout.
    Returns True if all locks were written, False if contention persisted
    beyond the timeout.

    Note:
        The connection is now closed on every path; the previous
        implementation leaked it when ``BEGIN IMMEDIATE`` itself raised
        ``sqlite3.OperationalError``.
    """
    deadline = time.monotonic() + timeout_sec

    while True:
        # Short driver-level timeout; this method runs its own retry loop.
        conn = sqlite3.connect(self.db_path, timeout=0.1)
        try:
            conn.execute("BEGIN IMMEDIATE")
            for rid, amount in resources.items():
                conn.execute(
                    "INSERT OR REPLACE INTO resource_locks "
                    "(dfid, resource_id, amount) VALUES (?, ?, ?)",
                    (dfid, rid, amount),
                )
            conn.commit()
            return True
        except sqlite3.OperationalError:
            # Another writer holds the database; back off and retry.
            conn.rollback()
            if time.monotonic() >= deadline:
                return False
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
        time.sleep(0.05)

get_locked_amount(resource_id, exclude_dfid)

Return total locked amount for resource_id (excluding exclude_dfid).

Source code in src/dir_core/storage/sqlite.py
422
423
424
425
426
427
428
429
430
431
def get_locked_amount(self, resource_id: str, exclude_dfid: str) -> float:
    """Sum every lock on resource_id held by flows other than exclude_dfid."""
    with _connect(self.db_path) as conn:
        rec = conn.execute(
            "SELECT COALESCE(SUM(amount), 0) FROM resource_locks "
            "WHERE resource_id = ? AND dfid != ?",
            (resource_id, exclude_dfid),
        ).fetchone()
    return float(rec[0]) if rec else 0.0

SqliteSagaStorage

SQLite backend for SagaCompensation (DIR §7).

Source code in src/dir_core/storage/sqlite.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
class SqliteSagaStorage:
    """SQLite backend for SagaCompensation (DIR §7)."""

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self.init_schema()

    def init_schema(self) -> None:
        with _connect(self.db_path) as conn:
            _apply_schema(conn)

    def mark_dirty(
        self, dfid: str, failed_step: str, partial_state_json: str
    ) -> None:
        """Upsert the dirty-state snapshot for dfid."""
        with _connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO saga_dirty_state
                (dfid, failed_step, partial_state_json) VALUES (?, ?, ?)
                """,
                (dfid, failed_step, partial_state_json),
            )
            conn.commit()

    def get_dirty_flows(self) -> List[str]:
        """Return every dfid currently marked dirty."""
        with _connect(self.db_path) as conn:
            rows = conn.execute("SELECT dfid FROM saga_dirty_state").fetchall()
        return [rec[0] for rec in rows]

    def get_dirty_state(self, dfid: str) -> Optional[Dict[str, Any]]:
        """Return {'failed_step', 'partial_state'} for dfid, or None."""
        with _connect(self.db_path) as conn:
            rec = conn.execute(
                "SELECT failed_step, partial_state_json "
                "FROM saga_dirty_state WHERE dfid = ?",
                (dfid,),
            ).fetchone()
        if rec is None:
            return None
        return {
            "failed_step": rec[0],
            # NULL/empty snapshot decodes to an empty dict.
            "partial_state": json.loads(rec[1] or "{}"),
        }

    def clear_dirty(self, dfid: str) -> None:
        """Remove dfid's dirty record (no-op when absent)."""
        with _connect(self.db_path) as conn:
            conn.execute(
                "DELETE FROM saga_dirty_state WHERE dfid = ?", (dfid,)
            )
            conn.commit()

StorageBundle dataclass

Holds one storage backend instance per dir_core module.

Use :func:sqlite_storage or :func:memory_storage to obtain a bundle, or construct one manually to mix backends.

Source code in src/dir_core/storage/__init__.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@dataclass
class StorageBundle:
    """Holds one storage backend instance per dir_core module.

    Use :func:`sqlite_storage` or :func:`memory_storage` to obtain a bundle,
    or construct one manually to mix backends.
    """

    agent_registry: AgentRegistryStorage  # AgentRegistry (DIR §2.3)
    context: ContextStorage  # ContextStore (DIR §8)
    idempotency: IdempotencyStorage  # IdempotencyGuard (DIR §7)
    decision_audit: DecisionAuditStorage  # append-only audit events
    saga: SagaStorage  # SagaCompensation dirty-flow state (DIR §7)
    resource_lock: ResourceLockStorage  # ResourceLockManager (DIR §6.2)
    intent_retry: IntentRetryStorage  # IntentRetryGovernor (DIR §6.2)
    escalation: EscalationStorage  # EscalationManager (DIR §9)
    lifecycle: LifecycleStorage  # lifecycle.transition log (DIR §4.3)

StorageError

Bases: Exception

Base exception raised by storage backends.

Source code in src/dir_core/storage/base.py
28
29
class StorageError(Exception):
    """Base exception raised by storage backends.

    Root of the storage exception hierarchy; specific failures such as
    :class:`ResourceContentionError` subclass it.
    """

dumps_json_dict(data, *, sort_keys=True)

Serialize a dict for TEXT / JSONB columns.

Uses stable key order and default=str so values that are not native JSON types (e.g. datetime, UUID) encode consistently in :class:~dir_core.storage.sqlite.SqliteDecisionAuditStorage and in the sample PostgreSQL repository (samples/shared/storage/pg_repo.py).

Source code in src/dir_core/storage/json_util.py
14
15
16
17
18
19
20
21
22
def dumps_json_dict(data: Dict[str, Any], *, sort_keys: bool = True) -> str:
    """Serialize a dict for TEXT / JSONB columns.

    Uses stable key order and ``default=str`` so values that are not native
    JSON types (e.g. ``datetime``, UUID) encode consistently in
    :class:`~dir_core.storage.sqlite.SqliteDecisionAuditStorage` and in the
    sample PostgreSQL repository (``samples/shared/storage/pg_repo.py``).
    """
    encoded = json.dumps(data, default=str, sort_keys=sort_keys)
    return encoded

ensure_db(path, create_tables=None)

Create parent dirs and an empty DB file if needed, then run optional schema callback.

Used by samples that need SQLite before wiring storage backends. Returns the resolved :class:~pathlib.Path to the database file.

Source code in src/dir_core/storage/sqlite.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def ensure_db(
    path: Path | str,
    create_tables: Optional[Callable[[sqlite3.Connection], None]] = None,
) -> Path:
    """Create parent dirs and an empty DB file if needed, then run optional schema callback.

    Used by samples that need SQLite before wiring storage backends.  Returns
    the resolved :class:`~pathlib.Path` to the database file.
    """
    resolved = Path(path).resolve()
    resolved.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(resolved))
    try:
        if create_tables is not None:
            create_tables(conn)
        conn.commit()
    finally:
        conn.close()
    return resolved

memory_storage()

Return a :class:`StorageBundle` backed entirely in process memory.

No data survives process exit. Useful for tests and ephemeral pipelines.

Source code in src/dir_core/storage/__init__.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def memory_storage() -> StorageBundle:
    """Return a :class:`StorageBundle` backed entirely in process memory.

    No data survives process exit. Useful for tests and ephemeral pipelines.
    """
    # Map each bundle field to its in-memory backend class, then
    # instantiate everything in a single sweep.
    backend_classes = {
        "agent_registry": MemoryAgentRegistryStorage,
        "context": MemoryContextStorage,
        "idempotency": MemoryIdempotencyStorage,
        "decision_audit": MemoryDecisionAuditStorage,
        "saga": MemorySagaStorage,
        "resource_lock": MemoryResourceLockStorage,
        "intent_retry": MemoryIntentRetryStorage,
        "escalation": MemoryEscalationStorage,
        "lifecycle": MemoryLifecycleStorage,
    }
    return StorageBundle(**{field: cls() for field, cls in backend_classes.items()})

sqlite_storage(db_path)

Return a :class:`StorageBundle` where every module is backed by one SQLite file.

Parameters:

Name Type Description Default
db_path str

Path to the SQLite database file (created if absent).

required
Source code in src/dir_core/storage/__init__.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def sqlite_storage(db_path: str) -> StorageBundle:
    """Return a :class:`StorageBundle` where every module is backed by one SQLite file.

    Args:
        db_path: Path to the SQLite database file (created if absent).
    """
    # Every SQLite backend takes the same constructor argument, so build
    # the bundle from a field -> class table rather than nine literal calls.
    backend_classes = {
        "agent_registry": SqliteAgentRegistryStorage,
        "context": SqliteContextStorage,
        "idempotency": SqliteIdempotencyStorage,
        "decision_audit": SqliteDecisionAuditStorage,
        "saga": SqliteSagaStorage,
        "resource_lock": SqliteResourceLockStorage,
        "intent_retry": SqliteIntentRetryStorage,
        "escalation": SqliteEscalationStorage,
        "lifecycle": SqliteLifecycleStorage,
    }
    return StorageBundle(
        **{field: cls(db_path) for field, cls in backend_classes.items()}
    )