prefect.server.api.validation

This module contains functions for validating job variables for deployments, work pools, flow runs, and RunDeployment actions. These functions are used to validate that job variables provided by users conform to the JSON schema defined in the work pool's base job template.

Note some important details:

  1. The order of applying job variables is: work pool's base job template, deployment, flow run. This means that flow run job variables override deployment job variables, which override work pool job variables.

  2. The validation of job variables for work pools and deployments ignores required keys in the schema because we don't know if the full set of overrides will include values for any required fields.

  3. Work pools can include default values for job variables. These can be plain values or references to blocks. Historically, we have not validated these values, nor checked whether default blocks satisfy the job variable JSON schemas. To avoid failing validation for existing (otherwise working) data, we ignore invalid defaults when validating deployment and flow run variables, but not when validating the work pool's base template, e.g. during work pool creation or updates. If we find defaults that are invalid, we have to ignore required fields when we run the full validation.

  4. A flow run is the terminal point for job variables, so it is the only place where we validate required variables and default values. Thus, validate_job_variables_for_deployment_flow_run and validate_job_variables_for_run_deployment_action check for required fields.

  5. We have been using Pydantic v1 to generate work pool base job templates, and it produces invalid JSON schemas for some fields, e.g. tuples and optional fields. We try to fix these schemas on the fly while validating job variables, but there is a case we can't resolve, which is whether or not an optional field supports a None value. In this case, we allow None values to be passed in, which means that if an optional field does not actually allow None values, the Pydantic model will fail to validate at runtime.
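
The permissive handling of optional fields described in note 5 can be pictured with a small sketch. This is not Prefect's implementation: the helper name `allow_none_for_optional` and the example schema are hypothetical, and the sketch assumes that an optional field appears in the generated schema as a property that is absent from `required` and has no `null` type.

```python
from copy import deepcopy
from typing import Any, Dict


def allow_none_for_optional(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `schema` whose non-required properties also accept null.

    Hypothetical helper for illustration only; it handles top-level
    properties and does not recurse into nested objects.
    """
    patched = deepcopy(schema)
    required = set(patched.get("required", []))
    for name, prop in patched.get("properties", {}).items():
        if name not in required:
            # Wrap the original property schema so that null is accepted too.
            patched["properties"][name] = {"anyOf": [prop, {"type": "null"}]}
    return patched


# Assumed shape of a schema generated for a model with one required field
# (`image: str`) and one optional field (`namespace: Optional[str] = None`).
variables_schema = {
    "type": "object",
    "properties": {
        "image": {"type": "string"},
        "namespace": {"type": "string"},
    },
    "required": ["image"],
}

patched_schema = allow_none_for_optional(variables_schema)
```

With a schema patched this way, `{"namespace": None}` passes JSON schema validation. As the note says, the model may still reject that value at runtime if the field does not actually allow None.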

validate_job_variable_defaults_for_work_pool(session, work_pool_name, base_job_template) async

Validate the default job variables for a work pool.

This validation checks that default values for job variables match the JSON schema defined in the work pool's base job template. It also resolves references to block documents in the default values and hydrates them to perform the validation.

Unlike validations for flow runs, validation here ignores required keys in the schema because we're only concerned with default values. The absence of a default for a required field is not an error, but if the full set of job variables when a flow is running, including the deployment's and flow run's overrides, fails to specify a value for the required key, that's an error.

NOTE: This will raise an HTTP 404 error if a referenced block document does not exist.
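
As a rough illustration of what "defaults must match the schema, but missing defaults are fine" means, the sketch below checks the defaults in a template's `variables` section. It is a simplification under stated assumptions: `find_invalid_defaults` is a hypothetical helper, a tiny primitive-type check stands in for a real JSON schema validator, and block references are not resolved.

```python
from typing import Any, Dict, List

# Simplified mapping from JSON Schema primitive type names to Python types.
_JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def find_invalid_defaults(base_job_template: Dict[str, Any]) -> List[str]:
    """Return names of variables whose default does not match the declared type.

    Hypothetical helper: required keys are deliberately not consulted, so a
    required variable without a default is never reported.
    """
    variables = base_job_template.get("variables", {})
    invalid = []
    for name, prop in variables.get("properties", {}).items():
        expected = _JSON_TYPES.get(prop.get("type"))
        if "default" not in prop or expected is None:
            continue  # nothing to check for this variable
        value = prop["default"]
        # bool is a subclass of int in Python, so exclude it for numeric types.
        is_bool_as_number = isinstance(value, bool) and prop["type"] in ("integer", "number")
        if not isinstance(value, expected) or is_bool_as_number:
            invalid.append(name)
    return invalid


# Example template; the variable names are made up for illustration.
template = {
    "job_configuration": {},
    "variables": {
        "type": "object",
        "properties": {
            "image": {"type": "string", "default": "python:3.11"},
            "cpu": {"type": "integer", "default": "two"},
            "command": {"type": "string"},
        },
        "required": ["command"],
    },
}

invalid_defaults = find_invalid_defaults(template)
```

Here only `cpu` is reported. `command` is required and has no default, but that is not an error at work pool validation time.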

Source code in src/prefect/server/api/validation.py
async def validate_job_variable_defaults_for_work_pool(
    session: AsyncSession,
    work_pool_name: str,
    base_job_template: Dict[str, Any],
) -> None:
    """
    Validate the default job variables for a work pool.

    This validation checks that default values for job variables match the JSON schema
    defined in the work pool's base job template. It also resolves references to block
    documents in the default values and hydrates them to perform the validation.

    Unlike validations for flow runs, validation here ignores required keys in the schema
    because we're only concerned with default values. The absence of a default for a
    required field is not an error, but if the full set of job variables when a flow is
    running, including the deployment's and flow run's overrides, fails to specify a value
    for the required key, that's an error.

    NOTE: This will raise an HTTP 404 error if a referenced block document does not exist.
    """
    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool_name,
            base_job_template,
            ignore_required=True,
            ignore_invalid_defaults=False,
        )
    except ValidationError as exc:
        error_msg = f"Validation failed for work pool's job variable defaults: {exc}"
        raise HTTPException(status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_msg)

validate_job_variables_for_deployment(session, work_pool, deployment) async

Validate job variables for deployment creation and updates.

This validation applies only to deployments that have a work pool. If the deployment does not have a work pool, we cannot validate job variables because we don't have a base job template to validate against, so we skip this validation.

Unlike validations for flow runs, validation here ignores required keys in the schema because we don't know if the full set of overrides will include values for any required fields. If the full set of job variables when a flow is running, including the deployment's and flow run's overrides, fails to specify a value for the required key, that's an error.
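
One way to picture "ignoring required keys" is to validate against a copy of the schema from which every `required` list has been removed. The sketch below assumes that approach for illustration. `without_required` is a hypothetical helper, not a function from this module, and the real validation may achieve the same effect differently.

```python
from typing import Any


def without_required(schema: Any) -> Any:
    """Return a copy of a JSON schema with every `required` list removed.

    Hypothetical helper for illustration. Property names under `properties`
    are preserved, even if a property happens to be called "required".
    """
    if isinstance(schema, list):
        return [without_required(item) for item in schema]
    if not isinstance(schema, dict):
        return schema
    result = {}
    for key, value in schema.items():
        if key == "required" and isinstance(value, list):
            continue  # drop the list of required keys
        if key == "properties" and isinstance(value, dict):
            # Keys here are variable names, not schema keywords: keep them all.
            result[key] = {name: without_required(sub) for name, sub in value.items()}
        else:
            result[key] = without_required(value)
    return result


# Example schema; the variable names are made up for illustration.
variables_schema = {
    "type": "object",
    "properties": {
        "image": {"type": "string"},
        "resources": {
            "type": "object",
            "properties": {"cpu": {"type": "integer"}},
            "required": ["cpu"],
        },
    },
    "required": ["image"],
}

relaxed_schema = without_required(variables_schema)
```

A deployment that sets only `resources` would pass against `relaxed_schema`, leaving the check for `image` to the flow run, where the full set of overrides is known.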

Source code in src/prefect/server/api/validation.py
async def validate_job_variables_for_deployment(
    session: AsyncSession,
    work_pool: WorkPool,
    deployment: DeploymentAction,
) -> None:
    """
    Validate job variables for deployment creation and updates.

    This validation applies only to deployments that have a work pool. If the deployment
    does not have a work pool, we cannot validate job variables because we don't have a
    base job template to validate against, so we skip this validation.

    Unlike validations for flow runs, validation here ignores required keys in the schema
    because we don't know if the full set of overrides will include values for any
    required fields. If the full set of job variables when a flow is running, including
    the deployment's and flow run's overrides, fails to specify a value for the required
    key, that's an error.
    """
    if not deployment.job_variables:
        return
    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool.name,
            work_pool.base_job_template,
            deployment.job_variables or {},
            ignore_required=True,
            ignore_invalid_defaults=True,
        )
    except ValidationError as exc:
        if isinstance(deployment, schemas.actions.DeploymentCreate):
            error_msg = f"Error creating deployment: {exc}"
        else:
            error_msg = f"Error updating deployment: {exc}"
        raise HTTPException(status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_msg)

validate_job_variables_for_deployment_flow_run(session, deployment, flow_run) async

Validate job variables for a flow run created for a deployment.

Flow runs are the terminal point for job variable overlays, so we validate required job variables because all variables should now be present.
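
The overlay order and the terminal required-key check can be sketched as follows. The function names and the shallow dictionary merge are assumptions made for illustration. The real validation works against the full JSON schema rather than a simple key lookup.

```python
from typing import Any, Dict, List


def overlay_job_variables(
    pool_defaults: Dict[str, Any],
    deployment_vars: Dict[str, Any],
    flow_run_vars: Dict[str, Any],
) -> Dict[str, Any]:
    """Apply overrides in order: work pool defaults, deployment, flow run.

    Hypothetical helper; a shallow merge is assumed, so later sources win.
    """
    return {**pool_defaults, **deployment_vars, **flow_run_vars}


def missing_required(
    variables_schema: Dict[str, Any], job_variables: Dict[str, Any]
) -> List[str]:
    """List required keys that the merged job variables do not provide."""
    return [key for key in variables_schema.get("required", []) if key not in job_variables]


# Example data; the variable names are made up for illustration.
variables_schema = {
    "type": "object",
    "properties": {
        "image": {"type": "string", "default": "python:3.11"},
        "cpu": {"type": "integer"},
    },
    "required": ["image", "cpu"],
}
pool_defaults = {"image": "python:3.11"}
deployment_vars = {"image": "my-registry/flows:1.0"}
flow_run_vars = {"cpu": 2}

merged = overlay_job_variables(pool_defaults, deployment_vars, flow_run_vars)
still_missing = missing_required(variables_schema, merged)
```

In this example the deployment's `image` overrides the work pool default, the flow run supplies `cpu`, and nothing required is missing. Without the flow run's override, `cpu` would be reported, which is the kind of failure that only surfaces at the flow run level.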

Source code in src/prefect/server/api/validation.py
async def validate_job_variables_for_deployment_flow_run(
    session: AsyncSession,
    deployment: Deployment,
    flow_run: FlowRunAction,
) -> None:
    """
    Validate job variables for a flow run created for a deployment.

    Flow runs are the terminal point for job variable overlays, so we validate required
    job variables because all variables should now be present.
    """
    # If we aren't able to access a deployment's work pool, we don't have a base job
    # template to validate job variables against. This is not a validation failure because
    # some deployments may not have a work pool, such as those created by flow.serve().
    if deployment.work_queue is None or deployment.work_queue.work_pool is None:
        logger.info(
            "Cannot validate job variables for deployment %s "
            "because it does not have a work pool",
            deployment.id,
        )
        return

    work_pool = deployment.work_queue.work_pool

    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool.name,
            work_pool.base_job_template,
            deployment.job_variables or {},
            flow_run.job_variables or {},
            ignore_required=False,
            ignore_invalid_defaults=True,
        )
    except ValidationError as exc:
        if isinstance(flow_run, schemas.actions.DeploymentFlowRunCreate):
            error_msg = f"Error creating flow run: {exc}"
        else:
            error_msg = f"Error updating flow run: {exc}"
        raise HTTPException(status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_msg)

validate_job_variables_for_run_deployment_action(session, run_action) async

Validate the job variables for a RunDeployment action.

This action is equivalent to creating a flow run for a deployment, so we validate required job variables because all variables should now be present.

Source code in src/prefect/server/api/validation.py
async def validate_job_variables_for_run_deployment_action(
    session: AsyncSession,
    run_action: RunDeployment,
) -> None:
    """
    Validate the job variables for a RunDeployment action.

    This action is equivalent to creating a flow run for a deployment, so we validate
    required job variables because all variables should now be present.
    """
    try:
        deployment = await models.deployments.read_deployment(
            session, run_action.deployment_id
        )
    except (DBAPIError, NoInspectionAvailable):
        # It's possible to get an invalid UUID here because the deployment ID is
        # not validated by our schemas.
        logger.info("Could not find deployment with ID %s", run_action.deployment_id)
        deployment = None
    if not deployment:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Deployment not found.")

    if deployment.work_queue is None or deployment.work_queue.work_pool is None:
        logger.info(
            "Cannot validate job variables for deployment %s "
            "because it does not have a work pool",
            run_action.deployment_id,
        )
        return

    if not (deployment.job_variables or run_action.job_variables):
        return

    work_pool = deployment.work_queue.work_pool

    await _validate_work_pool_job_variables(
        session,
        work_pool.name,
        work_pool.base_job_template,
        run_action.job_variables or {},
        ignore_required=False,
        ignore_invalid_defaults=True,
    )