Skip to content

prefect.server.models.logs

Functions for interacting with log ORM objects. Intended for internal use by the Prefect REST API.

create_logs(db, session, logs) async

Creates new logs

Parameters:

Name Type Description Default
session AsyncSession

a database session

required
logs List[Log]

a list of log schemas

required

Returns:

Type Description
None

None

Source code in src/prefect/server/models/logs.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@db_injector
async def create_logs(
    db: PrefectDBInterface, session: AsyncSession, logs: List[schemas.core.Log]
) -> None:
    """
    Creates new logs

    Args:
        db: the database interface used to build the insert statement
        session: a database session
        logs: a list of log schemas; an empty list results in no database call

    Returns:
        None
    """
    if not logs:
        # Nothing to write. Skip the round trip rather than executing an
        # INSERT whose VALUES payload is an empty list.
        return

    try:
        # All logs are written with a single multi-row INSERT.
        await session.execute(
            db.insert(orm_models.Log).values([log.model_dump() for log in logs])
        )
    except RuntimeError as exc:
        if "can't create new thread at interpreter shutdown" in str(exc):
            # Background logs sometimes fail to write when the interpreter is shutting down.
            # This is a known issue in Python 3.12.2 that can be ignored and is fixed in Python 3.12.3.
            # see e.g. https://github.com/python/cpython/issues/113964
            logger.debug("Received event during interpreter shutdown, ignoring")
        else:
            # Any other RuntimeError is unexpected and must reach the caller.
            raise

read_logs(session, log_filter, offset=None, limit=None, sort=schemas.sorting.LogSort.TIMESTAMP_ASC) async

Read logs.

Parameters:

Name Type Description Default
session AsyncSession

a database session

required
db

the database interface (note: not a parameter in the signature shown above)

required
log_filter LogFilter

only select logs that match these filters

required
offset Optional[int]

Query offset

None
limit Optional[int]

Query limit

None
sort LogSort

Query sort

TIMESTAMP_ASC

Returns:

Type Description
Sequence[Log]

Sequence[orm_models.Log]: the matching logs

Source code in src/prefect/server/models/logs.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def read_logs(
    session: AsyncSession,
    log_filter: schemas.filters.LogFilter,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    sort: schemas.sorting.LogSort = schemas.sorting.LogSort.TIMESTAMP_ASC,
) -> Sequence[orm_models.Log]:
    """
    Read logs.

    Args:
        session: a database session
        log_filter: only select logs that match these filters
        offset: Query offset
        limit: Query limit
        sort: Query sort

    Returns:
        Sequence[orm_models.Log]: the matching logs
    """
    # Start from a plain SELECT and layer ordering and pagination on top.
    statement = select(orm_models.Log)
    statement = statement.order_by(sort.as_sql_sort())
    statement = statement.offset(offset)
    statement = statement.limit(limit)

    # The filter clause is only attached when a truthy filter was supplied.
    if log_filter:
        filter_clause = log_filter.as_sql_filter()
        statement = statement.where(filter_clause)

    executed = await session.execute(statement)
    unique_rows = executed.scalars().unique()
    return unique_rows.all()