Skip to content

Execution Patterns

Saga Compensation

dir_core.saga

Saga Compensation (DIR §7, Topologies §6.4).

Parent-Child flows: mark_dirty on partial failure, deterministic compensation.

CompensationResult dataclass

Result of execute_compensation.

Source code in src/dir_core/saga.py
19
20
21
22
23
24
@dataclass
class CompensationResult:
    """Outcome reported by ``execute_compensation``.

    Attributes:
        success: Whether the compensation step is reported as successful.
        message: Short text describing the outcome; empty when not given.
    """

    success: bool
    message: str = ""

SagaCompensation

Manages dirty state and deterministic compensation (DIR §7).

Storage backend is pluggable. Pass storage= for a custom backend, or db_path= to use the built-in SQLite backend (default behaviour).

Parameters:

Name Type Description Default
db_path Optional[str]

Path to SQLite database. Used when storage is not provided.

None
revert_callback Optional[Callable[[str, Dict[str, Any]], bool]]

Called with (dfid, partial_state) on REVERT.

None
close_all_callback Optional[Callable[[str], bool]]

Called with (dfid,) on CLOSE_ALL.

None
alert_human_callback Optional[Callable[[str, Dict[str, Any]], None]]

Called with (dfid, partial_state) on ALERT_HUMAN.

None
storage Optional[SagaStorage]

Custom `dir_core.storage.SagaStorage` backend. When provided, db_path is ignored.

None

Raises:

Type Description
ValueError

When neither db_path nor storage is supplied.

Source code in src/dir_core/saga.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class SagaCompensation:
    """Manage dirty-flow state and deterministic compensation (DIR §7).

    The storage backend is pluggable. Pass ``storage=`` for a custom backend,
    or ``db_path=`` to use the built-in SQLite backend. One of the two must
    be supplied.

    Args:
        db_path: Path to SQLite database. Used when ``storage`` is not provided.
        revert_callback: Called with (dfid, partial_state) on REVERT. A truthy
            return value clears the dirty record.
        close_all_callback: Called with (dfid,) on CLOSE_ALL. A truthy return
            value clears the dirty record.
        alert_human_callback: Called with (dfid, partial_state) on ALERT_HUMAN.
        storage: Custom :class:`~dir_core.storage.SagaStorage` backend.
            When provided, ``db_path`` is ignored.

    Attributes:
        db_path: SQLite path backing this instance, or ``None`` when a custom
            ``storage`` backend is in use.

    Raises:
        ValueError: When neither ``db_path`` nor ``storage`` is supplied.
    """

    def __init__(
        self,
        db_path: Optional[str] = None,
        revert_callback: Optional[Callable[[str, Dict[str, Any]], bool]] = None,
        close_all_callback: Optional[Callable[[str], bool]] = None,
        alert_human_callback: Optional[
            Callable[[str, Dict[str, Any]], None]
        ] = None,
        *,
        storage: Optional[SagaStorage] = None,
    ):
        self.revert_callback = revert_callback
        self.close_all_callback = close_all_callback
        self.alert_human_callback = alert_human_callback
        # Always define the attribute so that code reading ``.db_path`` does
        # not hit AttributeError on instances built with a custom storage.
        self.db_path: Optional[str] = None

        if storage is not None:
            self._storage: SagaStorage = storage
        elif db_path is not None:
            self.db_path = db_path  # kept for backward compatibility
            self._storage = SqliteSagaStorage(db_path)
        else:
            raise ValueError(
                "Provide either 'db_path' (SQLite) or 'storage' (custom backend)."
            )

    def mark_dirty(
        self,
        dfid: str,
        failed_step: str,
        partial_state: Dict[str, Any],
    ) -> None:
        """Record flow as PARTIAL_SUCCESS_DIRTY after step failure.

        Args:
            dfid: Identifier of the flow being marked dirty.
            failed_step: Name of the step that failed.
            partial_state: State captured so far; serialised to JSON before
                being handed to the storage backend.
        """
        # default=str stringifies values json cannot encode natively instead
        # of raising, so recording the failure never fails on exotic types.
        self._storage.mark_dirty(
            dfid, failed_step, json.dumps(partial_state, default=str)
        )
        logger.warning("Saga dirty: dfid=%s failed_step=%s", dfid, failed_step)

    def get_dirty_flows(self) -> List[str]:
        """Return list of dfids in dirty state, as reported by the storage."""
        return self._storage.get_dirty_flows()

    def get_dirty_state(self, dfid: str) -> Optional[Dict[str, Any]]:
        """Return the stored dirty record for ``dfid`` (storage-defined)."""
        return self._storage.get_dirty_state(dfid)

    def execute_compensation(
        self,
        dfid: str,
        action: CompensationAction,
    ) -> CompensationResult:
        """Execute deterministic compensation for a dirty flow.

        Callbacks are domain-specific. REVERT and CLOSE_ALL clear the dirty
        record only when their callback reports success. NOOP and ALERT_HUMAN
        leave the dirty record untouched.

        Args:
            dfid: Identifier of the flow to compensate.
            action: Which compensation to run.

        Returns:
            A ``CompensationResult`` whose ``success`` is always a ``bool``.
            It is ``False`` when the flow has no dirty record, when the
            callback for ``action`` is not configured, when the callback
            reports failure, or when ``action`` is not recognised.
        """
        state = self.get_dirty_state(dfid)
        if not state:
            return CompensationResult(
                success=False, message="Flow not in dirty state"
            )

        if action == CompensationAction.NOOP:
            return CompensationResult(success=True, message="NOOP")

        if action == CompensationAction.REVERT:
            if not self.revert_callback:
                return CompensationResult(
                    success=False, message="No revert callback"
                )
            # NOTE(review): assumes the storage record exposes a
            # "partial_state" key — confirm against the SagaStorage contract.
            # bool() keeps CompensationResult.success a real bool even if the
            # callback returns None or some other truthy/falsy object.
            ok = bool(self.revert_callback(dfid, state["partial_state"]))
            if ok:
                self._clear_dirty(dfid)
            return CompensationResult(success=ok, message="REVERT")

        if action == CompensationAction.CLOSE_ALL:
            if not self.close_all_callback:
                return CompensationResult(
                    success=False, message="No close_all callback"
                )
            ok = bool(self.close_all_callback(dfid))
            if ok:
                self._clear_dirty(dfid)
            return CompensationResult(success=ok, message="CLOSE_ALL")

        if action == CompensationAction.ALERT_HUMAN:
            if not self.alert_human_callback:
                return CompensationResult(
                    success=False, message="No alert_human callback"
                )
            # Escalation only: the dirty record is intentionally not cleared.
            self.alert_human_callback(dfid, state["partial_state"])
            return CompensationResult(success=True, message="ALERT_HUMAN")

        return CompensationResult(
            success=False, message=f"Unknown action: {action}"
        )

    def _clear_dirty(self, dfid: str) -> None:
        """Remove flow from dirty state after successful compensation."""
        self._storage.clear_dirty(dfid)

