Skip to content

prefect.server.orchestration.instrumentation_policies

Orchestration rules related to instrumenting the orchestration engine for Prefect Observability

InstrumentFlowRunStateTransitions

Bases: BaseUniversalTransform

When a Flow Run changes states, fire a Prefect Event for the state change

Source code in src/prefect/server/orchestration/instrumentation_policies.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class InstrumentFlowRunStateTransitions(BaseUniversalTransform):
    """Emit a Prefect Event whenever a Flow Run moves from one state to another."""

    async def after_transition(self, context: OrchestrationContext) -> None:
        """Emit a flow-run state-change event once a transition has completed.

        Returns without emitting anything when the context has no proposed or
        validated state, or when it is not a ``FlowOrchestrationContext``.

        Args:
            context: The orchestration context describing the transition.
        """
        if not (context.proposed_state and context.validated_state):
            return
        if not isinstance(context, FlowOrchestrationContext):
            return

        # Work on copies so the states held by the context are never mutated.
        source_initial = context.initial_state
        previous = source_initial.model_copy() if source_initial else None
        current = context.validated_state.model_copy()

        # Guard against passing large state payloads to arq
        for snapshot, source in (
            (previous, context.initial_state),
            (current, context.validated_state),
        ):
            if snapshot:
                # Re-apply the source state's timestamp onto the copy, then
                # cap the message length.
                snapshot.timestamp = source.timestamp
                snapshot.message = truncated_to(
                    TRUNCATE_STATE_MESSAGES_AT, snapshot.message
                )

        assert isinstance(context.session, AsyncSession)

        async with PrefectServerEventsClient() as events:
            # The event is built while the client is open, matching the
            # original ordering of operations.
            state_change = await flow_run_state_change_event(
                session=context.session,
                occurred=current.timestamp,
                flow_run=context.run,
                initial_state_id=previous.id if previous else None,
                initial_state=previous,
                validated_state_id=current.id,
                validated_state=current,
            )
            await events.emit(state_change)