Skip to content

prefect.cache_policies

CacheKeyFnPolicy dataclass

Bases: CachePolicy

This policy accepts a custom function with signature f(task_run_context, task_parameters) -> Optional[str] and uses it to compute a task run cache key. Flow parameters are not passed to the function.

Source code in src/prefect/cache_policies.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@dataclass
class CacheKeyFnPolicy(CachePolicy):
    """
    This policy accepts a custom function with signature
    f(task_run_context, task_parameters) -> Optional[str]
    and uses it to compute a task run cache key.

    Only the task run context and the task inputs are forwarded to the
    function; flow parameters and any extra keyword arguments received by
    `compute_key` are not passed along.
    """

    # making it optional for tests
    cache_key_fn: Optional[
        Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
    ] = None

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Compute the cache key by delegating to `cache_key_fn`.

        Args:
            task_ctx: the task run context, passed as the first positional argument.
            inputs: the task inputs, passed as the second positional argument.
            flow_parameters: accepted for interface compatibility; unused here.
            **kwargs: accepted for interface compatibility; unused here.

        Returns:
            Whatever `cache_key_fn` returns, or `None` when no function is set.
        """
        if not self.cache_key_fn:
            # No function configured (the field is optional): no cache key.
            return None
        return self.cache_key_fn(task_ctx, inputs)

CachePolicy dataclass

Base class for all cache policies.

Source code in src/prefect/cache_policies.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@dataclass
class CachePolicy:
    """
    Base class for all cache policies.

    Policies can be combined with `+` and can have a task input name excluded
    with `-`. Subclasses implement `compute_key`.
    """

    @classmethod
    def from_cache_key_fn(
        cls, cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
    ) -> "CacheKeyFnPolicy":
        """
        Build a `CacheKeyFnPolicy` that delegates key computation to `cache_key_fn`.
        """
        return CacheKeyFnPolicy(cache_key_fn=cache_key_fn)

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Compute the cache key for a task run.

        The base implementation always raises `NotImplementedError`.
        """
        raise NotImplementedError

    @staticmethod
    def _depends_on_inputs(policy: "CachePolicy") -> bool:
        """
        Return True if `policy` is an `Inputs` policy or a compound policy that
        (recursively) contains one.
        """
        if isinstance(policy, Inputs):
            return True
        if isinstance(policy, CompoundCachePolicy):
            return any(
                CachePolicy._depends_on_inputs(member)
                for member in policy.policies or []
            )
        return False

    def __sub__(self, other: str) -> "CachePolicy":
        """
        Return a policy that excludes the task input named `other`.

        Raises:
            TypeError: if `other` is not a string.
        """
        if not isinstance(other, str):
            raise TypeError("Can only subtract strings from key policies.")
        if isinstance(self, Inputs):
            exclude = self.exclude or []
            return Inputs(exclude=exclude + [other])
        elif isinstance(self, CompoundCachePolicy):
            policies = self.policies or []
            if any(self._depends_on_inputs(policy) for policy in policies):
                # Apply the exclusion to the members that already hash inputs.
                # Appending a separate `Inputs(exclude=[other])` would leave the
                # existing `Inputs` member untouched, so the "excluded" input
                # would still influence the compound key.
                return CompoundCachePolicy(
                    policies=[
                        policy - other if self._depends_on_inputs(policy) else policy
                        for policy in policies
                    ]
                )
            new = Inputs(exclude=[other])
            return CompoundCachePolicy(policies=policies + [new])
        else:
            new = Inputs(exclude=[other])
            return CompoundCachePolicy(policies=[self, new])

    def __add__(self, other: "CachePolicy") -> "CachePolicy":
        """
        Combine this policy with `other`.

        Returns the non-`_None` operand unchanged when either side is `_None`;
        otherwise returns a `CompoundCachePolicy`.
        """
        # adding _None is a no-op
        if isinstance(other, _None):
            return self
        elif isinstance(self, _None):
            return other

        if isinstance(self, CompoundCachePolicy):
            policies = self.policies or []
            return CompoundCachePolicy(policies=policies + [other])
        elif isinstance(other, CompoundCachePolicy):
            # NOTE: `self` is appended after `other`'s members here, so the
            # resulting policy order differs from the operand order.
            policies = other.policies or []
            return CompoundCachePolicy(policies=policies + [self])
        else:
            return CompoundCachePolicy(policies=[self, other])

from_cache_key_fn(cache_key_fn) classmethod

Given a cache key function, generates a cache key policy that wraps it.

Source code in src/prefect/cache_policies.py
15
16
17
18
19
20
21
22
@classmethod
def from_cache_key_fn(
    cls, cache_key_fn: Callable[["TaskRunContext", Dict[str, Any]], Optional[str]]
) -> "CacheKeyFnPolicy":
    """
    Build a `CacheKeyFnPolicy` that delegates key computation to `cache_key_fn`.
    """
    policy = CacheKeyFnPolicy(cache_key_fn=cache_key_fn)
    return policy

CompoundCachePolicy dataclass

Bases: CachePolicy

This policy is constructed from two or more other cache policies and works by computing the keys for each policy individually, and then hashing all computed keys together in the order the policies are listed (the keys are not sorted).

Any keys that return None will be ignored.

Source code in src/prefect/cache_policies.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@dataclass
class CompoundCachePolicy(CachePolicy):
    """
    This policy is constructed from two or more other cache policies and works by computing the keys
    for each policy individually, and then passing all computed keys to `hash_objects`
    in the order the policies are listed. The keys are not sorted.

    Any keys that return `None` will be ignored.
    """

    policies: Optional[list] = None

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Compute a combined key from every member policy.

        Each member receives the same `task_ctx`, `inputs`, `flow_parameters`
        and extra keyword arguments.

        Returns:
            `None` when there are no member policies or every member returned
            `None`; otherwise the result of `hash_objects` over the non-`None`
            member keys.
        """
        member_keys = (
            policy.compute_key(
                task_ctx=task_ctx,
                inputs=inputs,
                flow_parameters=flow_parameters,
                **kwargs,
            )
            for policy in self.policies or []
        )
        # Members that produce no key do not contribute to the combined key.
        keys = [key for key in member_keys if key is not None]
        if not keys:
            return None
        return hash_objects(*keys)

FlowParameters dataclass

Bases: CachePolicy

Policy that computes the cache key based on a hash of the flow parameters.

Source code in src/prefect/cache_policies.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@dataclass
class FlowParameters(CachePolicy):
    """
    Cache policy whose key is a hash of the flow parameters.
    """

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Hash `flow_parameters` with `hash_objects`.

        Returns `None` when `flow_parameters` is empty or otherwise falsy.
        """
        return hash_objects(flow_parameters) if flow_parameters else None

Inputs dataclass

Bases: CachePolicy

Policy that computes a cache key based on a hash of the runtime inputs provided to the task.

Source code in src/prefect/cache_policies.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@dataclass
class Inputs(CachePolicy):
    """
    Cache policy whose key is a hash of the runtime inputs provided to the task,
    minus any input names listed in `exclude`.
    """

    exclude: Optional[list] = None

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Hash the non-excluded task inputs with `hash_objects`.

        Returns `None` when `inputs` is empty or `None`. If inputs were given
        but all of them are excluded, the empty mapping is still hashed.
        """
        if not inputs:
            return None

        excluded_names = self.exclude or []
        retained = {
            name: value
            for name, value in inputs.items()
            if name not in excluded_names
        }
        return hash_objects(retained)

RunId dataclass

Bases: CachePolicy

Returns either the prevailing flow run ID, or if not found, the prevailing task run ID.

Source code in src/prefect/cache_policies.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@dataclass
class RunId(CachePolicy):
    """
    Cache policy keyed on the current run: the flow run ID recorded on the task
    run when it is set, otherwise the task run's own ID.
    """

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Dict[str, Any],
        flow_parameters: Dict[str, Any],
        **kwargs,
    ) -> Optional[str]:
        """
        Return the run ID as a string, or `None` when there is no task context.
        """
        if not task_ctx:
            return None

        task_run = task_ctx.task_run
        flow_run_id = task_run.flow_run_id
        if flow_run_id is not None:
            return str(flow_run_id)
        # No flow run ID on the task run: fall back to the task run ID.
        return str(task_run.id)

TaskSource dataclass

Bases: CachePolicy

Policy for computing a cache key based on the source code of the task.

Source code in src/prefect/cache_policies.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@dataclass
class TaskSource(CachePolicy):
    """
    Cache policy whose key is a hash of the task's source code.
    """

    def compute_key(
        self,
        task_ctx: TaskRunContext,
        inputs: Optional[Dict[str, Any]],
        flow_parameters: Optional[Dict[str, Any]],
        **kwargs,
    ) -> Optional[str]:
        """
        Hash the source of `task_ctx.task` with `hash_objects`.

        Falls back to the source of the task function's class when
        `inspect.getsource` raises `TypeError`, and to the function's compiled
        bytecode when it raises an `OSError` whose message contains
        "could not get source code". Any other `OSError` is re-raised.

        Returns `None` when there is no task context.
        """
        if not task_ctx:
            return None

        try:
            source = inspect.getsource(task_ctx.task)
        except TypeError:
            # The task object itself is not something getsource accepts;
            # use the class of the wrapped function instead.
            source = inspect.getsource(task_ctx.task.fn.__class__)
        except OSError as exc:
            if "could not get source code" not in str(exc):
                raise
            # Source text is unavailable; hash the raw bytecode instead.
            source = task_ctx.task.fn.__code__.co_code

        return hash_objects(source)