Skip to content

prefect.context

Async and thread safe models for passing runtime context data.

These contexts should never be directly mutated by the user.

For more user-accessible information about the current run, see prefect.runtime.

AsyncClientContext

Bases: ContextModel

A context for managing the async Prefect client instances.

Clients were formerly tracked on the TaskRunContext and FlowRunContext, but having two separate places and the addition of both sync and async clients made it difficult to manage. This context is intended to be the single source for async clients.

This context creates an async client, which can either be read directly from the context object OR loaded with get_client, inject_client, or other Prefect utilities.

async with AsyncClientContext.get_or_create() as ctx:
    c1 = get_client(sync_client=False)
    c2 = get_client(sync_client=False)
    assert c1 is c2
    assert c1 is ctx.client

Source code in src/prefect/context.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class AsyncClientContext(ContextModel):
    """
    A context for managing the async Prefect client instances.

    Clients were formerly tracked on the TaskRunContext and FlowRunContext, but
    having two separate places and the addition of both sync and async clients
    made it difficult to manage. This context is intended to be the single
    source for async clients.

    The context creates an async client, which can either be read directly from
    the context object OR loaded with get_client, inject_client, or other
    Prefect utilities.

    async with AsyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=False)
        c2 = get_client(sync_client=False)
        assert c1 is c2
        assert c1 is ctx.client
    """

    __var__ = ContextVar("async-client-context")
    client: PrefectClient
    _httpx_settings: Optional[dict[str, Any]] = PrivateAttr(None)
    # Number of `async with` entries currently active on this instance
    _context_stack: int = PrivateAttr(0)

    def __init__(self, httpx_settings: Optional[dict[str, Any]] = None):
        """
        Create the context together with a new async client.

        Args:
            httpx_settings: Optional settings forwarded to `get_client` and
                remembered on the instance.
        """
        super().__init__(
            client=get_client(sync_client=False, httpx_settings=httpx_settings),
        )
        self._httpx_settings = httpx_settings
        self._context_stack = 0

    async def __aenter__(self):
        """
        Enter the context.

        Only the outermost entry opens the client, checks the API version and
        registers the context; nested entries just return the instance.
        """
        self._context_stack += 1
        if self._context_stack != 1:
            return self

        try:
            await self.client.__aenter__()
        except BaseException:
            # The client never opened; undo the count so the failed entry does
            # not make a later entry look like a nested one.
            self._context_stack -= 1
            raise

        try:
            await self.client.raise_for_api_version_mismatch()
            return super().__enter__()
        except BaseException as exc:
            # `__aexit__` is not invoked when `__aenter__` raises, so release
            # the already-opened client here before propagating the error.
            self._context_stack -= 1
            await self.client.__aexit__(type(exc), exc, exc.__traceback__)
            raise

    async def __aexit__(self, *exc_info):
        """
        Exit the context.

        Only the outermost exit closes the client and unregisters the context.
        """
        self._context_stack -= 1
        if self._context_stack == 0:
            try:
                await self.client.__aexit__(*exc_info)
            finally:
                # Unregister the context even if closing the client fails
                super().__exit__(*exc_info)

    @classmethod
    @asynccontextmanager
    async def get_or_create(cls) -> AsyncGenerator[Self, None]:
        """
        Yield the currently registered context, or enter a new one if none exists.
        """
        ctx = cls.get()
        if ctx:
            yield ctx
        else:
            async with cls() as ctx:
                yield ctx

ContextModel

Bases: BaseModel

A base model for context data that forbids mutation and extra data while providing a context manager

