Skip to content

prefect.server.api.dependencies

Utilities for injecting FastAPI dependencies.

EnforceMinimumAPIVersion

FastAPI dependency used to check compatibility between the version of the API and a given request.

Looks for the header 'X-PREFECT-API-VERSION' in the request and compares it to the API's minimum supported version. Rejects requests whose version is lower than the minimum version.

Source code in src/prefect/server/api/dependencies.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class EnforceMinimumAPIVersion:
    """
    FastAPI Dependency used to check compatibility between the version of the api
    and a given request.

    Looks for the header 'X-PREFECT-API-VERSION' in the request and compares it
    to the minimum api version this dependency was configured with. Requests
    whose version is lower than the minimum are rejected.

    Requests that do not send the header are let through unchecked. Requests
    whose header is not exactly three dot-separated integers, or whose version
    is below the minimum, are logged at error level and rejected with a
    400 response.
    """

    def __init__(self, minimum_api_version: str, logger: logging.Logger):
        """
        Args:
            minimum_api_version: lowest accepted version, formatted as 'x.y.z'
                with integer components.
            logger: logger used to record rejected requests.
        """
        self.minimum_api_version = minimum_api_version
        versions = [int(v) for v in minimum_api_version.split(".")]
        self.api_major = versions[0]
        self.api_minor = versions[1]
        self.api_patch = versions[2]
        self.logger = logger

    async def __call__(
        self,
        x_prefect_api_version: str = Header(None),
    ):
        """
        Validate the version header of the current request.

        Args:
            x_prefect_api_version: value of the 'X-PREFECT-API-VERSION' header,
                or None/empty when the client did not send one.

        Raises:
            HTTPException: with status 400 when the header is malformed or
                specifies a version lower than the configured minimum.
        """
        request_version = x_prefect_api_version

        # if no version header, assume latest and continue
        if not request_version:
            return

        # parse version; ValueError covers both non-integer components and a
        # component count other than three (failed tuple unpacking)
        try:
            major, minor, patch = [int(v) for v in request_version.split(".")]
        except ValueError:
            await self._notify_of_invalid_value(request_version)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=(
                    # trailing space keeps the two sentences from running
                    # together once the adjacent literals are concatenated
                    "Invalid X-PREFECT-API-VERSION header format. "
                    f"Expected header in format 'x.y.z' but received {request_version}"
                ),
            )

        # tuple comparison orders by major, then minor, then patch
        if (major, minor, patch) < (self.api_major, self.api_minor, self.api_patch):
            await self._notify_of_outdated_version(request_version)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=(
                    f"The request specified API version {request_version} but this "
                    f"server requires version {self.minimum_api_version} or higher."
                ),
            )

    async def _notify_of_invalid_value(self, request_version: str):
        """Log that a request carried an unparseable version header."""
        self.logger.error(
            f"Invalid X-PREFECT-API-VERSION header format: '{request_version}'"
        )

    async def _notify_of_outdated_version(self, request_version: str):
        """Log that a request carried a version below the configured minimum."""
        self.logger.error(
            f"X-PREFECT-API-VERSION header specifies version '{request_version}' "
            f"but minimum allowed version is '{self.minimum_api_version}'"
        )

LimitBody()

A fastapi.Depends factory for pulling a limit: int parameter from the request body while determining the default from the current settings.

Source code in src/prefect/server/api/dependencies.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def LimitBody() -> Depends:
    """
    Build a `fastapi.Depends` that reads an optional `limit: int` from the
    request body, substituting the current `PREFECT_API_DEFAULT_LIMIT` setting
    when the body does not provide one.

    The resolved limit must be between 0 and the configured default, inclusive;
    values outside that range are rejected with a 422 response.
    """

    def get_limit(
        limit: int = Body(
            None,
            description="Defaults to PREFECT_API_DEFAULT_LIMIT if not provided.",
        ),
    ):
        # The configured default is read per call and also serves as the
        # upper bound for explicitly requested limits.
        max_limit = PREFECT_API_DEFAULT_LIMIT.value()
        if limit is None:
            limit = max_limit

        problem = None
        if not limit >= 0:
            problem = "Invalid limit: must be greater than or equal to 0."
        elif limit > max_limit:
            problem = f"Invalid limit: must be less than or equal to {max_limit}."

        if problem is not None:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=problem,
            )
        return limit

    return Depends(get_limit)

get_created_by(prefect_automation_id=Header(None, include_in_schema=False), prefect_automation_name=Header(None, include_in_schema=False))

A dependency that returns the provenance information to use when creating objects during this API call.

Source code in src/prefect/server/api/dependencies.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def get_created_by(
    prefect_automation_id: Optional[UUID] = Header(None, include_in_schema=False),
    prefect_automation_name: Optional[str] = Header(None, include_in_schema=False),
) -> Optional[schemas.core.CreatedBy]:
    """Resolve the provenance to record on objects created during this API call.

    Returns a `CreatedBy` of type "AUTOMATION" when both automation headers are
    present and the name header base64-decodes to a non-empty string; otherwise
    returns None.
    """
    # Both headers are required to attribute the call to an automation.
    if not (prefect_automation_id and prefect_automation_name):
        return None

    # The name header is base64-decoded; any decoding failure is treated as
    # "no provenance" rather than an error.
    try:
        decoded_name = b64decode(prefect_automation_name.encode()).decode()
    except Exception:
        return None

    if not decoded_name:
        return None

    return schemas.core.CreatedBy(
        id=prefect_automation_id,
        type="AUTOMATION",
        display_value=decoded_name,
    )

get_updated_by(prefect_automation_id=Header(None, include_in_schema=False), prefect_automation_name=Header(None, include_in_schema=False))

A dependency that returns the provenance information to use when updating objects during this API call.

Source code in src/prefect/server/api/dependencies.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def get_updated_by(
    prefect_automation_id: Optional[UUID] = Header(None, include_in_schema=False),
    prefect_automation_name: Optional[str] = Header(None, include_in_schema=False),
) -> Optional[schemas.core.UpdatedBy]:
    """Resolve the provenance to record on objects updated during this API call.

    Returns an `UpdatedBy` of type "AUTOMATION" when both automation headers are
    present; otherwise returns None.
    """
    # Both headers are required to attribute the call to an automation.
    if not (prefect_automation_id and prefect_automation_name):
        return None

    # NOTE(review): the name header is used verbatim here, whereas
    # `get_created_by` base64-decodes the same header -- confirm the
    # difference is intentional.
    return schemas.core.UpdatedBy(
        id=prefect_automation_id,
        type="AUTOMATION",
        display_value=prefect_automation_name,
    )

is_ephemeral_request(request)

A dependency that returns whether the request is to an ephemeral server.

Source code in src/prefect/server/api/dependencies.py
165
166
167
168
169
def is_ephemeral_request(request: Request):
    """
    Report whether the request targets an ephemeral server, judged by whether
    the request's base URL contains "ephemeral-prefect".
    """
    base_url = str(request.base_url)
    return "ephemeral-prefect" in base_url