Skip to content

prefect.cache_policies

CacheKeyFnPolicy dataclass

Bases: CachePolicy

This policy accepts a custom function with signature f(task_run_context, task_parameters) -> Optional[str] and uses it to compute a task run cache key. Flow parameters are not passed to the custom function.

Source code in src/prefect/cache_policies.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@dataclass
class CacheKeyFnPolicy(CachePolicy):
    """
    This policy accepts a custom function with signature
    f(task_run_context, task_parameters) -> Optional[str]
    and uses it to compute a task run cache key.

    Flow parameters are accepted by `compute_key` for interface compatibility
    with other policies, but they are not forwarded to the custom function.
    """

    # making it optional for tests
    cache_key_fn: Optional[
        Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
    ] = None

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Compute the cache key by delegating to `cache_key_fn`.

        Args:
            task_ctx: The task run context, passed as the first argument to
                `cache_key_fn`.
            inputs: The task inputs, passed as the second argument to
                `cache_key_fn`.
            flow_parameters: Unused by this policy.
            **kwargs: Unused by this policy.

        Returns:
            Whatever `cache_key_fn` returns, or `None` when no function is set.
        """
        # Compare against None explicitly so that a callable object that happens
        # to be falsy (e.g. one defining `__len__`) is still invoked.
        if self.cache_key_fn is None:
            return None
        return self.cache_key_fn(task_ctx, inputs)

CachePolicy dataclass

Base class for all cache policies.

Source code in src/prefect/cache_policies.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@dataclass
class CachePolicy:
    """
    Base class for all cache policies.

    Subclasses implement `compute_key`. Policies can be combined with `+` and
    have a task input excluded with `- "<input name>"`; both operators return a
    `CompoundCachePolicy`.

    Attributes:
        key_storage: The storage to use for cache keys.
        isolation_level: The isolation level to use for the cache policy.
        lock_manager: The lock manager to use for the cache policy.
    """

    key_storage: Union["WritableFileSystem", str, Path, None] = None
    isolation_level: Union[
        Literal["READ_COMMITTED", "SERIALIZABLE"],
        "IsolationLevel",
        None,
    ] = None
    lock_manager: Optional["LockManager"] = None

    @classmethod
    def from_cache_key_fn(
        cls, cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
    ) -> "CacheKeyFnPolicy":
        """
        Given a cache key function, generate a cache policy that wraps it.

        Args:
            cache_key_fn: The function the returned policy will hold as its
                `cache_key_fn`.

        Returns:
            A `CacheKeyFnPolicy` built around `cache_key_fn`.
        """
        return CacheKeyFnPolicy(cache_key_fn=cache_key_fn)

    def configure(
        self,
        key_storage: Union["WritableFileSystem", str, Path, None] = None,
        lock_manager: Optional["LockManager"] = None,
        isolation_level: Union[
            Literal["READ_COMMITTED", "SERIALIZABLE"], "IsolationLevel", None
        ] = None,
    ) -> Self:
        """
        Configure the cache policy with the given key storage, lock manager, and isolation level.

        Args:
            key_storage: The storage to use for cache keys. If not provided,
                the current key storage will be used.
            lock_manager: The lock manager to use for the cache policy. If not provided,
                the current lock manager will be used.
            isolation_level: The isolation level to use for the cache policy. If not provided,
                the current isolation level will be used.

        Returns:
            A new cache policy with the given key storage, lock manager, and isolation level.
        """
        # Work on a deep copy so the policy this is called on is left untouched.
        new = deepcopy(self)
        if key_storage is not None:
            new.key_storage = key_storage
        if lock_manager is not None:
            new.lock_manager = lock_manager
        if isolation_level is not None:
            new.isolation_level = isolation_level
        return new

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Compute the cache key for a task run. Must be implemented by subclasses.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError

    def __sub__(self, other: str) -> "CachePolicy":
        """
        Combine this policy with an `Inputs` policy that excludes `other`.

        Args:
            other: Name of the task input to exclude.

        Returns:
            A `CompoundCachePolicy` of this policy and `Inputs(exclude=[other])`
            that keeps this policy's key storage, isolation level and lock manager.

        Raises:
            TypeError: If `other` is not a string.
        """
        if not isinstance(other, str):
            raise TypeError("Can only subtract strings from key policies.")
        new = Inputs(exclude=[other])
        # Carry the configuration over, mirroring `__add__`; otherwise subtracting
        # from a configured policy would silently reset it to the defaults.
        return CompoundCachePolicy(
            policies=[self, new],
            key_storage=self.key_storage,
            isolation_level=self.isolation_level,
            lock_manager=self.lock_manager,
        )

    def __add__(self, other: "CachePolicy") -> "CachePolicy":
        """
        Combine this policy with `other` into a `CompoundCachePolicy`.

        For each of key storage, isolation level and lock manager, the compound
        policy uses this policy's value when it is truthy and `other`'s otherwise.

        Args:
            other: The policy to combine with. Adding a `_None` instance returns
                this policy unchanged.

        Returns:
            This policy when `other` is a `_None` instance, otherwise a new
            `CompoundCachePolicy` holding both policies.

        Raises:
            ValueError: If both policies set different, non-`None` key storage,
                isolation levels, or lock managers.
        """
        # adding _None is a no-op
        if isinstance(other, _None):
            return self

        if (
            other.key_storage is not None
            and self.key_storage is not None
            and other.key_storage != self.key_storage
        ):
            raise ValueError(
                "Cannot add CachePolicies with different storage locations."
            )
        if (
            other.isolation_level is not None
            and self.isolation_level is not None
            and other.isolation_level != self.isolation_level
        ):
            raise ValueError(
                "Cannot add CachePolicies with different isolation levels."
            )
        if (
            other.lock_manager is not None
            and self.lock_manager is not None
            and other.lock_manager != self.lock_manager
        ):
            raise ValueError(
                "Cannot add CachePolicies with different lock implementations."
            )

        return CompoundCachePolicy(
            policies=[self, other],
            key_storage=self.key_storage or other.key_storage,
            isolation_level=self.isolation_level or other.isolation_level,
            lock_manager=self.lock_manager or other.lock_manager,
        )

configure(key_storage=None, lock_manager=None, isolation_level=None)

Configure the cache policy with the given key storage, lock manager, and isolation level.

Parameters:

Name Type Description Default
key_storage Union[WritableFileSystem, str, Path, None]

The storage to use for cache keys. If not provided, the current key storage will be used.

None
lock_manager Optional[LockManager]

The lock manager to use for the cache policy. If not provided, the current lock manager will be used.

None
isolation_level Union[Literal['READ_COMMITTED', 'SERIALIZABLE'], IsolationLevel, None]

The isolation level to use for the cache policy. If not provided, the current isolation level will be used.

None

Returns:

Type Description
Self

A new cache policy with the given key storage, lock manager, and isolation level.

Source code in src/prefect/cache_policies.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def configure(
    self,
    key_storage: Union["WritableFileSystem", str, Path, None] = None,
    lock_manager: Optional["LockManager"] = None,
    isolation_level: Union[
        Literal["READ_COMMITTED", "SERIALIZABLE"], "IsolationLevel", None
    ] = None,
) -> Self:
    """
    Configure the cache policy with the given key storage, lock manager, and isolation level.

    Args:
        key_storage: The storage to use for cache keys. If not provided,
            the current key storage will be used.
        lock_manager: The lock manager to use for the cache policy. If not provided,
            the current lock manager will be used.
        isolation_level: The isolation level to use for the cache policy. If not provided,
            the current isolation level will be used.

    Returns:
        A new cache policy with the given key storage, lock manager, and isolation level.
    """
    new = deepcopy(self)
    if key_storage is not None:
        new.key_storage = key_storage
    if lock_manager is not None:
        new.lock_manager = lock_manager
    if isolation_level is not None:
        new.isolation_level = isolation_level
    return new

from_cache_key_fn(cache_key_fn) classmethod

Given a cache key function, generates a cache policy that wraps it.

Source code in src/prefect/cache_policies.py
32
33
34
35
36
37
38
39
@classmethod
def from_cache_key_fn(
    cls, cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
) -> "CacheKeyFnPolicy":
    """
    Build a cache policy that wraps the given cache key function.

    Args:
        cache_key_fn: The function handed to `CacheKeyFnPolicy` as its
            `cache_key_fn`.

    Returns:
        A `CacheKeyFnPolicy` constructed with `cache_key_fn`.
    """
    wrapped_policy = CacheKeyFnPolicy(cache_key_fn=cache_key_fn)
    return wrapped_policy

CompoundCachePolicy dataclass

Bases: CachePolicy

This policy is constructed from two or more other cache policies and works by computing the keys for each policy individually, and then hashing all of the computed keys together, in the order the policies are listed.

Any keys that return None will be ignored.

Source code in src/prefect/cache_policies.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@dataclass
class CompoundCachePolicy(CachePolicy):
    """
    A cache policy assembled from two or more other cache policies.

    Every member policy computes its own key. The keys that are not `None` are
    then handed to `hash_objects`, in the order the policies are listed, to
    produce the combined key.

    Any keys that return `None` will be ignored.
    """

    policies: List[CachePolicy] = field(default_factory=list)

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Compute the combined key from the member policies' keys.

        All arguments, including `**kwargs`, are forwarded unchanged to each
        member policy's `compute_key`.

        Returns:
            The hash of the non-`None` member keys, or `None` when no member
            policy produced a key.
        """
        member_keys = (
            member.compute_key(
                task_ctx=task_ctx,
                inputs=inputs,
                flow_parameters=flow_parameters,
                **kwargs,
            )
            for member in self.policies
        )
        usable_keys = [key for key in member_keys if key is not None]
        if usable_keys:
            return hash_objects(*usable_keys, raise_on_failure=True)
        return None