Source code in src/prefect/context.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class ContextModel(BaseModel):
    """
    Base model for context data.

    Extra fields are forbidden, and the instance can be used as a (non-nestable)
    context manager that registers itself in the class's context variable.
    """

    # Each child class must define the context variable that stores its instance
    __var__: ContextVar
    # Reset token returned by `__var__.set`; `None` while the context is not entered
    _token: Optional[Token] = PrivateAttr(None)
    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        extra="forbid",
    )

    def __enter__(self) -> Self:
        """Register this instance as the current context and return it."""
        if self._token is None:
            self._token = self.__var__.set(self)
            return self
        raise RuntimeError(
            "Context already entered. Context enter calls cannot be nested."
        )

    def __exit__(self, *_):
        """Restore the context variable to the value it had before entering."""
        token = self._token
        if not token:
            raise RuntimeError(
                "Asymmetric use of context. Context exit called without an enter."
            )
        self.__var__.reset(token)
        self._token = None

    @classmethod
    def get(cls: Type[Self]) -> Optional[Self]:
        """Return the instance stored in the context variable, or `None`."""
        current = cls.__var__.get(None)
        return current

    def model_copy(
        self: Self, *, update: Optional[Dict[str, Any]] = None, deep: bool = False
    ):
        """
        Duplicate the context model, optionally changing some values.

        Attributes:
            update: Values to change/add in the new model. Note: the data is not validated before creating
                the new model - you should trust this data.
            deep: Set to `True` to make a deep copy of the model.

        Returns:
            A new model instance whose enter token has been cleared.
        """
        duplicate = super().model_copy(update=update, deep=deep)
        # A copy must be enterable on its own, so it must not inherit the token
        duplicate._token = None
        return duplicate

    def serialize(self, include_secrets: bool = True) -> Dict[str, Any]:
        """
        Dump the explicitly-set fields of the model to a dictionary that can be
        pickled with cloudpickle.

        `include_secrets` is forwarded to the dump through the serialization
        context.
        """
        dump_context = {"include_secrets": include_secrets}
        return self.model_dump(exclude_unset=True, context=dump_context)

get() classmethod

Get the current context instance

Source code in src/prefect/context.py
147
148
149
150
@classmethod
def get(cls: Type[Self]) -> Optional[Self]:
    """Get the current context instance"""
    return cls.__var__.get(None)

model_copy(*, update=None, deep=False)

Duplicate the context model, optionally choosing which fields to include, exclude, or change.

Attributes:

Name Type Description
include

Fields to include in new model.

exclude

Fields to exclude from new model, as with values this takes precedence over include.

update

Values to change/add in the new model. Note: the data is not validated before creating the new model - you should trust this data.

deep

Set to True to make a deep copy of the model.

Returns:

Type Description

A new model instance.

Source code in src/prefect/context.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def model_copy(
    self: Self, *, update: Optional[Dict[str, Any]] = None, deep: bool = False
):
    """
    Duplicate the context model, optionally changing some values.

    Attributes:
        update: Values to change/add in the new model. Note: the data is not validated before creating
            the new model - you should trust this data.
        deep: Set to `True` to make a deep copy of the model.

    Returns:
        A new model instance whose enter token has been cleared.
    """
    duplicate = super().model_copy(update=update, deep=deep)
    # A copy must be enterable on its own, so it must not inherit the token
    duplicate._token = None
    return duplicate

serialize(include_secrets=True)

Serialize the context model to a dictionary that can be pickled with cloudpickle.

Source code in src/prefect/context.py
173
174
175
176
177
178
179
def serialize(self, include_secrets: bool = True) -> Dict[str, Any]:
    """
    Dump the explicitly-set fields of the model to a dictionary that can be
    pickled with cloudpickle.

    `include_secrets` is forwarded to the dump through the serialization context.
    """
    dump_context = {"include_secrets": include_secrets}
    return self.model_dump(exclude_unset=True, context=dump_context)

EngineContext

Bases: RunContext

The context for a flow run. Data in this context is only available from within a flow run function.

Attributes:

Name Type Description
flow Optional[Flow]

The flow instance associated with the run

flow_run Optional[FlowRun]

The API metadata for the flow run

task_runner TaskRunner

The task runner instance being used for the flow run

task_run_futures TaskRunner

A list of futures for task runs submitted within this flow run

task_run_states TaskRunner

A list of states for task runs created within this flow run

task_run_results Mapping[int, State]

A mapping of result ids to task run states for this flow run