execute_compensation(dfid, action)

Execute deterministic compensation. Callbacks are domain-specific. ALERT_HUMAN triggers escalation callback.

Source code in src/dir_core/saga.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def execute_compensation(
    self,
    dfid: str,
    action: CompensationAction,
) -> CompensationResult:
    """Execute deterministic compensation for a dirty flow.

    Callbacks are domain-specific. REVERT and CLOSE_ALL clear the dirty
    record only when their callback reports success. NOOP and ALERT_HUMAN
    leave the dirty record untouched.

    Args:
        dfid: Identifier of the flow to compensate.
        action: Which compensation to run.

    Returns:
        A ``CompensationResult`` whose ``success`` is always a ``bool``.
        It is ``False`` when the flow has no dirty record, when the callback
        for ``action`` is not configured, when the callback reports failure,
        or when ``action`` is not recognised.
    """
    state = self.get_dirty_state(dfid)
    if not state:
        return CompensationResult(
            success=False, message="Flow not in dirty state"
        )

    if action == CompensationAction.NOOP:
        return CompensationResult(success=True, message="NOOP")

    if action == CompensationAction.REVERT:
        if not self.revert_callback:
            return CompensationResult(
                success=False, message="No revert callback"
            )
        # NOTE(review): assumes the storage record exposes a "partial_state"
        # key — confirm against the SagaStorage contract.
        # bool() keeps CompensationResult.success a real bool even if the
        # callback returns None or some other truthy/falsy object.
        ok = bool(self.revert_callback(dfid, state["partial_state"]))
        if ok:
            self._clear_dirty(dfid)
        return CompensationResult(success=ok, message="REVERT")

    if action == CompensationAction.CLOSE_ALL:
        if not self.close_all_callback:
            return CompensationResult(
                success=False, message="No close_all callback"
            )
        ok = bool(self.close_all_callback(dfid))
        if ok:
            self._clear_dirty(dfid)
        return CompensationResult(success=ok, message="CLOSE_ALL")

    if action == CompensationAction.ALERT_HUMAN:
        if not self.alert_human_callback:
            return CompensationResult(
                success=False, message="No alert_human callback"
            )
        # Escalation only: the dirty record is intentionally not cleared.
        self.alert_human_callback(dfid, state["partial_state"])
        return CompensationResult(success=True, message="ALERT_HUMAN")

    return CompensationResult(
        success=False, message=f"Unknown action: {action}"
    )

get_dirty_flows()

Return list of dfids in dirty state.

Source code in src/dir_core/saga.py
82
83
84
def get_dirty_flows(self) -> List[str]:
    """List the dfids the storage backend currently reports as dirty."""
    dirty_ids = self._storage.get_dirty_flows()
    return dirty_ids

get_dirty_state(dfid)

Return partial state for dirty flow.

Source code in src/dir_core/saga.py
86
87
88
def get_dirty_state(self, dfid: str) -> Optional[Dict[str, Any]]:
    """Fetch the stored dirty record for ``dfid`` from the storage backend."""
    record = self._storage.get_dirty_state(dfid)
    return record