FlowParameters dataclass

Bases: CachePolicy

Policy that computes the cache key based on a hash of the flow parameters.

Source code in src/prefect/cache_policies.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@dataclass
class FlowParameters(CachePolicy):
    """
    Cache policy whose key is a hash of the flow parameters.
    """

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Hash the flow parameters into a cache key.

        Returns:
            The hash of `flow_parameters`, or `None` when `flow_parameters` is
            empty or `None`.
        """
        if flow_parameters:
            return hash_objects(flow_parameters, raise_on_failure=True)
        return None

Inputs dataclass

Bases: CachePolicy

Policy that computes a cache key based on a hash of the runtime inputs provided to the task.

Source code in src/prefect/cache_policies.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
@dataclass
class Inputs(CachePolicy):
    """
    Policy that computes a cache key based on a hash of the runtime inputs provided to the task.

    Inputs whose names are listed in `exclude` are left out of the hash.
    """

    exclude: List[str] = field(default_factory=list)

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Hash the task inputs, minus the excluded ones, into a cache key.

        Args:
            task_ctx: Unused by this policy.
            inputs: Mapping of input name to value for the task run.
            flow_parameters: Unused by this policy.
            **kwargs: Unused by this policy.

        Returns:
            The hash of the non-excluded inputs, or `None` when `inputs` is
            empty or `None`.
        """
        if not inputs:
            return None

        # Tolerate `exclude=None` the same way an empty list is handled.
        excluded = self.exclude or []
        hashed_inputs = {
            key: val for key, val in inputs.items() if key not in excluded
        }
        return hash_objects(hashed_inputs, raise_on_failure=True)

    def __sub__(self, other: str) -> "CachePolicy":
        """
        Return a new `Inputs` policy that additionally excludes `other`.

        The new policy keeps this policy's key storage, isolation level and lock
        manager, so subtracting from a configured policy does not reset it.

        Args:
            other: Name of the task input to exclude.

        Raises:
            TypeError: If `other` is not a string.
        """
        if not isinstance(other, str):
            raise TypeError("Can only subtract strings from key policies.")
        return Inputs(
            # `or []` keeps this consistent with `compute_key` when exclude is None.
            exclude=[*(self.exclude or []), other],
            key_storage=self.key_storage,
            isolation_level=self.isolation_level,
            lock_manager=self.lock_manager,
        )