flow_run_states Mapping[int, State]

A list of states for flow runs created within this flow run

Source code in src/prefect/context.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
class EngineContext(RunContext):
    """
    The context for a flow run. Data in this context is only available from within a
    flow run function.

    Attributes:
        flow: The flow instance associated with the run
        flow_run: The API metadata for the flow run
        task_runner: The task runner instance being used for the flow run
        log_prints: Flag for the run's `log_prints` setting
        parameters: The parameters of the flow run, if any
        detached: Whether the context has been serialized and sent to remote
            infrastructure
        result_store: The result store used for result handling
        persist_result: Whether results are persisted
        task_run_dynamic_keys: Per-key counters of task calls
        observed_flow_pauses: Per-key counters of flow pauses
        task_run_results: A mapping of result ids to task run states for this flow run
        events: The events worker used to emit events, if any
    """

    flow: Optional["Flow"] = None
    flow_run: Optional[FlowRun] = None
    task_runner: TaskRunner
    log_prints: bool = False
    parameters: Optional[Dict[str, Any]] = None

    # Flag signaling if the flow run context has been serialized and sent
    # to remote infrastructure.
    detached: bool = False

    # Result handling
    result_store: ResultStore
    persist_result: bool = Field(default_factory=get_default_persist_setting)

    # Counter for task calls; presumably used to give each call a unique
    # dynamic key (NOTE(review): confirm against the task engine)
    task_run_dynamic_keys: Dict[str, int] = Field(default_factory=dict)

    # Counter for flow pauses
    observed_flow_pauses: Dict[str, int] = Field(default_factory=dict)

    # Tracking for result from task runs in this flow run for dependency tracking
    # Holds the ID of the object returned by the task run and task run state
    task_run_results: Mapping[int, State] = Field(default_factory=dict)

    # Events worker to emit events
    events: Optional[EventsWorker] = None

    __var__: ContextVar = ContextVar("flow_run")

    def serialize(self, include_secrets: bool = True) -> Dict[str, Any]:
        """
        Dump the fields needed to rebuild this context elsewhere.

        Only the fields listed below are included, and only if they were
        explicitly set.

        Args:
            include_secrets: Forwarded through the serialization context, as in
                `ContextModel.serialize`, so this override accepts the same
                call signature as the base class.

        Returns:
            A dictionary with the selected fields.
        """
        return self.model_dump(
            include={
                "flow_run",
                "flow",
                "parameters",
                "log_prints",
                "start_time",
                "input_keyset",
                "result_store",
                "persist_result",
            },
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )

RunContext

Bases: ContextModel

The base context for a flow or task run. Data in this context will always be available when get_run_context is called.

Attributes:

Name Type Description
start_time DateTime

The time the run context was entered

client Union[PrefectClient, SyncPrefectClient]

The Prefect client instance being used for API communication

Source code in src/prefect/context.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
class RunContext(ContextModel):
    """
    The base context for a flow or task run. Data in this context will always be
    available when `get_run_context` is called.

    Attributes:
        start_time: The time the run context was created (defaults to the current
            UTC time)
        input_keyset: Optional nested mapping of input keys
        client: The Prefect client instance being used for API communication
    """

    start_time: DateTime = Field(default_factory=lambda: pendulum.now("UTC"))
    input_keyset: Optional[Dict[str, Dict[str, str]]] = None
    client: Union[PrefectClient, SyncPrefectClient]

    def __init__(self, *args, **kwargs):
        """Initialize the model, then call `start_client_metrics_server()`."""
        super().__init__(*args, **kwargs)

        start_client_metrics_server()

    def serialize(self, include_secrets: bool = True) -> Dict[str, Any]:
        """
        Dump `start_time` and `input_keyset` (when explicitly set) to a dictionary.

        Args:
            include_secrets: Forwarded through the serialization context, as in
                `ContextModel.serialize`, so this override accepts the same
                call signature as the base class.

        Returns:
            A dictionary with the selected fields.
        """
        return self.model_dump(
            include={"start_time", "input_keyset"},
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )

SettingsContext

Bases: ContextModel

The context for Prefect settings.

This allows for safe concurrent access and modification of settings.

Attributes:

Name Type Description
profile Profile

The profile that is in use.

settings Settings

The complete settings model.

Source code in src/prefect/context.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
class SettingsContext(ContextModel):
    """
    The context for Prefect settings.

    This allows for safe concurrent access and modification of settings.

    Attributes:
        profile: The profile that is in use.
        settings: The complete settings model.
    """

    profile: Profile
    settings: Settings

    __var__: ContextVar = ContextVar("settings")

    def __hash__(self) -> int:
        """Hash the context by its settings object."""
        settings_hash = hash(self.settings)
        return settings_hash

    @classmethod
    def get(cls) -> "SettingsContext":
        """Return the active settings context, falling back to the global one."""
        active = super().get()
        if active:
            return active
        # No context has been entered: use the global context rather than `None`
        return GLOBAL_SETTINGS_CONTEXT

SyncClientContext

Bases: ContextModel

A context for managing the sync Prefect client instances.

Clients were formerly tracked on the TaskRunContext and FlowRunContext, but having two separate places and the addition of both sync and async clients made it difficult to manage. This context is intended to be the single source for sync clients.

This context creates a sync client, which can either be read directly from the context object OR loaded with get_client, inject_client, or other Prefect utilities.

with SyncClientContext.get_or_create() as ctx:
    c1 = get_client(sync_client=True)
    c2 = get_client(sync_client=True)
    assert c1 is c2
    assert c1 is ctx.client

Source code in src/prefect/context.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
class SyncClientContext(ContextModel):
    """
    A context for managing the sync Prefect client instances.

    Clients were formerly tracked on the TaskRunContext and FlowRunContext, but
    having two separate places and the addition of both sync and async clients
    made it difficult to manage. This context is intended to be the single
    source for sync clients.

    The context creates a sync client, which can either be read directly from
    the context object OR loaded with get_client, inject_client, or other
    Prefect utilities.

    with SyncClientContext.get_or_create() as ctx:
        c1 = get_client(sync_client=True)
        c2 = get_client(sync_client=True)
        assert c1 is c2
        assert c1 is ctx.client
    """

    __var__ = ContextVar("sync-client-context")
    client: SyncPrefectClient
    _httpx_settings: Optional[dict[str, Any]] = PrivateAttr(None)
    # Number of `with` entries currently active on this instance
    _context_stack: int = PrivateAttr(0)

    def __init__(self, httpx_settings: Optional[dict[str, Any]] = None):
        """
        Create the context together with a new sync client.

        Args:
            httpx_settings: Optional settings forwarded to `get_client` and
                remembered on the instance.
        """
        super().__init__(
            client=get_client(sync_client=True, httpx_settings=httpx_settings),
        )
        self._httpx_settings = httpx_settings
        self._context_stack = 0

    def __enter__(self):
        """
        Enter the context.

        Only the outermost entry opens the client, checks the API version and
        registers the context; nested entries just return the instance.
        """
        self._context_stack += 1
        if self._context_stack != 1:
            return self

        try:
            self.client.__enter__()
        except BaseException:
            # The client never opened; undo the count so the failed entry does
            # not make a later entry look like a nested one.
            self._context_stack -= 1
            raise

        try:
            self.client.raise_for_api_version_mismatch()
            return super().__enter__()
        except BaseException as exc:
            # `__exit__` is not invoked when `__enter__` raises, so release the
            # already-opened client here before propagating the error.
            self._context_stack -= 1
            self.client.__exit__(type(exc), exc, exc.__traceback__)
            raise

    def __exit__(self, *exc_info):
        """
        Exit the context.

        Only the outermost exit closes the client and unregisters the context.
        """
        self._context_stack -= 1
        if self._context_stack == 0:
            try:
                self.client.__exit__(*exc_info)
            finally:
                # Unregister the context even if closing the client fails
                super().__exit__(*exc_info)

    @classmethod
    @contextmanager
    def get_or_create(cls) -> Generator["SyncClientContext", None, None]:
        """
        Yield the currently registered context, or enter a new one if none exists.
        """
        # Use `cls` (as AsyncClientContext.get_or_create does) instead of the
        # hard-coded class name.
        ctx = cls.get()
        if ctx:
            yield ctx
        else:
            with cls() as ctx:
                yield ctx

