
prefect.server.models.csrf_token

create_or_update_csrf_token(db, session, client) async

Create or update a CSRF token for a client. If the client already has a token, it is replaced with a newly generated token and a new expiration time, so the previous token no longer matches.

Parameters:

Name Type Description Default
session AsyncSession

The database session

required
client str

The client identifier

required

Returns:

Type Description
CsrfToken

core.CsrfToken: The CSRF token

Source code in src/prefect/server/models/csrf_token.py
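@db_injector
async def create_or_update_csrf_token(
    db: PrefectDBInterface,
    session: AsyncSession,
    client: str,
) -> core.CsrfToken:
    """Create or rotate the CSRF token stored for a client.

    A fresh random token and expiration are written with a single upsert keyed
    on the client column, so a token previously stored for the same client is
    overwritten and no longer matches.

    Args:
        db (PrefectDBInterface): The database interface used to build the
            insert statement (presumably supplied by ``db_injector``)
        session (AsyncSession): The database session
        client (str): The client identifier

    Returns:
        core.CsrfToken: The CSRF token

    Raises:
        AssertionError: If no unexpired token can be read back for the client
            after the upsert
    """

    expiration = (
        datetime.now(timezone.utc)
        + settings.PREFECT_SERVER_CSRF_TOKEN_EXPIRATION.value()
    )
    # 32 bytes from the `secrets` CSPRNG, hex-encoded to 64 characters.
    token_value = secrets.token_hex(32)

    # Upsert on the client column: an existing row keeps its client value and
    # has both token and expiration replaced.
    await session.execute(
        db.insert(orm_models.CsrfToken)
        .values(
            client=client,
            token=token_value,
            expiration=expiration,
        )
        .on_conflict_do_update(
            index_elements=[orm_models.CsrfToken.client],
            set_={"token": token_value, "expiration": expiration},
        ),
    )

    # Return the created / updated token object. read_token_for_client only
    # returns rows whose expiration is still in the future, so a configured
    # lifetime of zero or less would surface here as a missing token.
    stored_token = await read_token_for_client(session=session, client=client)
    if stored_token is None:
        # Raised explicitly rather than with `assert`, which is removed when
        # Python runs with -O and would let None escape the declared return
        # type.
        raise AssertionError("CSRF token could not be read back after upsert")

    return stored_token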

delete_expired_tokens(session) async

Delete expired CSRF tokens.

Parameters:

Name Type Description Default
session AsyncSession

The database session

required

Returns:

Name Type Description
int int

The number of tokens deleted

Source code in src/prefect/server/models/csrf_token.py
async def delete_expired_tokens(session: AsyncSession) -> int:
    """Remove every CSRF token whose expiration has passed.

    Only rows with an expiration strictly earlier than the current UTC time
    are deleted.

    Args:
        session (AsyncSession): The database session

    Returns:
        int: The number of tokens deleted
    """

    # Strict "<": a row expiring exactly at the cutoff is left for a later run.
    expired = orm_models.CsrfToken.expiration < datetime.now(timezone.utc)
    purge_stmt = sa.delete(orm_models.CsrfToken).where(expired)

    outcome = await session.execute(purge_stmt)
    return outcome.rowcount

read_token_for_client(session, client) async

Read a CSRF token for a client.

Parameters:

Name Type Description Default
session AsyncSession

The database session

required
client str

The client identifier

required

Returns:

Type Description
Optional[CsrfToken]

Optional[core.CsrfToken]: The CSRF token, if it exists and is not expired.

Source code in src/prefect/server/models/csrf_token.py
async def read_token_for_client(
    session: AsyncSession,
    client: str,
) -> Optional[core.CsrfToken]:
    """Look up the unexpired CSRF token stored for a client.

    Args:
        session (AsyncSession): The database session
        client (str): The client identifier

    Returns:
        Optional[core.CsrfToken]: The CSRF token, if it exists and is not
            expired.
    """
    # Expiry is enforced in the query itself: a row must expire strictly after
    # the current UTC time, so an expired token is treated as absent.
    still_valid = orm_models.CsrfToken.expiration > datetime.now(timezone.utc)
    for_client = orm_models.CsrfToken.client == client

    lookup = sa.select(orm_models.CsrfToken).where(
        sa.and_(still_valid, for_client)
    )
    result = await session.execute(lookup)

    # scalar_one_or_none() yields None when no row matches and raises if more
    # than one does.
    row = result.scalar_one_or_none()
    if row is not None:
        return core.CsrfToken.model_validate(row, from_attributes=True)

    return None