Skip to content

prefect.server.api.flows

Routes for interacting with flow objects.

count_flows(flows=None, flow_runs=None, task_runs=None, deployments=None, work_pools=None, db=Depends(provide_database_interface)) async

Count flows.

Source code in src/prefect/server/api/flows.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@router.post("/count")
async def count_flows(
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    work_pools: schemas.filters.WorkPoolFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count flows.
    """
    async with db.session_context() as session:
        return await models.flows.count_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
        )

create_flow(flow, response, db=Depends(provide_database_interface)) async

Gracefully creates a new flow from the provided schema. If a flow with the same name already exists, the existing flow is returned.

Source code in src/prefect/server/api/flows.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@router.post("/")
async def create_flow(
    flow: schemas.actions.FlowCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """Gracefully creates a new flow from the provided schema. If a flow with the
    same name already exists, the existing flow is returned.
    """
    # hydrate the input model into a full flow model
    flow = schemas.core.Flow(**flow.model_dump())

    now = pendulum.now("UTC")

    async with db.session_context(begin_transaction=True) as session:
        model = await models.flows.create_flow(session=session, flow=flow)

    if model.created >= now:
        response.status_code = status.HTTP_201_CREATED
    return model

delete_flow(flow_id=Path(..., description='The flow id', alias='id'), db=Depends(provide_database_interface)) async

Delete a flow by id.

Source code in src/prefect/server/api/flows.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
):
    """
    Delete a flow by id.
    """
    async with db.session_context(begin_transaction=True) as session:
        result = await models.flows.delete_flow(session=session, flow_id=flow_id)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )

paginate_flows(limit=dependencies.LimitBody(), page=Body(1, ge=1), flows=None, flow_runs=None, task_runs=None, deployments=None, work_pools=None, sort=Body(schemas.sorting.FlowSort.NAME_ASC), db=Depends(provide_database_interface)) async

Pagination query for flows.

Source code in src/prefect/server/api/flows.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@router.post("/paginate")
async def paginate_flows(
    limit: int = dependencies.LimitBody(),
    page: int = Body(1, ge=1),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> FlowPaginationResponse:
    """
    Pagination query for flows.
    """
    offset = (page - 1) * limit

    async with db.session_context() as session:
        results = await models.flows.read_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            sort=sort,
            offset=offset,
            limit=limit,
        )

        count = await models.flows.count_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
        )

    return FlowPaginationResponse(
        results=results,
        count=count,
        limit=limit,
        pages=(count + limit - 1) // limit,
        page=page,
    )

read_flow(flow_id=Path(..., description='The flow id', alias='id'), db=Depends(provide_database_interface)) async

Get a flow by id.

Source code in src/prefect/server/api/flows.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@router.get("/{id}")
async def read_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """
    Get a flow by id.
    """
    async with db.session_context() as session:
        flow = await models.flows.read_flow(session=session, flow_id=flow_id)
    if not flow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )
    return flow

read_flow_by_name(name=Path(..., description='The name of the flow'), db=Depends(provide_database_interface)) async

Get a flow by name.

Source code in src/prefect/server/api/flows.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@router.get("/name/{name}")
async def read_flow_by_name(
    name: str = Path(..., description="The name of the flow"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """
    Get a flow by name.
    """
    async with db.session_context() as session:
        flow = await models.flows.read_flow_by_name(session=session, name=name)
    if not flow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )
    return flow

read_flows(limit=dependencies.LimitBody(), offset=Body(0, ge=0), flows=None, flow_runs=None, task_runs=None, deployments=None, work_pools=None, sort=Body(schemas.sorting.FlowSort.NAME_ASC), db=Depends(provide_database_interface)) async

Query for flows.

Source code in src/prefect/server/api/flows.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@router.post("/filter")
async def read_flows(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    work_pools: schemas.filters.WorkPoolFilter = None,
    sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.Flow]:
    """
    Query for flows.
    """
    async with db.session_context() as session:
        return await models.flows.read_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            sort=sort,
            offset=offset,
            limit=limit,
        )

update_flow(flow, flow_id=Path(..., description='The flow id', alias='id'), db=Depends(provide_database_interface)) async

Updates a flow.

Source code in src/prefect/server/api/flows.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@router.patch("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow(
    flow: schemas.actions.FlowUpdate,
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
):
    """
    Updates a flow.
    """
    async with db.session_context(begin_transaction=True) as session:
        result = await models.flows.update_flow(
            session=session, flow=flow, flow_id=flow_id
        )
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )