Skip to content

prefect.server.api.ui.flows

count_deployments_by_flow(flow_ids=Body(default=..., embed=True, max_items=200), db=Depends(provide_database_interface)) async

Get deployment counts by flow id.

Source code in src/prefect/server/api/ui/flows.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@router.post("/count-deployments")
async def count_deployments_by_flow(
    flow_ids: List[UUID] = Body(default=..., embed=True, max_items=200),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Dict[UUID, int]:
    """
    Get deployment counts by flow id.
    """
    async with db.session_context() as session:
        query = (
            sa.select(
                orm_models.Deployment.flow_id,
                sa.func.count(orm_models.Deployment.id).label("deployment_count"),
            )
            .where(orm_models.Deployment.flow_id.in_(flow_ids))
            .group_by(orm_models.Deployment.flow_id)
        )

        results = await session.execute(query)

        deployment_counts_by_flow = {
            flow_id: deployment_count for flow_id, deployment_count in results.all()
        }

        return {
            flow_id: deployment_counts_by_flow.get(flow_id, 0) for flow_id in flow_ids
        }

next_runs_by_flow(flow_ids=Body(default=..., embed=True, max_items=200), db=Depends(provide_database_interface)) async

Get the next flow run by flow id.

Source code in src/prefect/server/api/ui/flows.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
@router.post("/next-runs")
async def next_runs_by_flow(
    flow_ids: List[UUID] = Body(default=..., embed=True, max_items=200),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Dict[UUID, Optional[SimpleNextFlowRun]]:
    """
    Get the next flow run by flow id.
    """

    async with db.session_context() as session:
        if db.dialect.name == "postgresql":
            query = _get_postgres_next_runs_query(flow_ids=flow_ids)
        else:
            query = _get_sqlite_next_runs_query(flow_ids=flow_ids)

        results = await session.execute(query)

        results_by_flow_id = {
            UUID(str(result.flow_id)): SimpleNextFlowRun(
                id=result.id,
                flow_id=result.flow_id,
                name=result.name,
                state_name=result.state_name,
                state_type=result.state_type,
                next_scheduled_start_time=result.next_scheduled_start_time,
            )
            for result in results.all()
        }

        response = {
            flow_id: results_by_flow_id.get(flow_id, None) for flow_id in flow_ids
        }
        return response