Skip to content

prefect.server.api.ui.flow_runs

count_task_runs_by_flow_run(flow_run_ids=Body(default=..., embed=True, max_items=200), db=Depends(provide_database_interface)) async

Get task run counts by flow run id.

Source code in src/prefect/server/api/ui/flow_runs.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@router.post("/count-task-runs")
async def count_task_runs_by_flow_run(
    flow_run_ids: list[UUID] = Body(default=..., embed=True, max_items=200),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> dict[UUID, int]:
    """
    Get task run counts by flow run id.
    """
    async with db.session_context() as session:
        query = (
            sa.select(
                orm_models.TaskRun.flow_run_id,
                sa.func.count(orm_models.TaskRun.id).label("task_run_count"),
            )
            .where(
                sa.and_(
                    orm_models.TaskRun.flow_run_id.in_(flow_run_ids),
                    sa.not_(orm_models.TaskRun.subflow_run.has()),
                )
            )
            .group_by(orm_models.TaskRun.flow_run_id)
        )

        results = await session.execute(query)

        task_run_counts_by_flow_run = {
            flow_run_id: task_run_count for flow_run_id, task_run_count in results.all()
        }

        return {
            flow_run_id: task_run_counts_by_flow_run.get(flow_run_id, 0)
            for flow_run_id in flow_run_ids
        }