RunId dataclass

Bases: CachePolicy

Returns either the prevailing flow run ID, or if not found, the prevailing task run ID.

Source code in src/prefect/cache_policies.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
@dataclass
class RunId(CachePolicy):
    """
    Cache policy keyed on the prevailing flow run ID, falling back to the
    prevailing task run ID when the task run has no flow run ID.
    """

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Return the run ID as a string.

        Returns:
            `str` of the task run's `flow_run_id`, or of the task run's `id` when
            `flow_run_id` is `None`; `None` when `task_ctx` is falsy.
        """
        if not task_ctx:
            return None

        flow_run_id = task_ctx.task_run.flow_run_id
        if flow_run_id is not None:
            return str(flow_run_id)
        return str(task_ctx.task_run.id)

TaskSource dataclass

Bases: CachePolicy

Policy for computing a cache key based on the source code of the task.

Source code in src/prefect/cache_policies.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@dataclass
class TaskSource(CachePolicy):
    """
    Cache policy keyed on the source code of the task.
    """

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Optional[Dict[str, Any]],
        flow_parameters: Optional[Dict[str, Any]],
        **kwargs,
    ) -> Optional[str]:
        """
        Hash the task's source into a cache key.

        Returns:
            The hash of the task's source (or of its bytecode when the source
            cannot be retrieved), or `None` when `task_ctx` is falsy.

        Raises:
            OSError: Re-raised when `inspect.getsource` fails with a message that
                does not mention "source code".
        """
        if not task_ctx:
            return None

        try:
            source = inspect.getsource(task_ctx.task)
        except TypeError:
            # `inspect` rejected the task object; use the source of the class of
            # the task's underlying function instead.
            source = inspect.getsource(task_ctx.task.fn.__class__)
        except OSError as exc:
            if "source code" not in str(exc):
                raise
            # No source text is available, so fall back to the compiled bytecode.
            source = task_ctx.task.fn.__code__.co_code

        return hash_objects(source, raise_on_failure=True)