TagsContext

Bases: ContextModel

The context for prefect.tags management.

Attributes:

Name Type Description
current_tags Set[str]

A set of current tags in the context

Source code in src/prefect/context.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
class TagsContext(ContextModel):
    """
    The context for `prefect.tags` management.

    Attributes:
        current_tags: A set of current tags in the context
    """

    current_tags: Set[str] = Field(default_factory=set)

    __var__: ContextVar = ContextVar("tags")

    @classmethod
    def get(cls) -> "TagsContext":
        """
        Return the current tags context.

        Returns an empty `TagsContext` instead of `None` if no context exists.
        """
        try:
            return cls.__var__.get()
        except LookupError:
            # Build the empty fallback only when nothing is set, instead of
            # constructing a throwaway default on every call.
            return TagsContext()

TaskRunContext

Bases: RunContext

The context for a task run. Data in this context is only available from within a task run function.

Attributes:

Name Type Description
task Task

The task instance associated with the task run

task_run TaskRun

The API metadata for this task run

Source code in src/prefect/context.py
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
class TaskRunContext(RunContext):
    """
    The context for a task run. Data in this context is only available from within a
    task run function.

    Attributes:
        task: The task instance associated with the task run
        task_run: The API metadata for this task run
        log_prints: Flag for the run's `log_prints` setting
        parameters: The parameters of the task run
        result_store: The result store used for result handling
        persist_result: Whether results are persisted
    """

    task: "Task"
    task_run: TaskRun
    log_prints: bool = False
    parameters: Dict[str, Any]

    # Result handling
    result_store: ResultStore
    persist_result: bool = Field(default_factory=get_default_persist_setting_for_tasks)

    __var__ = ContextVar("task_run")

    def serialize(self, include_secrets: bool = True) -> Dict[str, Any]:
        """
        Dump the fields needed to rebuild this context elsewhere.

        Only the fields listed below are included, and only if they were
        explicitly set.

        Args:
            include_secrets: Forwarded through the serialization context, as in
                `ContextModel.serialize`, so this override accepts the same
                call signature as the base class.

        Returns:
            A dictionary with the selected fields.
        """
        return self.model_dump(
            include={
                "task_run",
                "task",
                "parameters",
                "log_prints",
                "start_time",
                "input_keyset",
                "result_store",
                "persist_result",
            },
            exclude_unset=True,
            context={"include_secrets": include_secrets},
        )

get_run_context()

Get the current run context from within a task or flow function.

Returns:

Type Description
Union[FlowRunContext, TaskRunContext]

A FlowRunContext or TaskRunContext depending on the function type.

Raises:

MissingContextError: If called outside of a flow or task run.

Source code in src/prefect/context.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def get_run_context() -> Union[FlowRunContext, TaskRunContext]:
    """
    Get the current run context from within a task or flow function.

    The task run context is checked first, then the flow run context.

    Returns:
        A `FlowRunContext` or `TaskRunContext` depending on the function type.

    Raises:
        MissingContextError: If called outside of a flow or task run.
    """
    # A task run context takes priority over a flow run context
    for context_cls in (TaskRunContext, FlowRunContext):
        active = context_cls.get()
        if active:
            return active

    raise MissingContextError(
        "No run context available. You are not in a flow or task run context."
    )

get_settings_context()

Get the current settings context which contains profile information and the settings that are being used.

Generally, the settings that are being used are a combination of values from the profile and environment. See prefect.context.use_profile for more details.

