Skip to content

prefect.utilities.callables

Utilities for working with Python callables.

ParameterSchema

Bases: BaseModel

Simple data model corresponding to an OpenAPI Schema.

Source code in src/prefect/utilities/callables.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
class ParameterSchema(pydantic.BaseModel):
    """Simple data model corresponding to an OpenAPI `Schema`."""

    title: Literal["Parameters"] = "Parameters"
    type: Literal["object"] = "object"
    properties: Dict[str, Any] = pydantic.Field(default_factory=dict)
    required: List[str] = pydantic.Field(default_factory=list)
    definitions: Dict[str, Any] = pydantic.Field(default_factory=dict)

    def model_dump_for_openapi(self) -> Dict[str, Any]:
        result = self.model_dump(mode="python", exclude_none=True)
        if "required" in result and not result["required"]:
            del result["required"]
        return result

call_with_parameters(fn, parameters)

Call a function with parameters extracted with get_call_parameters

The function must have an identical signature to the original function or this will fail. If you need to send to a function with a different signature, extract the args/kwargs using parameters_to_positional_and_keyword directly

Source code in src/prefect/utilities/callables.py
197
198
199
200
201
202
203
204
205
206
def call_with_parameters(fn: Callable, parameters: Dict[str, Any]):
    """
    Call a function with parameters extracted with `get_call_parameters`

    The function _must_ have an identical signature to the original function or this
    will fail. If you need to send to a function with a different signature, extract
    the args/kwargs using `parameters_to_positional_and_keyword` directly
    """
    args, kwargs = parameters_to_args_kwargs(fn, parameters)
    return fn(*args, **kwargs)

cloudpickle_wrapped_call(__fn, *args, **kwargs)

Serializes a function call using cloudpickle then returns a callable which will execute that call and return a cloudpickle serialized return value

This is particularly useful for sending calls to libraries that only use the Python built-in pickler (e.g. anyio.to_process and multiprocessing) but may require a wider range of pickling support.

Source code in src/prefect/utilities/callables.py
209
210
211
212
213
214
215
216
217
218
219
220
221
def cloudpickle_wrapped_call(
    __fn: Callable, *args: Any, **kwargs: Any
) -> Callable[[], bytes]:
    """
    Serializes a function call using cloudpickle then returns a callable which will
    execute that call and return a cloudpickle serialized return value

    This is particularly useful for sending calls to libraries that only use the Python
    built-in pickler (e.g. `anyio.to_process` and `multiprocessing`) but may require
    a wider range of pickling support.
    """
    payload = cloudpickle.dumps((__fn, args, kwargs))
    return partial(_run_serialized_call, payload)

collapse_variadic_parameters(fn, parameters)

Given a parameter dictionary, move any parameters stored not present in the signature into the variadic keyword argument.

Example:

```python
def foo(a, b, **kwargs):
    pass

parameters = {"a": 1, "b": 2, "c": 3, "d": 4}
collapse_variadic_parameters(foo, parameters)
# {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
```
Source code in src/prefect/utilities/callables.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def collapse_variadic_parameters(
    fn: Callable, parameters: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Given a parameter dictionary, move any parameters stored not present in the
    signature into the variadic keyword argument.

    Example:

        ```python
        def foo(a, b, **kwargs):
            pass

        parameters = {"a": 1, "b": 2, "c": 3, "d": 4}
        collapse_variadic_parameters(foo, parameters)
        # {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
        ```
    """
    signature_parameters = inspect.signature(fn).parameters
    variadic_key = None
    for key, parameter in signature_parameters.items():
        if parameter.kind == parameter.VAR_KEYWORD:
            variadic_key = key
            break

    missing_parameters = set(parameters.keys()) - set(signature_parameters.keys())

    if not variadic_key and missing_parameters:
        raise ValueError(
            f"Signature for {fn} does not include any variadic keyword argument "
            "but parameters were given that are not present in the signature."
        )

    if variadic_key and not missing_parameters:
        # variadic key is present but no missing parameters, return parameters unchanged
        return parameters

    new_parameters = parameters.copy()
    if variadic_key:
        new_parameters[variadic_key] = {}

    for key in missing_parameters:
        new_parameters[variadic_key][key] = new_parameters.pop(key)

    return new_parameters

expand_mapping_parameters(func, parameters)

Generates a list of call parameters to be used for individual calls in a mapping operation.

Parameters:

Name Type Description Default
func Callable

The function to be called

required
parameters Dict[str, Any]

A dictionary of parameters with iterables to be mapped over

required

Returns:

Name Type Description
List List[Dict[str, Any]]

A list of dictionaries to be used as parameters for each call in the mapping operation

Source code in src/prefect/utilities/callables.py
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
def expand_mapping_parameters(
    func: Callable, parameters: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """
    Generates a list of call parameters to be used for individual calls in a mapping
    operation.

    Args:
        func: The function to be called
        parameters: A dictionary of parameters with iterables to be mapped over

    Returns:
        List: A list of dictionaries to be used as parameters for each
            call in the mapping operation
    """
    # Ensure that any parameters in kwargs are expanded before this check
    parameters = explode_variadic_parameter(func, parameters)

    iterable_parameters = {}
    static_parameters = {}
    annotated_parameters = {}
    for key, val in parameters.items():
        if isinstance(val, (allow_failure, quote)):
            # Unwrap annotated parameters to determine if they are iterable
            annotated_parameters[key] = val
            val = val.unwrap()

        if isinstance(val, unmapped):
            static_parameters[key] = val.value
        elif isiterable(val):
            iterable_parameters[key] = list(val)
        else:
            static_parameters[key] = val

    if not len(iterable_parameters):
        raise MappingMissingIterable(
            "No iterable parameters were received. Parameters for map must "
            f"include at least one iterable. Parameters: {parameters}"
        )

    iterable_parameter_lengths = {
        key: len(val) for key, val in iterable_parameters.items()
    }
    lengths = set(iterable_parameter_lengths.values())
    if len(lengths) > 1:
        raise MappingLengthMismatch(
            "Received iterable parameters with different lengths. Parameters for map"
            f" must all be the same length. Got lengths: {iterable_parameter_lengths}"
        )

    map_length = list(lengths)[0]

    call_parameters_list = []
    for i in range(map_length):
        call_parameters = {key: value[i] for key, value in iterable_parameters.items()}
        call_parameters.update({key: value for key, value in static_parameters.items()})

        # Add default values for parameters; these are skipped earlier since they should
        # not be mapped over
        for key, value in get_parameter_defaults(func).items():
            call_parameters.setdefault(key, value)

        # Re-apply annotations to each key again
        for key, annotation in annotated_parameters.items():
            call_parameters[key] = annotation.rewrap(call_parameters[key])

        # Collapse any previously exploded kwargs
        call_parameters_list.append(collapse_variadic_parameters(func, call_parameters))

    return call_parameters_list

explode_variadic_parameter(fn, parameters)

Given a parameter dictionary, move any parameters stored in a variadic keyword argument parameter (i.e. **kwargs) into the top level.

Example:

```python
def foo(a, b, **kwargs):
    pass

parameters = {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
explode_variadic_parameter(foo, parameters)
# {"a": 1, "b": 2, "c": 3, "d": 4}
```
Source code in src/prefect/utilities/callables.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def explode_variadic_parameter(
    fn: Callable, parameters: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Given a parameter dictionary, move any parameters stored in a variadic keyword
    argument parameter (i.e. **kwargs) into the top level.

    Example:

        ```python
        def foo(a, b, **kwargs):
            pass

        parameters = {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
        explode_variadic_parameter(foo, parameters)
        # {"a": 1, "b": 2, "c": 3, "d": 4}
        ```
    """
    variadic_key = None
    for key, parameter in inspect.signature(fn).parameters.items():
        if parameter.kind == parameter.VAR_KEYWORD:
            variadic_key = key
            break

    if not variadic_key:
        return parameters

    new_parameters = parameters.copy()
    for key, value in new_parameters.pop(variadic_key, {}).items():
        new_parameters[key] = value

    return new_parameters

generate_parameter_schema(signature, docstrings)

Generate a parameter schema from a function signature and docstrings.

To get a signature from a function, use inspect.signature(fn) or _generate_signature_from_source(source_code, func_name).

Parameters:

Name Type Description Default
signature Signature

The function signature.

required
docstrings Dict[str, str]

A dictionary mapping parameter names to docstrings.

required

Returns:

Name Type Description
ParameterSchema ParameterSchema

The parameter schema.

Source code in src/prefect/utilities/callables.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def generate_parameter_schema(
    signature: inspect.Signature, docstrings: Dict[str, str]
) -> ParameterSchema:
    """
    Generate a parameter schema from a function signature and docstrings.

    To get a signature from a function, use `inspect.signature(fn)` or
    `_generate_signature_from_source(source_code, func_name)`.

    Args:
        signature: The function signature.
        docstrings: A dictionary mapping parameter names to docstrings.

    Returns:
        ParameterSchema: The parameter schema.
    """

    model_fields = {}
    aliases = {}

    if not has_v1_type_as_param(signature):
        create_schema = create_v2_schema
        process_params = process_v2_params

        config = pydantic.ConfigDict(arbitrary_types_allowed=True)
    else:
        create_schema = create_v1_schema
        process_params = process_v1_params

        class ModelConfig:
            arbitrary_types_allowed = True

        config = ModelConfig

    for position, param in enumerate(signature.parameters.values()):
        name, type_, field = process_params(
            param, position=position, docstrings=docstrings, aliases=aliases
        )
        # Generate a Pydantic model at each step so we can check if this parameter
        # type supports schema generation
        try:
            create_schema("CheckParameter", model_cfg=config, **{name: (type_, field)})
        except (ValueError, TypeError):
            # This field's type is not valid for schema creation, update it to `Any`
            type_ = Any
        model_fields[name] = (type_, field)

    # Generate the final model and schema
    schema = create_schema("Parameters", model_cfg=config, **model_fields)
    return ParameterSchema(**schema)

get_call_parameters(fn, call_args, call_kwargs, apply_defaults=True)

Bind a call to a function to get parameter/value mapping. Default values on the signature will be included if not overridden.

If the function has a __prefect_self__ attribute, it will be included as the first parameter. This attribute is set when Prefect decorates a bound method, so this approach allows Prefect to work with bound methods in a way that is consistent with how Python handles them (i.e. users don't have to pass the instance argument to the method) while still making the implicit self argument visible to all of Prefect's parameter machinery (such as cache key functions).

Raises a ParameterBindError if the arguments/kwargs are not valid for the function

Source code in src/prefect/utilities/callables.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def get_call_parameters(
    fn: Callable,
    call_args: Tuple[Any, ...],
    call_kwargs: Dict[str, Any],
    apply_defaults: bool = True,
) -> Dict[str, Any]:
    """
    Bind a call to a function to get parameter/value mapping. Default values on
    the signature will be included if not overridden.

    If the function has a `__prefect_self__` attribute, it will be included as
    the first parameter. This attribute is set when Prefect decorates a bound
    method, so this approach allows Prefect to work with bound methods in a way
    that is consistent with how Python handles them (i.e. users don't have to
    pass the instance argument to the method) while still making the implicit self
    argument visible to all of Prefect's parameter machinery (such as cache key
    functions).

    Raises a ParameterBindError if the arguments/kwargs are not valid for the
    function
    """
    if hasattr(fn, "__prefect_self__"):
        call_args = (fn.__prefect_self__,) + call_args

    try:
        bound_signature = inspect.signature(fn).bind(*call_args, **call_kwargs)
    except TypeError as exc:
        raise ParameterBindError.from_bind_failure(fn, exc, call_args, call_kwargs)

    if apply_defaults:
        bound_signature.apply_defaults()

    # We cast from `OrderedDict` to `dict` because Dask will not convert futures in an
    # ordered dictionary to values during execution; this is the default behavior in
    # Python 3.9 anyway.
    return dict(bound_signature.arguments)

get_parameter_defaults(fn)

Get default parameter values for a callable.

Source code in src/prefect/utilities/callables.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def get_parameter_defaults(
    fn: Callable,
) -> Dict[str, Any]:
    """
    Get default parameter values for a callable.
    """
    signature = inspect.signature(fn)

    parameter_defaults = {}

    for name, param in signature.parameters.items():
        if param.default is not signature.empty:
            parameter_defaults[name] = param.default

    return parameter_defaults

parameter_docstrings(docstring)

Given a docstring in Google docstring format, parse the parameter section and return a dictionary that maps parameter names to docstring.

Parameters:

Name Type Description Default
docstring Optional[str]

The function's docstring.

required

Returns:

Type Description
Dict[str, str]

Mapping from parameter names to docstrings.

Source code in src/prefect/utilities/callables.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def parameter_docstrings(docstring: Optional[str]) -> Dict[str, str]:
    """
    Given a docstring in Google docstring format, parse the parameter section
    and return a dictionary that maps parameter names to docstring.

    Args:
        docstring: The function's docstring.

    Returns:
        Mapping from parameter names to docstrings.
    """
    param_docstrings = {}

    if not docstring:
        return param_docstrings

    with disable_logger("griffe"):
        parsed = parse(Docstring(docstring), Parser.google)
        for section in parsed:
            if section.kind != DocstringSectionKind.parameters:
                continue
            param_docstrings = {
                parameter.name: parameter.description for parameter in section.value
            }

    return param_docstrings

parameter_schema(fn)

Given a function, generates an OpenAPI-compatible description of the function's arguments, including: - name - typing information - whether it is required - a default value - additional constraints (like possible enum values)

Parameters:

Name Type Description Default
fn Callable

The function whose arguments will be serialized

required

Returns:

Name Type Description
ParameterSchema ParameterSchema

the argument schema

Source code in src/prefect/utilities/callables.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def parameter_schema(fn: Callable) -> ParameterSchema:
    """Given a function, generates an OpenAPI-compatible description
    of the function's arguments, including:
        - name
        - typing information
        - whether it is required
        - a default value
        - additional constraints (like possible enum values)

    Args:
        fn (Callable): The function whose arguments will be serialized

    Returns:
        ParameterSchema: the argument schema
    """
    try:
        signature = inspect.signature(fn, eval_str=True)  # novm
    except (NameError, TypeError):
        # `eval_str` is not available in Python < 3.10
        signature = inspect.signature(fn)

    docstrings = parameter_docstrings(inspect.getdoc(fn))

    return generate_parameter_schema(signature, docstrings)

parameter_schema_from_entrypoint(entrypoint)

Generate a parameter schema from an entrypoint string.

Will load the source code of the function and extract the signature and docstring to generate the schema.

Useful for generating a schema for a function when instantiating the function may not be possible due to missing imports or other issues.

Parameters:

Name Type Description Default
entrypoint str

A string representing the entrypoint to a function. The string should be in the format of module.path.to.function:do_stuff.

required

Returns:

Name Type Description
ParameterSchema ParameterSchema

The parameter schema for the function.

Source code in src/prefect/utilities/callables.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
def parameter_schema_from_entrypoint(entrypoint: str) -> ParameterSchema:
    """
    Generate a parameter schema from an entrypoint string.

    Will load the source code of the function and extract the signature and docstring
    to generate the schema.

    Useful for generating a schema for a function when instantiating the function may
    not be possible due to missing imports or other issues.

    Args:
        entrypoint: A string representing the entrypoint to a function. The string
            should be in the format of `module.path.to.function:do_stuff`.

    Returns:
        ParameterSchema: The parameter schema for the function.
    """
    filepath = None
    if ":" in entrypoint:
        # split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
        path, func_name = entrypoint.rsplit(":", maxsplit=1)
        source_code = Path(path).read_text()
        filepath = path
    else:
        path, func_name = entrypoint.rsplit(".", maxsplit=1)
        spec = importlib.util.find_spec(path)
        if not spec or not spec.origin:
            raise ValueError(f"Could not find module {path!r}")
        source_code = Path(spec.origin).read_text()
    signature = _generate_signature_from_source(source_code, func_name, filepath)
    docstring = _get_docstring_from_source(source_code, func_name)
    return generate_parameter_schema(signature, parameter_docstrings(docstring))

parameters_to_args_kwargs(fn, parameters)

Convert a parameters dictionary to positional and keyword arguments

The function must have an identical signature to the original function or this will return an empty tuple and dict.

Source code in src/prefect/utilities/callables.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def parameters_to_args_kwargs(
    fn: Callable,
    parameters: Dict[str, Any],
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """
    Convert a `parameters` dictionary to positional and keyword arguments

    The function _must_ have an identical signature to the original function or this
    will return an empty tuple and dict.
    """
    function_params = dict(inspect.signature(fn).parameters).keys()
    # Check for parameters that are not present in the function signature
    unknown_params = parameters.keys() - function_params
    if unknown_params:
        raise SignatureMismatchError.from_bad_params(
            list(function_params), list(parameters.keys())
        )
    bound_signature = inspect.signature(fn).bind_partial()
    bound_signature.arguments = parameters

    return bound_signature.args, bound_signature.kwargs

raise_for_reserved_arguments(fn, reserved_arguments)

Raise a ReservedArgumentError if fn has any parameters that conflict with the names contained in reserved_arguments.

Source code in src/prefect/utilities/callables.py
432
433
434
435
436
437
438
439
440
441
def raise_for_reserved_arguments(fn: Callable, reserved_arguments: Iterable[str]):
    """Raise a ReservedArgumentError if `fn` has any parameters that conflict
    with the names contained in `reserved_arguments`."""
    function_paremeters = inspect.signature(fn).parameters

    for argument in reserved_arguments:
        if argument in function_paremeters:
            raise ReservedArgumentError(
                f"{argument!r} is a reserved argument name and cannot be used."
            )