mark_dirty(dfid, failed_step, partial_state)

Record flow as PARTIAL_SUCCESS_DIRTY after step failure.

Source code in src/dir_core/saga.py
70
71
72
73
74
75
76
77
78
79
80
def mark_dirty(
    self,
    dfid: str,
    failed_step: str,
    partial_state: Dict[str, Any],
) -> None:
    """Persist a PARTIAL_SUCCESS_DIRTY record for a flow whose step failed.

    Args:
        dfid: Identifier of the flow being marked dirty.
        failed_step: Name of the step that failed.
        partial_state: State captured so far; serialised to JSON before it
            is handed to the storage backend.
    """
    # default=str stringifies values json cannot encode natively.
    serialized_state = json.dumps(partial_state, default=str)
    self._storage.mark_dirty(dfid, failed_step, serialized_state)
    logger.warning("Saga dirty: dfid=%s failed_step=%s", dfid, failed_step)

Intent Retry Governor

dir_core.intent_retry

Intent Retry Governor (DIR §6.2).

Limits correction attempts per DFID. After max_retries rejections, flow must be aborted with REASONING_EXHAUSTION to prevent feedback poisoning.

IntentRetryGovernor

Tracks rejection count per DFID; enforces max retries before abort.

Storage backend is pluggable. The default is in-memory when neither db_path nor storage is provided.

Parameters:

Name Type Description Default
max_retries int

Maximum number of allowed rejections before abort.

3
db_path Optional[str]

Path to SQLite database for persistent counting.

None
storage Optional[IntentRetryStorage]

Custom `dir_core.storage.IntentRetryStorage` backend. When provided, db_path is ignored.

None
Source code in src/dir_core/intent_retry.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class IntentRetryGovernor:
    """Tracks rejection count per DFID; enforces max retries before abort.

    Storage backend is pluggable. The default is in-memory when neither
    ``db_path`` nor ``storage`` is provided.

    Args:
        max_retries: Maximum number of allowed rejections before abort.
        db_path: Path to SQLite database for persistent counting.
        storage: Custom :class:`~dir_core.storage.IntentRetryStorage` backend.
            When provided, ``db_path`` is ignored.

    Attributes:
        max_retries: Threshold compared against the stored count in
            :meth:`should_abort`.
        db_path: SQLite path backing this instance, or ``None`` when the
            in-memory or a custom ``storage`` backend is in use.
    """

    def __init__(
        self,
        max_retries: int = 3,
        db_path: Optional[str] = None,
        *,
        storage: Optional[IntentRetryStorage] = None,
    ):
        self.max_retries = max_retries
        # Always define the attribute so that code reading ``.db_path`` does
        # not hit AttributeError on in-memory or custom-storage instances.
        self.db_path: Optional[str] = None

        if storage is not None:
            self._storage: IntentRetryStorage = storage
        elif db_path is not None:
            self.db_path = db_path  # kept for backward compatibility
            self._storage = SqliteIntentRetryStorage(db_path)
        else:
            self._storage = MemoryIntentRetryStorage()

    def record_rejection(self, dfid: str) -> int:
        """Increment rejection count for dfid; return new count."""
        # Read-then-write against the backend; the new value is what callers
        # get back and what is logged.
        count = self._storage.get_count(dfid) + 1
        self._storage.set_count(dfid, count)
        logger.debug("Intent retry: dfid=%s count=%d", dfid, count)
        return count

    def should_abort(self, dfid: str) -> bool:
        """True if count >= max_retries (flow must be aborted)."""
        return self._storage.get_count(dfid) >= self.max_retries

    def reset(self, dfid: str) -> None:
        """Clear rejection count when flow ends (CLOSED/ABORTED)."""
        self._storage.delete(dfid)

record_rejection(dfid)

Increment rejection count for dfid; return new count.

Source code in src/dir_core/intent_retry.py
51
52
53
54
55
56
def record_rejection(self, dfid: str) -> int:
    """Bump the stored rejection count for ``dfid`` and return the new value."""
    previous = self._storage.get_count(dfid)
    updated = previous + 1
    self._storage.set_count(dfid, updated)
    logger.debug("Intent retry: dfid=%s count=%d", dfid, updated)
    return updated

reset(dfid)

Clear rejection count when flow ends (CLOSED/ABORTED).

Source code in src/dir_core/intent_retry.py
62
63
64
def reset(self, dfid: str) -> None:
    """Drop the stored rejection count once the flow ends (CLOSED/ABORTED)."""
    backend = self._storage
    backend.delete(dfid)

should_abort(dfid)

True if count >= max_retries (flow must be aborted).

Source code in src/dir_core/intent_retry.py
58
59
60
def should_abort(self, dfid: str) -> bool:
    """Report whether ``dfid`` has reached ``max_retries`` rejections.

    Returns True when the stored count is greater than or equal to
    ``max_retries``, meaning the flow must be aborted.
    """
    rejections = self._storage.get_count(dfid)
    return rejections >= self.max_retries