Source code in src/prefect/context.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def get_settings_context() -> SettingsContext:
    """
    Get the current settings context which contains profile information and the
    settings that are being used.

    Generally, the settings that are being used are a combination of values from the
    profile and environment. See `prefect.context.use_profile` for more details.

    Raises:
        MissingContextError: If `SettingsContext.get()` returns a falsy value.
    """
    current = SettingsContext.get()
    if current:
        return current

    raise MissingContextError("No settings context found.")

root_settings_context()

Return the settings context that will exist as the root context for the module.

The profile to use is determined with the following precedence:

- Command line via 'prefect --profile <name>'
- Environment variable via 'PREFECT_PROFILE'
- Profiles file via the 'active' key

Source code in src/prefect/context.py
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
def root_settings_context():
    """
    Build the settings context that serves as the root context for the module.

    The profile to use is determined with the following precedence
    - Command line via 'prefect --profile <name>'
    - Environment variable via 'PREFECT_PROFILE'
    - Profiles file via the 'active' key

    If the selected profile is unknown, a warning is printed to stderr and the
    "ephemeral" profile is used. If the Prefect home directory does not exist,
    an attempt is made to create it; a failure only emits a warning.
    """
    profiles = prefect.settings.load_profiles()

    # Lowest precedence: the active profile recorded in the profiles file
    active_name, profile_source = profiles.active_name, "in the profiles file"

    # Next: the PREFECT_PROFILE environment variable
    env_profile = os.environ.get("PREFECT_PROFILE")
    if env_profile is not None:
        active_name, profile_source = env_profile, "by environment variable"

    # Highest precedence: `prefect --profile <name>` on the command line
    cli_args = sys.argv
    profile_on_command_line = (
        cli_args[0].endswith("/prefect")
        and len(cli_args) >= 3
        and cli_args[1] == "--profile"
    )
    if profile_on_command_line:
        active_name, profile_source = cli_args[2], "by command line argument"

    if active_name not in profiles.names:
        message = (
            f"WARNING: Active profile {active_name!r} set {profile_source} not "
            "found. The default profile will be used instead. "
        )
        print(message, file=sys.stderr)
        active_name = "ephemeral"

    settings = Settings()
    if not settings.home.exists():
        try:
            settings.home.mkdir(mode=0o0700, exist_ok=True)
        except OSError:
            # Best effort only: report the failure and carry on
            warnings.warn(
                f"Failed to create the Prefect home directory at {settings.home}",
                stacklevel=2,
            )

    return SettingsContext(profile=profiles[active_name], settings=settings)

serialize_context()

Serialize the current context for use in a remote execution environment.

Source code in src/prefect/context.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def serialize_context() -> Dict[str, Any]:
    """
    Serialize the current context for use in a remote execution environment.

    Each entry holds the serialized form of the corresponding current context,
    or an empty dictionary when that context is falsy.
    """
    current_contexts = {
        "flow_run_context": EngineContext.get(),
        "task_run_context": TaskRunContext.get(),
        "tags_context": TagsContext.get(),
        "settings_context": SettingsContext.get(),
    }

    return {
        name: context.serialize() if context else {}
        for name, context in current_contexts.items()
    }

tags(*new_tags)

Context manager to add tags to flow and task run calls.

Tags are always combined with any existing tags.

Yields:

Type Description
Set[str]

The current set of tags

Examples:

>>> from prefect import tags, task, flow
>>> @task
>>> def my_task():
>>>     pass

Run a task with tags

>>> @flow
>>> def my_flow():
>>>     with tags("a", "b"):
>>>         my_task()  # has tags: a, b

Run a flow with tags

>>> @flow
>>> def my_flow():
>>>     pass
>>> with tags("a", "b"):
>>>     my_flow()  # has tags: a, b

Run a task with nested tag contexts

>>> @flow
>>> def my_flow():
>>>     with tags("a", "b"):
>>>         with tags("c", "d"):
>>>             my_task()  # has tags: a, b, c, d
>>>         my_task()  # has tags: a, b

Inspect the current tags

