Skip to content

prefect.logging

LogEavesdropper

Bases: Handler

A context manager that collects logs for the duration of the context

Example:

    ```python
    import logging
    from prefect.logging import LogEavesdropper

    with LogEavesdropper("my_logger") as eavesdropper:
        logging.getLogger("my_logger").info("Hello, world!")
        logging.getLogger("my_logger.child_module").info("Another one!")

    print(eavesdropper.text())

    # Outputs: "Hello, world!

Another one!"

Source code in src/prefect/logging/loggers.py
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
class LogEavesdropper(logging.Handler):
    """A context manager that collects logs for the duration of the context

    Example:

        ```python
        import logging
        from prefect.logging import LogEavesdropper

        with LogEavesdropper("my_logger") as eavesdropper:
            logging.getLogger("my_logger").info("Hello, world!")
            logging.getLogger("my_logger.child_module").info("Another one!")

        print(eavesdropper.text())

        # Outputs: "Hello, world!\nAnother one!"
    """

    _target_logger: logging.Logger
    _lines: List[str]

    def __init__(self, eavesdrop_on: str, level: int = logging.NOTSET):
        """
        Args:
            eavesdrop_on (str): the name of the logger to eavesdrop on
            level (int): the minimum log level to eavesdrop on; if omitted, all levels
                are captured
        """

        super().__init__(level=level)
        self.eavesdrop_on = eavesdrop_on
        self._target_logger = None

        # It's important that we use a very minimalistic formatter for use cases where
        # we may present these logs back to the user.  We shouldn't leak filenames,
        # versions, or other environmental information.
        self.formatter = logging.Formatter("[%(levelname)s]: %(message)s")

    def __enter__(self) -> Self:
        self._target_logger = logging.getLogger(self.eavesdrop_on)
        self._original_level = self._target_logger.level
        self._target_logger.level = self.level
        self._target_logger.addHandler(self)
        self._lines = []
        return self

    def __exit__(self, *_):
        if self._target_logger:
            self._target_logger.removeHandler(self)
            self._target_logger.level = self._original_level

    def emit(self, record: LogRecord) -> None:
        """The logging.Handler implementation, not intended to be called directly."""
        self._lines.append(self.format(record))

    def text(self) -> str:
        """Return the collected logs as a single newline-delimited string"""
        return "\n".join(self._lines)

__init__(eavesdrop_on, level=logging.NOTSET)

Parameters:

Name Type Description Default
eavesdrop_on str

the name of the logger to eavesdrop on

required
level int

the minimum log level to eavesdrop on; if omitted, all levels are captured

NOTSET
Source code in src/prefect/logging/loggers.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def __init__(self, eavesdrop_on: str, level: int = logging.NOTSET):
    """
    Args:
        eavesdrop_on (str): the name of the logger to eavesdrop on
        level (int): the minimum log level to eavesdrop on; if omitted, all levels
            are captured
    """

    super().__init__(level=level)
    self.eavesdrop_on = eavesdrop_on
    self._target_logger = None

    # It's important that we use a very minimalistic formatter for use cases where
    # we may present these logs back to the user.  We shouldn't leak filenames,
    # versions, or other environmental information.
    self.formatter = logging.Formatter("[%(levelname)s]: %(message)s")

emit(record)

The logging.Handler implementation, not intended to be called directly.

Source code in src/prefect/logging/loggers.py
336
337
338
def emit(self, record: LogRecord) -> None:
    """The logging.Handler implementation, not intended to be called directly."""
    self._lines.append(self.format(record))

text()

Return the collected logs as a single newline-delimited string

Source code in src/prefect/logging/loggers.py
340
341
342
def text(self) -> str:
    """Return the collected logs as a single newline-delimited string"""
    return "\n".join(self._lines)

get_logger(name=None) cached

Get a prefect logger. These loggers are intended for internal use within the prefect package.

See get_run_logger for retrieving loggers for use within task or flow runs. By default, only run-related loggers are connected to the APILogHandler.

Source code in src/prefect/logging/loggers.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@lru_cache()
def get_logger(name: Optional[str] = None) -> logging.Logger:
    """
    Get a `prefect` logger. These loggers are intended for internal use within the
    `prefect` package.

    See `get_run_logger` for retrieving loggers for use within task or flow runs.
    By default, only run-related loggers are connected to the `APILogHandler`.
    """
    parent_logger = logging.getLogger("prefect")

    if name:
        # Append the name if given but allow explicit full names e.g. "prefect.test"
        # should not become "prefect.prefect.test"
        if not name.startswith(parent_logger.name + "."):
            logger = parent_logger.getChild(name)
        else:
            logger = logging.getLogger(name)
    else:
        logger = parent_logger

    # Prevent the current API key from being logged in plain text
    obfuscate_api_key_filter = ObfuscateApiKeyFilter()
    logger.addFilter(obfuscate_api_key_filter)

    return logger

get_run_logger(context=None, **kwargs)

Get a Prefect logger for the current task run or flow run.

The logger will be named either prefect.task_runs or prefect.flow_runs. Contextual data about the run will be attached to the log records.

These loggers are connected to the APILogHandler by default to send log records to the API.

Parameters:

Name Type Description Default
context Optional[RunContext]

A specific context may be provided as an override. By default, the context is inferred from global state and this should not be needed.

None
**kwargs str

Additional keyword arguments will be attached to the log records in addition to the run metadata

{}

Raises:

Type Description
MissingContextError

If no context can be found

Source code in src/prefect/logging/loggers.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def get_run_logger(
    context: Optional["RunContext"] = None, **kwargs: str
) -> Union[logging.Logger, logging.LoggerAdapter]:
    """
    Get a Prefect logger for the current task run or flow run.

    The logger will be named either `prefect.task_runs` or `prefect.flow_runs`.
    Contextual data about the run will be attached to the log records.

    These loggers are connected to the `APILogHandler` by default to send log records to
    the API.

    Arguments:
        context: A specific context may be provided as an override. By default, the
            context is inferred from global state and this should not be needed.
        **kwargs: Additional keyword arguments will be attached to the log records in
            addition to the run metadata

    Raises:
        MissingContextError: If no context can be found
    """
    # Check for existing contexts
    task_run_context = prefect.context.TaskRunContext.get()
    flow_run_context = prefect.context.FlowRunContext.get()

    # Apply the context override
    if context:
        if isinstance(context, prefect.context.FlowRunContext):
            flow_run_context = context
        elif isinstance(context, prefect.context.TaskRunContext):
            task_run_context = context
        else:
            raise TypeError(
                f"Received unexpected type {type(context).__name__!r} for context. "
                "Expected one of 'None', 'FlowRunContext', or 'TaskRunContext'."
            )

    # Determine if this is a task or flow run logger
    if task_run_context:
        logger = task_run_logger(
            task_run=task_run_context.task_run,
            task=task_run_context.task,
            flow_run=flow_run_context.flow_run if flow_run_context else None,
            flow=flow_run_context.flow if flow_run_context else None,
            **kwargs,
        )
    elif flow_run_context:
        logger = flow_run_logger(
            flow_run=flow_run_context.flow_run, flow=flow_run_context.flow, **kwargs
        )
    elif (
        get_logger("prefect.flow_run").disabled
        and get_logger("prefect.task_run").disabled
    ):
        logger = logging.getLogger("null")
    else:
        raise MissingContextError("There is no active flow or task run context.")

    return logger