Skip to content

prefect.input

RunInput

Bases: BaseModel

Source code in src/prefect/input/run_input.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# RunInput: pydantic model base for flow run inputs. Its class methods store
# the model's schema/description as flow run inputs, build instances from
# stored values, and send instances to other flow runs.
#
# NOTE: this summary is deliberately a comment rather than a class docstring.
# pydantic can copy a model's docstring into the generated schema as its
# "description", which would change the schema that `save` stores.
class RunInput(pydantic.BaseModel):
    model_config = ConfigDict(extra="forbid")

    # Optional description; `with_initial_data` sets it on the generated
    # subclass and `save` stores it when it is a non-empty string.
    _description: Optional[str] = pydantic.PrivateAttr(default=None)
    # Key / sender / receiver of a loaded input; assigned by `load` and
    # `load_from_flow_run_input`.
    _metadata: RunInputMetadata = pydantic.PrivateAttr()

    @property
    def metadata(self) -> RunInputMetadata:
        """Return the metadata assigned when this instance was loaded."""
        return self._metadata

    @classmethod
    def keyset_from_type(cls) -> Keyset:
        """Return the keyset whose base key is the lowercased class name."""
        return keyset_from_base_key(cls.__name__.lower())

    @classmethod
    @sync_compatible
    async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Save this model's schema, and its description when one is set, as
        flow run inputs.

        Args:
            - keyset (Keyset): supplies the "schema" and "description" keys
                the values are stored under
            - flow_run_id (UUID, optional): the flow run ID to save the input for
        """

        if is_v2_model(cls):
            schema = create_v2_schema(cls.__name__, model_base=cls)
        else:
            schema = cls.schema(by_alias=True)

        await create_flow_run_input(
            key=keyset["schema"], value=schema, flow_run_id=flow_run_id
        )

        # Only a non-empty string is stored; any other class-level value of
        # `_description` is treated as "no description".
        description = cls._description if isinstance(cls._description, str) else None
        if description:
            await create_flow_run_input(
                key=keyset["description"],
                value=description,
                flow_run_id=flow_run_id,
            )

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Load the run input response from the given key.

        When no value is stored under the response key (or it is falsy), the
        instance is built from the model's defaults.

        Args:
            - keyset (Keyset): the keyset to load the input for
            - flow_run_id (UUID, optional): the flow run ID to load the input for
        """
        flow_run_id = ensure_flow_run_id(flow_run_id)
        value = await read_flow_run_input(keyset["response"], flow_run_id=flow_run_id)
        if value:
            instance = cls(**value)
        else:
            instance = cls()
        instance._metadata = RunInputMetadata(
            key=keyset["response"], sender=None, receiver=flow_run_id
        )
        return instance

    @classmethod
    def load_from_flow_run_input(cls, flow_run_input: "FlowRunInput"):
        """
        Load the run input from a FlowRunInput object.

        Args:
            - flow_run_input (FlowRunInput): the flow run input to load the input for
        """
        instance = cls(**flow_run_input.decoded_value)
        instance._metadata = RunInputMetadata(
            key=flow_run_input.key,
            sender=flow_run_input.sender,
            receiver=flow_run_input.flow_run_id,
        )
        return instance

    @classmethod
    def with_initial_data(
        cls: Type[R], description: Optional[str] = None, **kwargs: Any
    ) -> Type[R]:
        """
        Create a new `RunInput` subclass with the given initial data as field
        defaults.

        Args:
            - description (str, optional): a description to show when resuming
                a flow run that requires input
            - kwargs (Any): the initial data to populate the subclass
        """
        # Each keyword becomes a field typed after its value, with the value
        # itself as the field default.
        fields: Dict[str, Any] = {
            key: (type(value), value) for key, value in kwargs.items()
        }
        model = pydantic.create_model(cls.__name__, **fields, __base__=cls)

        if description is not None:
            model._description = description

        return model

    @sync_compatible
    async def respond(
        self,
        run_input: "RunInput",
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        """
        Send `run_input` to the flow run that sent this input.

        The target flow run ID is the text after the last "." of
        `self.metadata.sender`, which must start with "prefect.flow-run".

        Args:
            - run_input (RunInput): the input to send back
            - sender (str, optional): forwarded to `_send_input`
            - key_prefix (str, optional): forwarded to `_send_input`

        Raises:
            - RuntimeError: if this input has no sender, or the sender does
                not start with "prefect.flow-run"
            - ValueError: if the trailing segment of the sender is not a
                valid UUID
        """
        flow_run_id = None
        origin = self.metadata.sender
        if origin and origin.startswith("prefect.flow-run"):
            _, _, sender_id = origin.rpartition(".")
            flow_run_id = UUID(sender_id)

        if flow_run_id is None:
            raise RuntimeError(
                "Cannot respond to an input that was not sent by a flow run."
            )

        await _send_input(
            flow_run_id=flow_run_id,
            run_input=run_input,
            sender=sender,
            key_prefix=key_prefix,
        )

    @sync_compatible
    async def send_to(
        self,
        flow_run_id: UUID,
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        """
        Send this input to the given flow run via `_send_input`.

        Args:
            - flow_run_id (UUID): the flow run ID to send the input to
            - sender (str, optional): forwarded to `_send_input`
            - key_prefix (str, optional): forwarded to `_send_input`
        """
        await _send_input(
            flow_run_id=flow_run_id,
            run_input=self,
            sender=sender,
            key_prefix=key_prefix,
        )

    @classmethod
    def receive(
        cls,
        timeout: Optional[float] = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        key_prefix: Optional[str] = None,
        flow_run_id: Optional[UUID] = None,
    ):
        """
        Build a `GetInputHandler` for inputs of this type.

        All arguments are passed to the handler unchanged, except that a
        missing `key_prefix` defaults to "<lowercased class name>-auto".

        Args:
            - timeout (float, optional): handler timeout (presumably seconds;
                confirm against `GetInputHandler`)
            - poll_interval (float): handler poll interval (presumably seconds)
            - raise_timeout_error (bool): forwarded to the handler
            - exclude_keys (Set[str], optional): forwarded to the handler
            - key_prefix (str, optional): key prefix for the handler
            - flow_run_id (UUID, optional): forwarded to the handler
        """
        if key_prefix is None:
            key_prefix = f"{cls.__name__.lower()}-auto"

        return GetInputHandler(
            run_input_cls=cls,
            key_prefix=key_prefix,
            timeout=timeout,
            poll_interval=poll_interval,
            raise_timeout_error=raise_timeout_error,
            exclude_keys=exclude_keys,
            flow_run_id=flow_run_id,
        )

    @classmethod
    def subclass_from_base_model_type(
        cls, model_cls: Type[pydantic.BaseModel]
    ) -> Type["RunInput"]:
        """
        Create a new `RunInput` subclass from the given `pydantic.BaseModel`
        subclass.

        The new class is named "<model_cls name>RunInput" and inherits from
        both `RunInput` and `model_cls`.

        Args:
            - model_cls (pydantic.BaseModel subclass): the class from which
                to create the new `RunInput` subclass
        """
        return type(f"{model_cls.__name__}RunInput", (RunInput, model_cls), {})  # type: ignore

load(keyset, flow_run_id=None) async classmethod

Load the run input response from the given key.

Parameters:

Name Type Description Default
keyset Keyset

the keyset to load the input for

required
flow_run_id Optional[UUID]

the flow run ID to load the input for

None
Source code in src/prefect/input/run_input.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@classmethod
@sync_compatible
async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
    """
    Build an instance from the value stored under the keyset's response key.

    If nothing (or a falsy value) is stored there, the instance is built
    from the model's defaults. The instance's metadata records the response
    key, no sender, and the resolved flow run ID as receiver.

    Args:
        - keyset (Keyset): the keyset to load the input for
        - flow_run_id (UUID, optional): the flow run ID to load the input for
    """
    flow_run_id = ensure_flow_run_id(flow_run_id)
    response_key = keyset["response"]
    stored = await read_flow_run_input(response_key, flow_run_id=flow_run_id)

    instance = cls(**stored) if stored else cls()
    instance._metadata = RunInputMetadata(
        key=response_key, sender=None, receiver=flow_run_id
    )
    return instance

load_from_flow_run_input(flow_run_input) classmethod

Load the run input from a FlowRunInput object.

Parameters:

Name Type Description Default
flow_run_input FlowRunInput

the flow run input to load the input for

required
Source code in src/prefect/input/run_input.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@classmethod
def load_from_flow_run_input(cls, flow_run_input: "FlowRunInput"):
    """
    Build an instance from a FlowRunInput object.

    The decoded value supplies the field values; the key, sender, and flow
    run ID of the FlowRunInput are recorded as the instance's metadata.

    Args:
        - flow_run_input (FlowRunInput): the flow run input to load the input for
    """
    payload = flow_run_input.decoded_value
    instance = cls(**payload)

    metadata = RunInputMetadata(
        key=flow_run_input.key,
        sender=flow_run_input.sender,
        receiver=flow_run_input.flow_run_id,
    )
    instance._metadata = metadata
    return instance

save(keyset, flow_run_id=None) async classmethod

Save the run input response to the given key.

Parameters:

Name Type Description Default
keyset Keyset

the keyset to save the input for

required
flow_run_id Optional[UUID]

the flow run ID to save the input for

None
Source code in src/prefect/input/run_input.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@classmethod
@sync_compatible
async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
    """
    Store this model's schema, and its description when one is set, as
    flow run inputs.

    Args:
        - keyset (Keyset): supplies the "schema" and "description" keys the
            values are stored under
        - flow_run_id (UUID, optional): the flow run ID to save the input for
    """
    schema = (
        create_v2_schema(cls.__name__, model_base=cls)
        if is_v2_model(cls)
        else cls.schema(by_alias=True)
    )
    await create_flow_run_input(
        key=keyset["schema"], value=schema, flow_run_id=flow_run_id
    )

    # Only a non-empty string counts as a description; any other class-level
    # value of `_description` is skipped.
    description = cls._description
    if isinstance(description, str) and description:
        await create_flow_run_input(
            key=keyset["description"],
            value=description,
            flow_run_id=flow_run_id,
        )

subclass_from_base_model_type(model_cls) classmethod

Create a new RunInput subclass from the given pydantic.BaseModel subclass.

Parameters:

Name Type Description Default
model_cls Type[pydantic.BaseModel]

the class from which to create the new RunInput subclass

required
Source code in src/prefect/input/run_input.py
309
310
311
312
313
314
315
316
317
318
319
320
321
@classmethod
def subclass_from_base_model_type(
    cls, model_cls: Type[pydantic.BaseModel]
) -> Type["RunInput"]:
    """
    Derive a `RunInput` subclass from a `pydantic.BaseModel` subclass.

    The new class is named "<model_cls name>RunInput" and inherits from both
    `RunInput` and `model_cls`, in that order.

    Args:
        - model_cls (pydantic.BaseModel subclass): the class from which
            to create the new `RunInput` subclass
    """
    subclass_name = f"{model_cls.__name__}RunInput"
    bases = (RunInput, model_cls)
    return type(subclass_name, bases, {})  # type: ignore

with_initial_data(description=None, **kwargs) classmethod

Create a new RunInput subclass with the given initial data as field defaults.

Parameters:

Name Type Description Default
description Optional[str]

a description to show when resuming a flow run that requires input

None
**kwargs Any

the initial data to populate the subclass

{}
Source code in src/prefect/input/run_input.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
@classmethod
def with_initial_data(
    cls: Type[R], description: Optional[str] = None, **kwargs: Any
) -> Type[R]:
    """
    Derive a `RunInput` subclass whose field defaults are the given initial
    data.

    Each keyword argument becomes a field typed after its value, with the
    value as the default. The subclass keeps this class's name.

    Args:
        - description (str, optional): a description to show when resuming
            a flow run that requires input
        - kwargs (Any): the initial data to populate the subclass
    """
    fields: Dict[str, Any] = {
        name: (type(default), default) for name, default in kwargs.items()
    }
    model = pydantic.create_model(cls.__name__, **fields, __base__=cls)

    if description is None:
        return model

    model._description = description
    return model

create_flow_run_input(client, key, value, flow_run_id=None, sender=None) async

Create a new flow run input. The given value will be serialized to JSON and stored as a flow run input value.

Parameters:

Name Type Description Default
key str

the flow run input key

required
value Any

the flow run input value

required
flow_run_id Optional[UUID]

the optional flow run ID. If not given, it defaults to the flow run ID from the current context.

None
Source code in src/prefect/input/actions.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@sync_compatible
@client_injector
async def create_flow_run_input(
    client: "PrefectClient",
    key: str,
    value: Any,
    flow_run_id: Optional[UUID] = None,
    sender: Optional[str] = None,
):
    """
    Store `value`, serialized to a JSON string, as a flow run input.

    Args:
        - client (PrefectClient): the client used to create the input
            (presumably supplied by `client_injector`; confirm)
        - key (str): the flow run input key
        - value (Any): the flow run input value
        - flow_run_id (UUID): the, optional, flow run ID. It is resolved
          through `ensure_flow_run_id` before use.
        - sender (str, optional): passed to the client unchanged
    """
    resolved_flow_run_id = ensure_flow_run_id(flow_run_id)
    serialized = orjson.dumps(value).decode()

    await client.create_flow_run_input(
        flow_run_id=resolved_flow_run_id,
        key=key,
        sender=sender,
        value=serialized,
    )

delete_flow_run_input(client, key, flow_run_id=None) async

Delete a flow run input.

Parameters:

Name Type Description Default
flow_run_id Optional[UUID]

the flow run ID

None
key str

the flow run input key

required
Source code in src/prefect/input/actions.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@sync_compatible
@client_injector
async def delete_flow_run_input(
    client: "PrefectClient", key: str, flow_run_id: Optional[UUID] = None
):
    """Delete the flow run input stored under `key`.

    Args:
        - client (PrefectClient): the client used to delete the input
            (presumably supplied by `client_injector`; confirm)
        - key (str): the flow run input key
        - flow_run_id (UUID): the flow run ID; resolved through
          `ensure_flow_run_id` before use
    """
    resolved_flow_run_id = ensure_flow_run_id(flow_run_id)
    await client.delete_flow_run_input(key=key, flow_run_id=resolved_flow_run_id)

keyset_from_base_key(base_key)

Get the keyset for the given base key.

Parameters:

Name Type Description Default
base_key str

the base key to get the keyset for

required

Returns:

Type Description
Keyset
  • Dict[str, str]: the keyset
Source code in src/prefect/input/run_input.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def keyset_from_base_key(base_key: str) -> Keyset:
    """
    Build the keyset derived from a base key.

    Each entry maps a part name ("description", "response", "schema") to
    "<base_key>-<part name>".

    Args:
        - base_key (str): the base key to get the keyset for

    Returns:
        - Dict[str, str]: the keyset
    """

    def suffixed(part: str) -> str:
        return f"{base_key}-{part}"

    return {
        "description": suffixed("description"),
        "response": suffixed("response"),
        "schema": suffixed("schema"),
    }

keyset_from_paused_state(state)

Get the keyset for the given Paused state.

Parameters:

Name Type Description Default
state State

the state to get the keyset for

required
Source code in src/prefect/input/run_input.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def keyset_from_paused_state(state: "State") -> Keyset:
    """
    Build the keyset for a Paused state.

    The base key is "<lowercased state name>-<pause key>"; a missing state
    name is treated as an empty string.

    Args:
        - state (State): the state to get the keyset for

    Raises:
        - RuntimeError: if `state.is_paused()` is false
    """
    if not state.is_paused():
        raise RuntimeError(f"{state.type.value!r} is unsupported.")

    name_part = (state.name or "").lower()
    pause_key = str(state.state_details.pause_key)
    return keyset_from_base_key(f"{name_part}-{pause_key}")

read_flow_run_input(client, key, flow_run_id=None) async

Read a flow run input.

Parameters:

Name Type Description Default
key str

the flow run input key

required
flow_run_id Optional[UUID]

the flow run ID

None
Source code in src/prefect/input/actions.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@sync_compatible
@client_injector
async def read_flow_run_input(
    client: "PrefectClient", key: str, flow_run_id: Optional[UUID] = None
) -> Any:
    """Read a flow run input and return its JSON-decoded value.

    Returns None when the client reports a 404 for the key; any other
    `PrefectHTTPStatusError` is re-raised.

    Args:
        - client (PrefectClient): the client used to read the input
            (presumably supplied by `client_injector`; confirm)
        - key (str): the flow run input key
        - flow_run_id (UUID): the flow run ID; resolved through
          `ensure_flow_run_id` before use
    """
    flow_run_id = ensure_flow_run_id(flow_run_id)

    try:
        raw = await client.read_flow_run_input(flow_run_id=flow_run_id, key=key)
    except PrefectHTTPStatusError as exc:
        # A missing input is not an error for callers; everything else is.
        if exc.response.status_code != 404:
            raise
        return None

    return orjson.loads(raw)