Skip to content

prefect.server.api.clients

BaseClient

Source code in src/prefect/server/api/clients.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class BaseClient:
    """
    Base class for clients that talk to the API application returned by
    `create_app` through an ASGI transport instead of a network socket.

    Instances are async context managers that delegate entering and exiting
    to the wrapped HTTP client.
    """

    _http_client: PrefectHttpxAsyncClient

    def __init__(self, additional_headers: Dict[str, str] = {}):
        """
        Build the HTTP client bound to the API application.

        Args:
            additional_headers: Headers passed to the HTTP client. The mapping
                is copied into a new dict before use, so the shared default
                value is never mutated here.
        """
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from prefect.server.api.server import create_app

        # create_app caches application instances; calling it with no arguments
        # points it at the currently running server instance.
        app = create_app()
        asgi_transport = httpx.ASGITransport(app=app, raise_app_exceptions=False)
        request_headers = {**additional_headers}

        self._http_client = PrefectHttpxAsyncClient(
            transport=asgi_transport,
            headers=request_headers,
            base_url="http://prefect-in-memory/api",
            enable_csrf_support=False,
            raise_on_all_errors=False,
        )

    async def __aenter__(self) -> Self:
        """Enter the wrapped HTTP client's context and return this client."""
        await self._http_client.__aenter__()
        return self

    async def __aexit__(self, *exc_info):
        """Exit the wrapped HTTP client's context, forwarding the exception details."""
        await self._http_client.__aexit__(*exc_info)

__init__(additional_headers={})

Source code in src/prefect/server/api/clients.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(self, additional_headers: Dict[str, str] = {}):
    """
    Build the HTTP client bound to the API application.

    Args:
        additional_headers: Headers passed to the HTTP client. The mapping
            is copied into a new dict before use, so the shared default
            value is never mutated here.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from prefect.server.api.server import create_app

    # create_app caches application instances; calling it with no arguments
    # points it at the currently running server instance.
    app = create_app()
    asgi_transport = httpx.ASGITransport(app=app, raise_app_exceptions=False)
    request_headers = {**additional_headers}

    self._http_client = PrefectHttpxAsyncClient(
        transport=asgi_transport,
        headers=request_headers,
        base_url="http://prefect-in-memory/api",
        enable_csrf_support=False,
        raise_on_all_errors=False,
    )

WorkPoolsOrchestrationClient

Bases: BaseClient

Source code in src/prefect/server/api/clients.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class WorkPoolsOrchestrationClient(BaseClient):
    """Client for reading work pools from the server API."""

    async def __aenter__(self) -> Self:
        """Return this client without entering the wrapped HTTP client's context."""
        # NOTE(review): unlike BaseClient.__aenter__, the wrapped HTTP client is
        # not entered here, while the inherited __aexit__ still exits it --
        # confirm this asymmetry is intentional.
        return self

    async def read_work_pool(self, work_pool_name: str) -> WorkPool:
        """
        Read information for a given work pool.

        Args:
            work_pool_name: The name of the work pool for which to get
                information.

        Returns:
            Information about the requested work pool.

        Raises:
            ObjectNotFound: If the response status is 404.
            httpx.HTTPStatusError: Re-raised unchanged for any other error
                status reported by `raise_for_status`.
        """
        path = f"/work_pools/{work_pool_name}"
        try:
            response = await self._http_client.get(path)
            response.raise_for_status()
            payload = response.json()
            return WorkPool.model_validate(payload)
        except httpx.HTTPStatusError as http_error:
            # Only a 404 is translated; every other status error propagates as-is.
            if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise ObjectNotFound(http_exc=http_error) from http_error

read_work_pool(work_pool_name) async

Reads information for a given work pool.

Args: work_pool_name: The name of the work pool for which to get information.

Returns: Information about the requested work pool.

Source code in src/prefect/server/api/clients.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
async def read_work_pool(self, work_pool_name: str) -> WorkPool:
    """
    Read information for a given work pool.

    Args:
        work_pool_name: The name of the work pool for which to get
            information.

    Returns:
        Information about the requested work pool.

    Raises:
        ObjectNotFound: If the response status is 404.
        httpx.HTTPStatusError: Re-raised unchanged for any other error
            status reported by `raise_for_status`.
    """
    path = f"/work_pools/{work_pool_name}"
    try:
        response = await self._http_client.get(path)
        response.raise_for_status()
        payload = response.json()
        return WorkPool.model_validate(payload)
    except httpx.HTTPStatusError as http_error:
        # Only a 404 is translated; every other status error propagates as-is.
        if http_error.response.status_code != status.HTTP_404_NOT_FOUND:
            raise
        raise ObjectNotFound(http_exc=http_error) from http_error