>>> @flow
>>> def my_flow():
>>>     with tags("c", "d"):
>>>         with tags("e", "f") as current_tags:
>>>              print(current_tags)
>>> with tags("a", "b"):
>>>     my_flow()
{"a", "b", "c", "d", "e", "f"}
Source code in src/prefect/context.py
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
@contextmanager
def tags(*new_tags: str) -> Generator[Set[str], None, None]:
    """
    Context manager to add tags to flow and task run calls.

    Tags are always combined with any existing tags.

    Yields:
        The current set of tags

    Examples:
        >>> from prefect import tags, task, flow
        >>> @task
        ... def my_task():
        ...     pass

        Run a task with tags

        >>> @flow
        ... def my_flow():
        ...     with tags("a", "b"):
        ...         my_task()  # has tags: a, b

        Run a flow with tags

        >>> @flow
        ... def my_flow():
        ...     pass
        >>> with tags("a", "b"):
        ...     my_flow()  # has tags: a, b

        Run a task with nested tag contexts

        >>> @flow
        ... def my_flow():
        ...     with tags("a", "b"):
        ...         with tags("c", "d"):
        ...             my_task()  # has tags: a, b, c, d
        ...         my_task()  # has tags: a, b

        Inspect the current tags

        >>> @flow
        ... def my_flow():
        ...     with tags("c", "d"):
        ...         with tags("e", "f") as current_tags:
        ...              print(current_tags)
        >>> with tags("a", "b"):
        ...     my_flow()
        {"a", "b", "c", "d", "e", "f"}
    """
    # Start from whatever tags are already active and add the new ones
    inherited = TagsContext.get().current_tags
    combined = inherited.union(new_tags)
    with TagsContext(current_tags=combined):
        yield combined

use_profile(profile, override_environment_variables=False, include_current_context=True)

Switch to a profile for the duration of this context.

Profile contexts are confined to an async context in a single thread.

Parameters:

Name Type Description Default
profile Union[Profile, str]

The name of the profile to load or an instance of a Profile.

required
override_environment_variables bool

If set, variables in the profile will take precedence over current environment variables. By default, environment variables will override profile settings.

False
include_current_context bool

If set, the new settings will be constructed with the current settings context as a base. If not set, the base settings will be loaded from the environment and defaults.

True

Yields:

Type Description

The created SettingsContext object

Source code in src/prefect/context.py
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
@contextmanager
def use_profile(
    profile: Union[Profile, str],
    override_environment_variables: bool = False,
    include_current_context: bool = True,
):
    """
    Switch to a profile for the duration of this context.

    Profile contexts are confined to an async context in a single thread.

    Args:
        profile: The name of the profile to load or an instance of a Profile.
        override_environment_variables: If set, variables in the profile will take
            precedence over current environment variables. By default, environment
            variables will override profile settings.
        include_current_context: If set, the new settings will be constructed
            with the current settings context as a base. If not set, the base
            settings will be loaded from the environment and defaults.

    Yields:
        The created `SettingsContext` object

    Raises:
        TypeError: If `profile` is neither a `str` nor a `Profile`.
    """
    if isinstance(profile, str):
        profiles = prefect.settings.load_profiles()
        profile = profiles[profile]

    if not isinstance(profile, Profile):
        raise TypeError(
            f"Unexpected type {type(profile).__name__!r} for `profile`. "
            "Expected 'str' or 'Profile'."
        )

    # Create a copy of the profiles settings as we will mutate it
    profile_settings = profile.settings.copy()
    existing_context = SettingsContext.get()
    if existing_context and include_current_context:
        settings = existing_context.settings
    else:
        settings = Settings()

    if not override_environment_variables:
        # Look the field mapping up once rather than on every loop iteration;
        # it is indexed by environment variable name below.
        settings_fields = _get_settings_fields(Settings)
        for key in os.environ:
            if key in settings_fields:
                # The environment wins: drop the profile's value for this setting
                profile_settings.pop(settings_fields[key], None)

    new_settings = settings.copy_with_update(updates=profile_settings)

    with SettingsContext(profile=profile, settings=new_settings) as ctx:
        yield ctx