Skip to content

prefect.runner.utils

inject_schemas_into_openapi(webserver, schemas_to_inject)

Augments the webserver's OpenAPI schema with additional schemas from deployments / flows / tasks.

Parameters:

Name Type Description Default
webserver FastAPI

The FastAPI instance representing the webserver.

required
schemas_to_inject Dict[str, Any]

A dictionary of OpenAPI schemas to integrate.

required

Returns:

Type Description
Dict[str, Any]

The augmented OpenAPI schema dictionary.

Source code in src/prefect/runner/utils.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def inject_schemas_into_openapi(
    webserver: FastAPI, schemas_to_inject: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Augments the webserver's OpenAPI schema with additional schemas from deployments / flows / tasks.

    Args:
        webserver: The FastAPI instance representing the webserver.
        schemas_to_inject: A dictionary of OpenAPI schemas to integrate.

    Returns:
        The augmented OpenAPI schema dictionary.
    """
    openapi_schema = get_openapi(
        title="FastAPI Prefect Runner", version=PREFECT_VERSION, routes=webserver.routes
    )

    augmented_schema = merge_definitions(schemas_to_inject, openapi_schema)
    return update_refs_to_components(augmented_schema)

merge_definitions(injected_schemas, openapi_schema)

Integrates definitions from injected schemas into the OpenAPI components.

Parameters:

Name Type Description Default
injected_schemas Dict[str, Any]

A dictionary of deployment-specific schemas.

required
openapi_schema Dict[str, Any]

The base OpenAPI schema to update.

required
Source code in src/prefect/runner/utils.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def merge_definitions(
    injected_schemas: Dict[str, Any], openapi_schema: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Integrates definitions from injected schemas into the OpenAPI components.

    Args:
        injected_schemas: A dictionary of deployment-specific schemas.
        openapi_schema: The base OpenAPI schema to update.
    """
    openapi_schema_copy = deepcopy(openapi_schema)
    components = openapi_schema_copy.setdefault("components", {}).setdefault(
        "schemas", {}
    )
    for definitions in injected_schemas.values():
        if "definitions" in definitions:
            for def_name, def_schema in definitions["definitions"].items():
                def_schema_copy = deepcopy(def_schema)
                update_refs_in_schema(def_schema_copy, "#/components/schemas/")
                components[def_name] = def_schema_copy
    return openapi_schema_copy

update_refs_in_schema(schema_item, new_ref)

Recursively replaces $ref with a new reference base in a schema item.

Parameters:

Name Type Description Default
schema_item Any

A schema or part of a schema to update references in.

required
new_ref str

The new base string to replace in $ref values.

required
Source code in src/prefect/runner/utils.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def update_refs_in_schema(schema_item: Any, new_ref: str) -> None:
    """
    Recursively replaces `$ref` with a new reference base in a schema item.

    Args:
        schema_item: A schema or part of a schema to update references in.
        new_ref: The new base string to replace in `$ref` values.
    """
    if isinstance(schema_item, dict):
        if "$ref" in schema_item:
            schema_item["$ref"] = schema_item["$ref"].replace("#/definitions/", new_ref)
        for value in schema_item.values():
            update_refs_in_schema(value, new_ref)
    elif isinstance(schema_item, list):
        for item in schema_item:
            update_refs_in_schema(item, new_ref)

update_refs_to_components(openapi_schema)

Updates all $ref fields in the OpenAPI schema to reference the components section.

Parameters:

Name Type Description Default
openapi_schema Dict[str, Any]

The OpenAPI schema to modify $ref fields in.

required
Source code in src/prefect/runner/utils.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def update_refs_to_components(openapi_schema: Dict[str, Any]) -> Dict[str, Any]:
    """
    Updates all `$ref` fields in the OpenAPI schema to reference the components section.

    Args:
        openapi_schema: The OpenAPI schema to modify `$ref` fields in.
    """
    for path_item in openapi_schema.get("paths", {}).values():
        for operation in path_item.values():
            schema = (
                operation.get("requestBody", {})
                .get("content", {})
                .get("application/json", {})
                .get("schema", {})
            )
            update_refs_in_schema(schema, "#/components/schemas/")

    for definition in openapi_schema.get("definitions", {}).values():
        update_refs_in_schema(definition, "#/components/schemas/")

    return openapi_schema