Skip to content

prefect.runner.runner

Runners are responsible for managing the execution of deployments created and managed by either flow.serve or the serve utility.

Example
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    """Sleepy flow - sleeps the provided amount of time (in seconds)."""
    # Block the calling thread for `sleep` seconds to simulate long-running work.
    time.sleep(sleep)


@flow
def fast_flow():
    """Fastest flow this side of the Mississippi."""
    # Nothing to do: the flow body returns immediately.
    return


if __name__ == "__main__":
    # Turn each flow into a deployment. Only the sleeper is given an interval
    # schedule (presumably 45 seconds -- confirm against `to_deployment`).
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")

    # serve generates a Runner instance
    serve(slow_deploy, fast_deploy)

Runner

Source code in src/prefect/runner/runner.py
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
class Runner:
    def __init__(
        self,
        name: Optional[str] = None,
        query_seconds: Optional[float] = None,
        prefetch_seconds: float = 10,
        limit: Optional[int] = None,
        pause_on_shutdown: bool = True,
        webserver: bool = False,
    ):
        """
        Responsible for managing the execution of remotely initiated flow runs.

        Args:
            name: The name of the runner. If not provided, a random one
                (`runner-<uuid4>`) will be generated. If provided, it cannot
                contain '/' or '%'.
            query_seconds: The number of seconds to wait between querying for
                scheduled flow runs; defaults to `PREFECT_RUNNER_POLL_FREQUENCY`
            prefetch_seconds: The number of seconds to prefetch flow runs for.
            limit: The maximum number of flow runs this runner should be running
                at once; falls back to `PREFECT_RUNNER_PROCESS_LIMIT` when not
                provided.
            pause_on_shutdown: A boolean for whether or not to automatically pause
                deployment schedules on shutdown; defaults to `True`
            webserver: a boolean flag for whether to start a webserver for this runner

        Raises:
            ValueError: If `name` contains '/' or '%'.

        Examples:
            Set up a Runner to manage the execution of scheduled flow runs for two flows:
                ```python
                import asyncio
                from prefect import flow, Runner

                @flow
                def hello_flow(name):
                    print(f"hello {name}")

                @flow
                def goodbye_flow(name):
                    print(f"goodbye {name}")

                if __name__ == "__main__":
                    runner = Runner(name="my-runner")

                    # Will be runnable via the API
                    runner.add_flow(hello_flow)

                    # Run on a cron schedule
                    runner.add_flow(goodbye_flow, cron="0 * * * *")

                    asyncio.run(runner.start())
                ```
        """
        if name and any(forbidden in name for forbidden in ("/", "%")):
            raise ValueError("Runner name cannot contain '/' or '%'")

        if name is None:
            self.name = f"runner-{uuid4()}"
        else:
            # Only the stem of the supplied name is kept.
            self.name = Path(name).stem
        self._logger = get_logger("runner")

        # Lifecycle flags and user-facing configuration.
        self.started = False
        self.stopping = False
        self.pause_on_shutdown = pause_on_shutdown
        self.limit = limit or PREFECT_RUNNER_PROCESS_LIMIT.value()
        self.webserver = webserver

        # Polling configuration.
        self.query_seconds = query_seconds or PREFECT_RUNNER_POLL_FREQUENCY.value()
        self._prefetch_seconds = prefetch_seconds

        # Runtime bookkeeping; everything starts out empty.
        self._limiter: Optional[anyio.CapacityLimiter] = None
        self._client = get_client()
        self._submitting_flow_run_ids = set()
        self._cancelling_flow_run_ids = set()
        self._scheduled_task_scopes = set()
        self._deployment_ids: Set[UUID] = set()
        self._flow_run_process_map: Dict[UUID, Dict] = {}

        # Each runner gets its own uniquely named directory under the system
        # temp dir to use as the base path for storage.
        self._tmp_dir: Path = Path(
            tempfile.gettempdir(), "runner_storage", str(uuid4())
        )
        self._storage_objs: List[RunnerStorage] = []
        self._deployment_storage_map: Dict[UUID, RunnerStorage] = {}

        self._loop: Optional[asyncio.AbstractEventLoop] = None

    @sync_compatible
    async def add_deployment(
        self,
        deployment: "RunnerDeployment",
    ) -> UUID:
        """
        Registers the deployment with the Prefect API and will monitor for work once
        the runner is started.

        If the deployment carries a storage object, that storage is added to the
        runner and remembered for the deployment's ID.

        Args:
            deployment: A deployment for the runner to register.

        Returns:
            The ID produced by applying the deployment.
        """
        registered_id = await deployment.apply()

        source_storage = deployment.storage
        if source_storage is not None:
            runner_storage = await self._add_storage(source_storage)
            self._deployment_storage_map[registered_id] = runner_storage

        self._deployment_ids.add(registered_id)
        return registered_id

    @sync_compatible
    async def add_flow(
        self,
        flow: Flow,
        name: Optional[str] = None,
        interval: Optional[
            Union[
                Iterable[Union[int, float, datetime.timedelta]],
                int,
                float,
                datetime.timedelta,
            ]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedules: Optional["FlexibleScheduleList"] = None,
        parameters: Optional[dict] = None,
        triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None,
        version: Optional[str] = None,
        enforce_parameter_schema: bool = True,
        entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
    ) -> UUID:
        """
        Provides a flow to the runner to be run based on the provided configuration.

        Will create a deployment for the provided flow and register the deployment
        with the runner.

        Args:
            flow: A flow for the runner to run.
            name: The name to give the created deployment. Will default to the name
                of the runner.
            interval: An interval on which to execute the current flow. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this flow.
            rrule: An rrule schedule of when to execute runs of this flow.
            paused: Forwarded unchanged to `flow.to_deployment`.
            schedules: Schedule definitions forwarded unchanged to
                `flow.to_deployment`.
            parameters: A dictionary of default parameter values to pass to runs of this flow.
            triggers: A list of triggers that should kick off a run of this flow.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            enforce_parameter_schema: Forwarded unchanged to `flow.to_deployment`.
            entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
                entrypoint, ensure that the module will be importable in the execution environment.

        Returns:
            The ID of the deployment registered with this runner.
        """
        api = PREFECT_API_URL.value()
        # Any way of requesting a schedule -- including the `schedules` list --
        # needs a real API server, so all four options trigger the warning.
        if any([interval, cron, rrule, schedules]) and not api:
            self._logger.warning(
                "Cannot schedule flows on an ephemeral server; run `prefect server"
                " start` to start the scheduler."
            )
        name = self.name if name is None else name

        deployment = await flow.to_deployment(
            name=name,
            interval=interval,
            cron=cron,
            rrule=rrule,
            schedules=schedules,
            paused=paused,
            triggers=triggers,
            parameters=parameters,
            description=description,
            tags=tags,
            version=version,
            enforce_parameter_schema=enforce_parameter_schema,
            entrypoint_type=entrypoint_type,
        )
        return await self.add_deployment(deployment)

    @sync_compatible
    async def _add_storage(self, storage: RunnerStorage) -> RunnerStorage:
        """
        Adds a storage object to the runner. The storage object will be used to pull
        code to the runner's working directory before the runner starts.

        The runner keeps its own deep copy, re-based onto the runner's temporary
        directory; the caller's object is left untouched. If an equal storage
        object is already tracked, that existing copy is returned instead.

        Args:
            storage: The storage object to add to the runner.
        Returns:
            The updated storage object that was added to the runner.
        """
        if storage in self._storage_objs:
            # Already tracked: hand back the runner-owned copy.
            return next(
                tracked for tracked in self._storage_objs if tracked == storage
            )

        runner_copy = deepcopy(storage)
        runner_copy.set_base_path(self._tmp_dir)

        self._logger.debug(
            f"Adding storage {runner_copy!r} to runner at"
            f" {str(runner_copy.destination)!r}"
        )
        self._storage_objs.append(runner_copy)

        return runner_copy

    def handle_sigterm(self, signum, frame):
        """
        Signal handler for SIGTERM: requests a graceful stop, then exits.

        Args:
            signum: The signal number passed by the signal machinery (unused).
            frame: The interrupted stack frame (unused).
        """
        self._logger.info("SIGTERM received, initiating graceful shutdown...")

        # Hand `stop` over to the loop thread rather than calling it here.
        stop_call = create_call(self.stop)
        from_sync.call_in_loop_thread(stop_call)

        sys.exit(0)

    async def start(
        self, run_once: bool = False, webserver: Optional[bool] = None
    ) -> None:
        """
        Starts a runner.

        The runner will begin monitoring for and executing any scheduled work for all added flows.

        Args:
            run_once: If True, the runner will run through one query loop and then exit.
            webserver: a boolean for whether to start a webserver for this runner. If provided,
                overrides the default on the runner

        Examples:
            Initialize a Runner, add two flows, and serve them by starting the Runner:

            ```python
            import asyncio
            from prefect import flow, Runner

            @flow
            def hello_flow(name):
                print(f"hello {name}")

            @flow
            def goodbye_flow(name):
                print(f"goodbye {name}")

            if __name__ == "__main__":
                runner = Runner(name="my-runner")

                # Will be runnable via the API
                runner.add_flow(hello_flow)

                # Run on a cron schedule
                runner.add_flow(goodbye_flow, cron="0 * * * *")

                asyncio.run(runner.start())
            ```
        """
        from prefect.runner.server import start_webserver

        _register_signal(signal.SIGTERM, self.handle_sigterm)

        # An explicit argument wins over the value configured on the runner.
        webserver = self.webserver if webserver is None else webserver

        if webserver or PREFECT_RUNNER_SERVER_ENABLE.value():
            # The ASGI server runs in its own daemon thread so that uvicorn
            # does not block the main thread.
            threading.Thread(
                name="runner-server-thread",
                target=partial(start_webserver, runner=self),
                daemon=True,
            ).start()

        start_client_metrics_server()

        def _service_loop(workload, interval):
            # Every background loop shares the same run_once/jitter settings.
            return partial(
                critical_service_loop,
                workload=workload,
                interval=interval,
                run_once=run_once,
                jitter_range=0.3,
            )

        async with self as runner:
            async with self._loops_task_group as tg:
                for storage in self._storage_objs:
                    if storage.pull_interval:
                        # Periodically re-pull code for storages with an interval.
                        tg.start_soon(
                            _service_loop(storage.pull_code, storage.pull_interval)
                        )
                    else:
                        # Otherwise pull once.
                        tg.start_soon(storage.pull_code)

                tg.start_soon(
                    _service_loop(
                        runner._get_and_submit_flow_runs, self.query_seconds
                    )
                )
                # Cancellation checks run at half the polling frequency.
                tg.start_soon(
                    _service_loop(
                        runner._check_for_cancelled_flow_runs,
                        self.query_seconds * 2,
                    )
                )

    def execute_in_background(self, func, *args, **kwargs):
        """
        Executes a function in the background.

        Calls `func(*args, **kwargs)` and submits the result to the event loop
        stored on this runner via `asyncio.run_coroutine_threadsafe`.

        Args:
            func: Callable invoked with the given arguments; its result is what
                gets scheduled (so it is expected to produce a coroutine).
            *args: Positional arguments for `func`.
            **kwargs: Keyword arguments for `func`.

        Returns:
            The future returned by `asyncio.run_coroutine_threadsafe`.
        """
        pending = func(*args, **kwargs)
        return asyncio.run_coroutine_threadsafe(pending, self._loop)

    async def cancel_all(self):
        """
        Attempts to cancel every flow run currently tracked in the process map.

        A failure to cancel one run is logged and does not prevent the
        remaining runs from being attempted.
        """
        # Snapshot the runs first so that the map may change size while the
        # cancellations are awaited.
        tracked_runs = [
            entry["flow_run"] for entry in self._flow_run_process_map.values()
        ]

        for flow_run in tracked_runs:
            try:
                await self._cancel_run(flow_run, state_msg="Runner is shutting down.")
            except Exception:
                self._logger.exception(
                    f"Exception encountered while cancelling {flow_run.id}",
                    exc_info=True,
                )

    @sync_compatible
    async def stop(self):
        """
        Stops the runner's polling cycle.

        Flips the lifecycle flags, cancels all tracked flow runs and then
        cancels the task group that hosts the polling loops. Errors raised
        while cancelling the task group are logged rather than propagated.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if not self.started:
            raise RuntimeError(
                "Runner has not yet started. Please start the runner by calling"
                " .start()"
            )

        self.started, self.stopping = False, True

        await self.cancel_all()

        try:
            loops_scope = self._loops_task_group.cancel_scope
            loops_scope.cancel()
        except Exception:
            self._logger.exception(
                "Exception encountered while shutting down", exc_info=True
            )

    async def execute_flow_run(
        self, flow_run_id: UUID, entrypoint: Optional[str] = None
    ):
        """
        Executes a single flow run with the given ID.

        Execution will wait to monitor for cancellation requests. Exits once
        the flow run process has exited.

        Args:
            flow_run_id: ID of the flow run to read from the API and execute.
            entrypoint: Optional entrypoint forwarded to
                `_submit_run_and_capture_errors`.

        Returns:
            None. Returns early, without running anything, when no limit slot
            could be acquired for the flow run.
        """
        # Schedule pausing on shutdown is switched off for this runner instance.
        # NOTE(review): presumably so that executing a single run never pauses
        # deployment schedules when the context exits -- confirm.
        self.pause_on_shutdown = False
        # Enter the runner's own async context only if it is not already
        # started; otherwise use a no-op context.
        context = self if not self.started else asyncnullcontext()

        async with context:
            # `_acquire_limit_slot` is defined outside this view; a falsy
            # result means the run is not executed.
            if not self._acquire_limit_slot(flow_run_id):
                return

            async with anyio.create_task_group() as tg:
                with anyio.CancelScope():
                    self._submitting_flow_run_ids.add(flow_run_id)
                    flow_run = await self._client.read_flow_run(flow_run_id)

                    # The submission runs in the runner-wide runs task group;
                    # the value it reports back on startup is kept as the pid.
                    pid = await self._runs_task_group.start(
                        partial(
                            self._submit_run_and_capture_errors,
                            flow_run=flow_run,
                            entrypoint=entrypoint,
                        ),
                    )

                    # Track the run so that cancellation checks can find it.
                    self._flow_run_process_map[flow_run.id] = dict(
                        pid=pid, flow_run=flow_run
                    )

                    # We want this loop to stop when the flow run process exits
                    # so we'll check if the flow run process is still alive on
                    # each iteration and cancel the task group if it is not.
                    workload = partial(
                        self._check_for_cancelled_flow_runs,
                        should_stop=lambda: not self._flow_run_process_map,
                        on_stop=tg.cancel_scope.cancel,
                    )

                    # Poll for cancellation requests at the regular query
                    # interval until the local task group is cancelled.
                    tg.start_soon(
                        partial(
                            critical_service_loop,
                            workload=workload,
                            interval=self.query_seconds,
                            jitter_range=0.3,
                        )
                    )

    def _get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter:
        """
        Builds a "runner" child of the flow run's logger, tagged with this
        runner's name.

        Args:
            flow_run: The flow run whose logger should be derived.

        Returns:
            The child log adapter.
        """
        runner_extra = {"runner_name": self.name}
        parent_logger = flow_run_logger(flow_run=flow_run)
        return parent_logger.getChild("runner", extra=runner_extra)

    async def _run_process(
        self,
        flow_run: "FlowRun",
        task_status: Optional[anyio.abc.TaskStatus] = None,
        entrypoint: Optional[str] = None,
    ):
        """
        Runs the given flow run in a subprocess.

        Args:
            flow_run: Flow run to execute via process. The ID of this flow run
                is stored in the PREFECT__FLOW_RUN_ID environment variable to
                allow the engine to retrieve the corresponding flow's code and
                begin execution.
            task_status: anyio task status used to send a message to the caller
                that the flow run process has started.
            entrypoint: Optional entrypoint; when truthy it is exported to the
                subprocess as PREFECT__FLOW_ENTRYPOINT.

        Returns:
            The return code of the finished subprocess.
        """
        command = [get_sys_executable(), "-m", "prefect.engine"]

        run_logger = self._get_flow_run_logger(flow_run)

        # `creationflags` may only be passed on Windows: its mere presence
        # causes errors on Unix, even when set to None.
        platform_kwargs: Dict[str, object] = (
            {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
            if sys.platform == "win32"
            else {}
        )

        run_logger.info("Opening process...")

        # Start from the current settings, layer the run-specific variables on
        # top, and finally apply the parent process environment.
        env = get_current_settings().to_environment_variables(exclude_unset=True)
        run_env = {
            "PREFECT__FLOW_RUN_ID": str(flow_run.id),
            "PREFECT__STORAGE_BASE_PATH": str(self._tmp_dir),
            "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
        }
        if entrypoint:
            run_env["PREFECT__FLOW_ENTRYPOINT"] = entrypoint
        env.update(run_env)
        env.update(**os.environ)  # is this really necessary??

        storage = self._deployment_storage_map.get(flow_run.deployment_id)
        if storage and storage.pull_interval:
            # Perform an adhoc pull of code before running the flow unless one
            # already happened within the last `pull_interval` seconds.
            # TODO: Explore integrating this behavior with global concurrency.
            previous_pull = getattr(storage, "last_adhoc_pull", None)
            pull_is_due = previous_pull is None or previous_pull < (
                datetime.datetime.now()
                - datetime.timedelta(seconds=storage.pull_interval)
            )
            if pull_is_due:
                self._logger.debug(
                    "Performing adhoc pull of code for flow run %s with storage %r",
                    flow_run.id,
                    storage,
                )
                await storage.pull_code()
                setattr(storage, "last_adhoc_pull", datetime.datetime.now())

        process = await run_process(
            command=command,
            stream_output=True,
            task_status=task_status,
            env=env,
            cwd=storage.destination if storage else None,
            **platform_kwargs,
        )

        exit_code = process.returncode
        if not exit_code:
            run_logger.info(
                f"Process for flow run {flow_run.name!r} exited cleanly."
            )
            return exit_code

        # Non-zero exit: choose a log level and, for well-known codes, a hint.
        level = logging.ERROR
        help_message = None
        if exit_code == -9:
            level = logging.INFO
            help_message = (
                "This indicates that the process exited due to a SIGKILL signal. "
                "Typically, this is either caused by manual cancellation or "
                "high memory usage causing the operating system to "
                "terminate the process."
            )
        elif exit_code == -15:
            level = logging.INFO
            help_message = (
                "This indicates that the process exited due to a SIGTERM signal. "
                "Typically, this is caused by manual cancellation."
            )
        elif exit_code == 247:
            help_message = (
                "This indicates that the process was terminated due to high "
                "memory usage."
            )
        elif sys.platform == "win32" and exit_code == STATUS_CONTROL_C_EXIT:
            level = logging.INFO
            help_message = (
                "Process was terminated due to a Ctrl+C or Ctrl+Break signal. "
                "Typically, this is caused by manual cancellation."
            )

        exit_summary = (
            f"Process for flow run {flow_run.name!r} exited with status code:"
            f" {exit_code}"
        )
        if help_message:
            exit_summary += f"; {help_message}"
        run_logger.log(level, exit_summary)

        return exit_code

    async def _kill_process(
        self,
        pid: int,
        grace_seconds: int = 30,
    ):
        """
        Kills a given flow run process.

        On Windows a CTRL_BREAK_EVENT is sent and the method returns. Elsewhere
        a SIGTERM is sent first; if the process is still alive once
        `grace_seconds` have elapsed, a SIGKILL follows.

        Args:
            pid: ID of the process to kill
            grace_seconds: Number of seconds to wait for the process to end.

        Raises:
            RuntimeError: If the process cannot be found when the first signal
                is sent.
        """
        if sys.platform == "win32":
            # SIGTERM is useless on Windows, so CTRL_BREAK_EVENT is used:
            # https://bugs.python.org/issue26350
            try:
                os.kill(pid, signal.CTRL_BREAK_EVENT)
            except (ProcessLookupError, WindowsError):
                raise RuntimeError(
                    f"Unable to kill process {pid!r}: The process was not found."
                )
            return

        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            raise RuntimeError(
                f"Unable to kill process {pid!r}: The process was not found."
            )

        # Throttle the liveness checks so we do not make too many system calls
        # in a short period of time.
        poll_seconds = max(grace_seconds / 10, 1)

        with anyio.move_on_after(grace_seconds):
            while True:
                await anyio.sleep(poll_seconds)

                # Signal 0 only probes for existence. Once the process is gone
                # it honoured the SIGTERM and there is nothing left to do.
                try:
                    os.kill(pid, 0)
                except ProcessLookupError:
                    return

        # Grace period exhausted: force the process down.
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # We shouldn't ever end up here, but the process may have ended
            # right after the last liveness check.
            return

    async def _pause_schedules(self):
        """
        Pauses all deployment schedules.

        Every deployment ID tracked by this runner is set to paused through
        the client, one request at a time.
        """
        log = self._logger
        log.info("Pausing all deployments...")

        for tracked_id in self._deployment_ids:
            await self._client.set_deployment_paused_state(tracked_id, True)
            log.debug(f"Paused deployment '{tracked_id}'")

        log.info("All deployments have been paused!")

    async def _get_and_submit_flow_runs(self):
        """
        Runs one polling cycle: fetch scheduled flow runs, record the poll
        time, and submit what was found.

        Returns:
            The result of `_submit_scheduled_flow_runs`, or None when the
            runner is stopping (in which case nothing is queried).
        """
        if not self.stopping:
            scheduled_runs = await self._get_scheduled_flow_runs()
            self.last_polled = pendulum.now("UTC")
            return await self._submit_scheduled_flow_runs(
                flow_run_response=scheduled_runs
            )
        return None

    async def _check_for_cancelled_flow_runs(
        self, should_stop: Callable = lambda: False, on_stop: Callable = lambda: None
    ):
        """
        Checks for flow runs in a cancelling state and attempts to cancel them.

        Two queries are issued, both restricted to runs tracked in the process
        map that are not already being cancelled: one for runs whose state type
        is CANCELLED but whose state name is "Cancelling", and one for runs
        whose state type is CANCELLING.

        Args:
            should_stop: A callable that returns a boolean indicating whether or not
                the runner should stop checking for cancelled flow runs.
            on_stop: A callable that is called when the runner should stop checking
                for cancelled flow runs.

        Returns:
            The flow runs found awaiting cancellation, or None when the runner
            is stopping.

        Raises:
            RuntimeError: If the runner has not been started.
        """
        if self.stopping:
            return
        if not self.started:
            raise RuntimeError(
                "Runner is not set up. Please make sure you are running this runner "
                "as an async context manager."
            )

        if should_stop():
            self._logger.debug(
                "Runner has no active flow runs or deployments. Sending message to loop"
                " service that no further cancellation checks are needed."
            )
            on_stop()

        self._logger.debug("Checking for cancelled flow runs...")

        def _not_yet_cancelling():
            # Avoid duplicate cancellation calls: only consider tracked runs
            # that are not already being cancelled. Evaluated afresh for each
            # query.
            return FlowRunFilterId(
                any_=list(
                    self._flow_run_process_map.keys() - self._cancelling_flow_run_ids
                )
            )

        runs_named_cancelling = await self._client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.CANCELLED]),
                    name=FlowRunFilterStateName(any_=["Cancelling"]),
                ),
                id=_not_yet_cancelling(),
            ),
        )

        runs_typed_cancelling = await self._client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.CANCELLING]),
                ),
                id=_not_yet_cancelling(),
            ),
        )

        cancelling_flow_runs = runs_named_cancelling + runs_typed_cancelling

        if cancelling_flow_runs:
            self._logger.info(
                f"Found {len(cancelling_flow_runs)} flow runs awaiting cancellation."
            )

        for pending_run in cancelling_flow_runs:
            # Mark the run first so that a later check does not pick it up again.
            self._cancelling_flow_run_ids.add(pending_run.id)
            self._runs_task_group.start_soon(self._cancel_run, pending_run)

        return cancelling_flow_runs

    async def _cancel_run(self, flow_run: "FlowRun", state_msg: Optional[str] = None):
        run_logger = self._get_flow_run_logger(flow_run)

        pid = self._flow_run_process_map.get(flow_run.id, {}).get("pid")
        if not pid:
            await self._run_on_cancellation_hooks(flow_run, flow_run.state)
            await self._mark_flow_run_as_cancelled(
                flow_run,
                state_updates={
                    "message": (
                        "Could not find process ID for flow run"
                        " and cancellation cannot be guaranteed."
                    )
                },
            )
            return

        try:
            await self._kill_process(pid)
        except RuntimeError as exc:
            self._logger.warning(f"{exc} Marking flow run as cancelled.")
            await self._run_on_cancellation_hooks(flow_run, flow_run.state)
            await self._mark_flow_run_as_cancelled(flow_run)
        except Exception:
            run_logger.exception(
                "Encountered exception while killing process for flow run "
                f"'{flow_run.id}'. Flow run may not be cancelled."
            )
            # We will try again on generic exceptions
            self._cancelling_flow_run_ids.remove(flow_run.id)
        else:
            await self._run_on_cancellation_hooks(flow_run, flow_run.state)
            await self._mark_flow_run_as_cancelled(
                flow_run,
                state_updates={
                    "message": state_msg or "Flow run was cancelled successfully."
                },
            )
            try:
                deployment = await self._client.read_deployment(flow_run.deployment_id)
            except ObjectNotFound:
                deployment = None
            try:
                flow = await self._client.read_flow(flow_run.flow_id)
            except ObjectNotFound:
                flow = None
            self._emit_flow_run_cancelled_event(
                flow_run=flow_run, flow=flow, deployment=deployment
            )
            run_logger.info(f"Cancelled flow run '{flow_run.name}'!")

    def _event_resource(self):
        """Build the event resource mapping that identifies this runner."""
        from prefect import __version__

        runner_name = self.name
        resource = {"prefect.resource.id": f"prefect.runner.{slugify(runner_name)}"}
        resource["prefect.resource.name"] = runner_name
        resource["prefect.version"] = __version__
        return resource

    def _emit_flow_run_cancelled_event(
        self,
        flow_run: "FlowRun",
        flow: "Optional[APIFlow]",
        deployment: "Optional[Deployment]",
    ):
        """
        Emit a `prefect.runner.cancelled-flow-run` event for a flow run.

        The event's related resources are the deployment and flow (when given),
        the flow run itself, and the tags of the deployment and the flow run.
        """

        def as_resource(resource_id, role, name):
            # Shape shared by every related resource built below.
            return {
                "prefect.resource.id": resource_id,
                "prefect.resource.role": role,
                "prefect.resource.name": name,
            }

        raw_resources = []
        all_tags = []

        if deployment:
            raw_resources.append(
                as_resource(
                    f"prefect.deployment.{deployment.id}",
                    "deployment",
                    deployment.name,
                )
            )
            all_tags.extend(deployment.tags)

        if flow:
            raw_resources.append(
                as_resource(f"prefect.flow.{flow.id}", "flow", flow.name)
            )

        raw_resources.append(
            as_resource(
                f"prefect.flow-run.{flow_run.id}", "flow-run", flow_run.name
            )
        )
        all_tags.extend(flow_run.tags)

        related_resources = [
            RelatedResource.model_validate(raw) for raw in raw_resources
        ]
        # Duplicate tags are collapsed before they become related resources.
        related_resources += tags_as_related_resources(set(all_tags))

        emit_event(
            event="prefect.runner.cancelled-flow-run",
            resource=self._event_resource(),
            related=related_resources,
        )

    async def _get_scheduled_flow_runs(
        self,
    ) -> List["FlowRun"]:
        """
        Fetch the flow runs of this runner's deployments that are scheduled to
        start before now plus the prefetch window.
        """
        now_utc = pendulum.now("utc")
        cutoff = now_utc.add(seconds=int(self._prefetch_seconds))
        self._logger.debug(f"Querying for flow runs scheduled before {cutoff}")

        fetch = self._client.get_scheduled_flow_runs_for_deployments
        upcoming_runs = await fetch(
            deployment_ids=list(self._deployment_ids),
            scheduled_before=cutoff,
        )

        self._logger.debug(f"Discovered {len(upcoming_runs)} scheduled_flow_runs")
        return upcoming_runs

    def has_slots_available(self) -> bool:
        """
        Determine if the flow run limit has been reached.

        Returns:
            - bool: True if the limit has not been reached, False otherwise.
              Also False when no limiter exists yet: the limiter is created
              when the runner context is entered, so a runner that has not
              been started cannot accept flow runs.
        """
        limiter = self._limiter
        if not limiter:
            # Avoid an AttributeError on `None` before the runner is started.
            return False
        return limiter.available_tokens > 0

    def _acquire_limit_slot(self, flow_run_id: str) -> bool:
        """
        Enforces flow run limit set on runner.

        Returns:
            - bool: True if a slot was acquired, False otherwise.
        """
        try:
            if self._limiter:
                self._limiter.acquire_on_behalf_of_nowait(flow_run_id)
                self._logger.debug("Limit slot acquired for flow run '%s'", flow_run_id)
            return True
        except RuntimeError as exc:
            if (
                "this borrower is already holding one of this CapacityLimiter's tokens"
                in str(exc)
            ):
                self._logger.warning(
                    f"Duplicate submission of flow run '{flow_run_id}' detected. Runner"
                    " will not re-submit flow run."
                )
                return False
            else:
                raise
        except anyio.WouldBlock:
            self._logger.info(
                f"Flow run limit reached; {self._limiter.borrowed_tokens} flow runs"
                " in progress. You can control this limit by passing a `limit` value"
                " to `serve` or adjusting the PREFECT_RUNNER_PROCESS_LIMIT setting."
            )
            return False

    def _release_limit_slot(self, flow_run_id: str) -> None:
        """
        Frees up a slot taken by the given flow run id.
        """
        if self._limiter:
            self._limiter.release_on_behalf_of(flow_run_id)
            self._logger.debug("Limit slot released for flow run '%s'", flow_run_id)

    async def _submit_scheduled_flow_runs(
        self,
        flow_run_response: List["FlowRun"],
        entrypoints: Optional[List[str]] = None,
    ) -> List["FlowRun"]:
        """
        Takes a list of FlowRuns and submits the referenced flow runs
        for execution by the runner.
        """
        submittable_flow_runs = flow_run_response
        submittable_flow_runs.sort(key=lambda run: run.next_scheduled_start_time)
        for i, flow_run in enumerate(submittable_flow_runs):
            if flow_run.id in self._submitting_flow_run_ids:
                continue

            if self._acquire_limit_slot(flow_run.id):
                run_logger = self._get_flow_run_logger(flow_run)
                run_logger.info(
                    f"Runner '{self.name}' submitting flow run '{flow_run.id}'"
                )
                self._submitting_flow_run_ids.add(flow_run.id)
                self._runs_task_group.start_soon(
                    partial(
                        self._submit_run,
                        flow_run=flow_run,
                        entrypoint=(
                            entrypoints[i] if entrypoints else None
                        ),  # TODO: avoid relying on index
                    )
                )
            else:
                break

        return list(
            filter(
                lambda run: run.id in self._submitting_flow_run_ids,
                submittable_flow_runs,
            )
        )

    async def _submit_run(self, flow_run: "FlowRun", entrypoint: Optional[str] = None):
        """
        Submits a given flow run for execution by the runner.
        """
        run_logger = self._get_flow_run_logger(flow_run)

        ready_to_submit = await self._propose_pending_state(flow_run)

        if ready_to_submit:
            readiness_result = await self._runs_task_group.start(
                partial(
                    self._submit_run_and_capture_errors,
                    flow_run=flow_run,
                    entrypoint=entrypoint,
                ),
            )

            if readiness_result and not isinstance(readiness_result, Exception):
                self._flow_run_process_map[flow_run.id] = dict(
                    pid=readiness_result, flow_run=flow_run
                )

            run_logger.info(f"Completed submission of flow run '{flow_run.id}'")
        else:
            # If the run is not ready to submit, release the concurrency slot
            self._release_limit_slot(flow_run.id)

        self._submitting_flow_run_ids.discard(flow_run.id)

    async def _submit_run_and_capture_errors(
        self,
        flow_run: "FlowRun",
        task_status: Optional[anyio.abc.TaskStatus] = None,
        entrypoint: Optional[str] = None,
    ) -> Union[Optional[int], Exception]:
        """
        Run the process for a flow run and report failures instead of raising.

        Args:
            flow_run: The flow run to execute.
            task_status: Task status handle forwarded to `_run_process`; it is
                also used here to tell whether the run had already started
                when an exception occurred.
            entrypoint: Optional entrypoint forwarded to `_run_process`.

        Returns:
            The value returned by `_run_process` (treated as the process'
            status code), or the exception that was caught while running it.
        """
        run_logger = self._get_flow_run_logger(flow_run)

        try:
            status_code = await self._run_process(
                flow_run=flow_run,
                task_status=task_status,
                entrypoint=entrypoint,
            )
        except Exception as exc:
            # NOTE(review): `task_status` defaults to None, but it is
            # dereferenced here unconditionally; callers in this file pass it
            # implicitly through `TaskGroup.start`.
            if not task_status._future.done():
                # This flow run was being submitted and did not start successfully
                run_logger.exception(
                    f"Failed to start process for flow run '{flow_run.id}'."
                )
                # Mark the task as started to prevent agent crash
                task_status.started(exc)
                await self._propose_crashed_state(
                    flow_run, "Flow run process could not be started"
                )
            else:
                run_logger.exception(
                    f"An error occurred while monitoring flow run '{flow_run.id}'. "
                    "The flow run will not be marked as failed, but an issue may have "
                    "occurred."
                )
            return exc
        finally:
            # Runs on success and failure alike: free the limit slot and
            # forget the process entry for this flow run.
            self._release_limit_slot(flow_run.id)
            self._flow_run_process_map.pop(flow_run.id, None)

        if status_code != 0:
            await self._propose_crashed_state(
                flow_run,
                f"Flow run process exited with non-zero status code {status_code}.",
            )

        # Re-read the flow run from the API so the crashed hooks only run when
        # the state recorded there is actually a crashed state.
        api_flow_run = await self._client.read_flow_run(flow_run_id=flow_run.id)
        terminal_state = api_flow_run.state
        if terminal_state.is_crashed():
            await self._run_on_crashed_hooks(flow_run=flow_run, state=terminal_state)

        return status_code

    async def _propose_pending_state(self, flow_run: "FlowRun") -> bool:
        """
        Propose a `Pending` state for the flow run.

        Returns:
            True if the state returned for the proposal is pending; False if
            the proposal was aborted, failed, or came back with any other state.
        """
        run_logger = self._get_flow_run_logger(flow_run)

        # Starts as the run's known state and is replaced by the proposal result.
        current_state = flow_run.state
        try:
            current_state = await propose_state(
                self._client, Pending(), flow_run_id=flow_run.id
            )
        except Abort as exc:
            run_logger.info(
                (
                    f"Aborted submission of flow run '{flow_run.id}'. "
                    f"Server sent an abort signal: {exc}"
                ),
            )
            return False
        except Exception:
            run_logger.exception(
                f"Failed to update state of flow run '{flow_run.id}'",
            )
            return False

        if current_state.is_pending():
            return True

        run_logger.info(
            (
                f"Aborted submission of flow run '{flow_run.id}': "
                f"Server returned a non-pending state {current_state.type.value!r}"
            ),
        )
        return False

    async def _propose_failed_state(self, flow_run: "FlowRun", exc: Exception) -> None:
        """
        Propose a failed state, built from `exc`, for the flow run.

        An abort is ignored and any other error is logged; nothing is raised
        for either.
        """
        run_logger = self._get_flow_run_logger(flow_run)
        try:
            failed_state = await exception_to_failed_state(
                message="Submission failed.", exc=exc
            )
            await propose_state(
                self._client,
                failed_state,
                flow_run_id=flow_run.id,
            )
        except Abort:
            # We've already failed, no need to note the abort but we don't want it to
            # raise in the agent process
            return
        except Exception:
            run_logger.error(
                f"Failed to update state of flow run '{flow_run.id}'",
                exc_info=True,
            )

    async def _propose_crashed_state(self, flow_run: "FlowRun", message: str) -> None:
        """
        Propose a `Crashed` state carrying `message` for the flow run.

        An abort is ignored, other errors are logged, and an info message is
        logged when the resulting state is crashed.
        """
        run_logger = self._get_flow_run_logger(flow_run)
        try:
            resulting_state = await propose_state(
                self._client,
                Crashed(message=message),
                flow_run_id=flow_run.id,
            )
        except Abort:
            # Flow run already marked as failed
            return
        except Exception:
            run_logger.exception(f"Failed to update state of flow run '{flow_run.id}'")
            return

        if resulting_state.is_crashed():
            run_logger.info(
                f"Reported flow run '{flow_run.id}' as crashed: {message}"
            )

    async def _mark_flow_run_as_cancelled(
        self, flow_run: "FlowRun", state_updates: Optional[dict] = None
    ) -> None:
        """
        Force the flow run into a cancelled state.

        Args:
            flow_run: The flow run to mark as cancelled.
            state_updates: Optional field overrides applied to a copy of the
                run's current state. `name` and `type` default to the
                cancelled values when not supplied. The mapping passed in is
                not modified.
        """
        # Build a fresh mapping so the caller's dict is never mutated.
        updates = {"name": "Cancelled", "type": StateType.CANCELLED}
        updates.update(state_updates or {})
        state = flow_run.state.model_copy(update=updates)

        await self._client.set_flow_run_state(flow_run.id, state, force=True)

        # Do not remove the flow run from the cancelling set immediately because
        # the API caches responses for the `read_flow_runs` and we do not want to
        # duplicate cancellations.
        # `discard` keeps the delayed cleanup from raising a KeyError inside the
        # task group when the id is no longer (or never was) in the set.
        await self._schedule_task(
            60 * 10, self._cancelling_flow_run_ids.discard, flow_run.id
        )

    async def _schedule_task(self, __in_seconds: int, fn, *args, **kwargs):
        """
        Schedule a background task to start after some time.

        These tasks will be run immediately when the runner exits instead of waiting.

        The function may be async or sync. Async functions will be awaited.

        Args:
            __in_seconds: Number of seconds to sleep before calling `fn`.
            fn: The callable to run; if calling it yields a coroutine, that
                coroutine is awaited.
            *args: Positional arguments passed to `fn`.
            **kwargs: Keyword arguments passed to `fn`.
        """

        async def wrapper(task_status):
            # If we are shutting down, do not sleep; otherwise sleep until the scheduled
            # time or shutdown
            if self.started:
                with anyio.CancelScope() as scope:
                    # Register the scope so the sleep can be cut short from
                    # outside; `__aexit__` cancels every registered scope.
                    self._scheduled_task_scopes.add(scope)
                    # Signal the task group's `start` call that the task is
                    # running, so scheduling does not wait for the sleep.
                    task_status.started()
                    await anyio.sleep(__in_seconds)

                self._scheduled_task_scopes.remove(scope)
            else:
                task_status.started()

            # Reached after the sleep finished, was cancelled through the
            # scope, or was skipped because the runner is not started.
            result = fn(*args, **kwargs)
            if inspect.iscoroutine(result):
                await result

        await self._runs_task_group.start(wrapper)

    async def _run_on_cancellation_hooks(
        self,
        flow_run: "FlowRun",
        state: State,
    ) -> None:
        """
        Run the flow's cancellation hooks when `state` is a cancelling state.

        The flow is loaded from the flow run; if that fails with
        `ObjectNotFound`, a warning is logged and no hooks are run.
        """
        if not state.is_cancelling():
            return

        try:
            flow = await load_flow_from_flow_run(
                flow_run, storage_base_path=str(self._tmp_dir)
            )
            cancellation_hooks = flow.on_cancellation_hooks or []
            await _run_hooks(cancellation_hooks, flow_run, flow, state)
        except ObjectNotFound:
            self._get_flow_run_logger(flow_run).warning(
                f"Runner cannot retrieve flow to execute cancellation hooks for flow run {flow_run.id!r}."
            )

    async def _run_on_crashed_hooks(
        self,
        flow_run: "FlowRun",
        state: State,
    ) -> None:
        """
        Run the flow's crashed hooks when `state` is a crashed state.

        The flow is loaded from the flow run; if that fails with
        `ObjectNotFound`, a warning is logged and no hooks are run. This
        matches how the cancellation hooks are handled and keeps the error
        from propagating to the caller.
        """
        if state.is_crashed():
            try:
                flow = await load_flow_from_flow_run(
                    flow_run, storage_base_path=str(self._tmp_dir)
                )
                hooks = flow.on_crashed_hooks or []

                await _run_hooks(hooks, flow_run, flow, state)
            except ObjectNotFound:
                run_logger = self._get_flow_run_logger(flow_run)
                run_logger.warning(
                    f"Runner cannot retrieve flow to execute crashed hooks for flow run {flow_run.id!r}."
                )

    async def __aenter__(self):
        """
        Set up the runner: create the client, temporary storage directory,
        limiter, event loop reference and task groups, then enter the client
        and the runs task group and flag the runner as started.
        """
        self._logger.debug("Starting runner...")
        self._client = get_client()
        self._tmp_dir.mkdir(parents=True)

        self._limiter = anyio.CapacityLimiter(self.limit)

        # Keep any loop or task group that is already set; create the rest.
        if not getattr(self, "_loop", None):
            self._loop = asyncio.get_event_loop()

        if not getattr(self, "_runs_task_group", None):
            self._runs_task_group: anyio.abc.TaskGroup = anyio.create_task_group()

        if not getattr(self, "_loops_task_group", None):
            self._loops_task_group: anyio.abc.TaskGroup = anyio.create_task_group()

        await self._client.__aenter__()
        await self._runs_task_group.__aenter__()

        self.started = True
        return self

    async def __aexit__(self, *exc_info):
        self._logger.debug("Stopping runner...")
        if self.pause_on_shutdown:
            await self._pause_schedules()
        self.started = False
        for scope in self._scheduled_task_scopes:
            scope.cancel()
        if self._runs_task_group:
            await self._runs_task_group.__aexit__(*exc_info)
        if self._client:
            await self._client.__aexit__(*exc_info)
        shutil.rmtree(str(self._tmp_dir))

    def __repr__(self):
        return f"Runner(name={self.name!r})"

__init__(name=None, query_seconds=None, prefetch_seconds=10, limit=None, pause_on_shutdown=True, webserver=False)

Responsible for managing the execution of remotely initiated flow runs.

Parameters:

Name Type Description Default
name Optional[str]

The name of the runner. If not provided, a random one will be generated. If provided, it cannot contain '/' or '%'.

None
query_seconds Optional[float]

The number of seconds to wait between querying for scheduled flow runs; defaults to PREFECT_RUNNER_POLL_FREQUENCY

None
prefetch_seconds float

The number of seconds to prefetch flow runs for.

10
limit Optional[int]

The maximum number of flow runs this runner should be running at once; defaults to PREFECT_RUNNER_PROCESS_LIMIT

None
pause_on_shutdown bool

A boolean for whether or not to automatically pause deployment schedules on shutdown; defaults to True

True
webserver bool

a boolean flag for whether to start a webserver for this runner

False

Examples:

Set up a Runner to manage the execution of scheduled flow runs for two flows:

```python
import asyncio
from prefect import flow, Runner

@flow
def hello_flow(name):
    print(f"hello {name}")

@flow
def goodbye_flow(name):
    print(f"goodbye {name}")

if __name__ == "__main__":
    runner = Runner(name="my-runner")

    # Will be runnable via the API
    runner.add_flow(hello_flow)

    # Run on a cron schedule
    runner.add_flow(goodbye_flow, cron="0 * * * *")

    asyncio.run(runner.start())
```
Source code in src/prefect/runner/runner.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def __init__(
    self,
    name: Optional[str] = None,
    query_seconds: Optional[float] = None,
    prefetch_seconds: float = 10,
    limit: Optional[int] = None,
    pause_on_shutdown: bool = True,
    webserver: bool = False,
):
    """
    Responsible for managing the execution of remotely initiated flow runs.

    Args:
        name: The name of the runner. If not provided, a random one
            will be generated. If provided, it cannot contain '/' or '%'.
        query_seconds: The number of seconds to wait between querying for
            scheduled flow runs; defaults to `PREFECT_RUNNER_POLL_FREQUENCY`
        prefetch_seconds: The number of seconds to prefetch flow runs for.
        limit: The maximum number of flow runs this runner should be running
            at once; defaults to `PREFECT_RUNNER_PROCESS_LIMIT`
        pause_on_shutdown: A boolean for whether or not to automatically pause
            deployment schedules on shutdown; defaults to `True`
        webserver: a boolean flag for whether to start a webserver for this runner

    Raises:
        ValueError: If `name` contains '/' or '%'.

    Examples:
        Set up a Runner to manage the execution of scheduled flow runs for two flows:
            ```python
            import asyncio
            from prefect import flow, Runner

            @flow
            def hello_flow(name):
                print(f"hello {name}")

            @flow
            def goodbye_flow(name):
                print(f"goodbye {name}")

            if __name__ == "__main__":
                runner = Runner(name="my-runner")

                # Will be runnable via the API
                runner.add_flow(hello_flow)

                # Run on a cron schedule
                runner.add_flow(goodbye_flow, cron="0 * * * *")

                asyncio.run(runner.start())
            ```
    """
    if name and ("/" in name or "%" in name):
        raise ValueError("Runner name cannot contain '/' or '%'")
    # Only the stem of the given name is kept, so a trailing suffix such as
    # ".py" is dropped.
    self.name = Path(name).stem if name is not None else f"runner-{uuid4()}"
    self._logger = get_logger("runner")

    self.started = False
    self.stopping = False
    self.pause_on_shutdown = pause_on_shutdown
    # A falsy `limit` (None or 0) falls back to the configured setting.
    self.limit = limit or PREFECT_RUNNER_PROCESS_LIMIT.value()
    self.webserver = webserver

    # A falsy `query_seconds` (None or 0) falls back to the configured setting.
    self.query_seconds = query_seconds or PREFECT_RUNNER_POLL_FREQUENCY.value()
    self._prefetch_seconds = prefetch_seconds

    # Stays None until the runner context is entered.
    self._limiter: Optional[anyio.CapacityLimiter] = None
    self._client = get_client()
    # Ids of flow runs currently being submitted / cancelled.
    self._submitting_flow_run_ids = set()
    self._cancelling_flow_run_ids = set()
    # Cancel scopes of delayed background tasks; cancelled on shutdown.
    self._scheduled_task_scopes = set()
    self._deployment_ids: Set[UUID] = set()
    # Maps a flow run id to a dict holding its "pid" and "flow_run".
    self._flow_run_process_map: Dict[UUID, Dict] = dict()

    # Unique per runner instance; created on enter and removed on exit.
    self._tmp_dir: Path = (
        Path(tempfile.gettempdir()) / "runner_storage" / str(uuid4())
    )
    self._storage_objs: List[RunnerStorage] = []
    self._deployment_storage_map: Dict[UUID, RunnerStorage] = {}

    self._loop: Optional[asyncio.AbstractEventLoop] = None

add_deployment(deployment) async

Registers the deployment with the Prefect API and will monitor for work once the runner is started.

Parameters:

Name Type Description Default
deployment RunnerDeployment

A deployment for the runner to register.

required
Source code in src/prefect/runner/runner.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
@sync_compatible
async def add_deployment(
    self,
    deployment: "RunnerDeployment",
) -> UUID:
    """
    Apply the deployment and track it so the runner monitors it for work
    once started.

    If the deployment has storage, the storage is added to the runner and
    associated with the deployment's id.

    Args:
        deployment: A deployment for the runner to register.

    Returns:
        The id returned by applying the deployment.
    """
    registered_id = await deployment.apply()

    deployment_storage = deployment.storage
    if deployment_storage is not None:
        self._deployment_storage_map[registered_id] = await self._add_storage(
            deployment_storage
        )

    self._deployment_ids.add(registered_id)
    return registered_id

add_flow(flow, name=None, interval=None, cron=None, rrule=None, paused=None, schedules=None, parameters=None, triggers=None, description=None, tags=None, version=None, enforce_parameter_schema=True, entrypoint_type=EntrypointType.FILE_PATH) async

Provides a flow to the runner to be run based on the provided configuration.

Will create a deployment for the provided flow and register the deployment with the runner.

Parameters:

Name Type Description Default
flow Flow

A flow for the runner to run.

required
name Optional[str]

The name to give the created deployment. Will default to the name of the runner.

None
interval Optional[Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]]

An interval on which to execute the current flow. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds.

None
cron Optional[Union[Iterable[str], str]]

A cron schedule of when to execute runs of this flow.

None
rrule Optional[Union[Iterable[str], str]]

An rrule schedule of when to execute runs of this flow.

None
triggers Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]]

A list of triggers that should kick off a run of this flow.

None
parameters Optional[dict]

A dictionary of default parameter values to pass to runs of this flow.

None
description Optional[str]

A description for the created deployment. Defaults to the flow's description if not provided.

None
tags Optional[List[str]]

A list of tags to associate with the created deployment for organizational purposes.

None
version Optional[str]

A version for the created deployment. Defaults to the flow's version.

None
entrypoint_type EntrypointType

Type of entrypoint to use for the deployment. When using a module path entrypoint, ensure that the module will be importable in the execution environment.

FILE_PATH
Source code in src/prefect/runner/runner.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
@sync_compatible
async def add_flow(
    self,
    flow: Flow,
    name: Optional[str] = None,
    interval: Optional[
        Union[
            Iterable[Union[int, float, datetime.timedelta]],
            int,
            float,
            datetime.timedelta,
        ]
    ] = None,
    cron: Optional[Union[Iterable[str], str]] = None,
    rrule: Optional[Union[Iterable[str], str]] = None,
    paused: Optional[bool] = None,
    schedules: Optional["FlexibleScheduleList"] = None,
    parameters: Optional[dict] = None,
    triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    version: Optional[str] = None,
    enforce_parameter_schema: bool = True,
    entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
) -> UUID:
    """
    Provides a flow to the runner to be run based on the provided configuration.

    Will create a deployment for the provided flow and register the deployment
    with the runner.

    Args:
        flow: A flow for the runner to run.
        name: The name to give the created deployment. Will default to the name
            of the runner.
        interval: An interval on which to execute the current flow. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        cron: A cron schedule of when to execute runs of this flow.
        rrule: An rrule schedule of when to execute runs of this flow.
        paused: Passed through unchanged to `flow.to_deployment`.
        schedules: Passed through unchanged to `flow.to_deployment`.
        triggers: A list of triggers that should kick off a run of this flow.
        parameters: A dictionary of default parameter values to pass to runs of this flow.
        description: A description for the created deployment. Defaults to the flow's
            description if not provided.
        tags: A list of tags to associate with the created deployment for organizational
            purposes.
        version: A version for the created deployment. Defaults to the flow's version.
        enforce_parameter_schema: Passed through unchanged to `flow.to_deployment`.
        entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
            entrypoint, ensure that the module will be importable in the execution environment.

    Returns:
        The id of the deployment registered with the runner.
    """
    api = PREFECT_API_URL.value()
    # `schedules` configures scheduling just like `interval`/`cron`/`rrule`,
    # so it has to trigger the same warning.
    if any([interval, cron, rrule, schedules]) and not api:
        self._logger.warning(
            "Cannot schedule flows on an ephemeral server; run `prefect server"
            " start` to start the scheduler."
        )
    name = self.name if name is None else name

    deployment = await flow.to_deployment(
        name=name,
        interval=interval,
        cron=cron,
        rrule=rrule,
        schedules=schedules,
        paused=paused,
        triggers=triggers,
        parameters=parameters,
        description=description,
        tags=tags,
        version=version,
        enforce_parameter_schema=enforce_parameter_schema,
        entrypoint_type=entrypoint_type,
    )
    return await self.add_deployment(deployment)

execute_flow_run(flow_run_id, entrypoint=None) async

Executes a single flow run with the given ID.

Execution will wait to monitor for cancellation requests. Exits once the flow run process has exited.

Source code in src/prefect/runner/runner.py
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
async def execute_flow_run(
    self, flow_run_id: UUID, entrypoint: Optional[str] = None
):
    """
    Executes a single flow run with the given ID.

    Execution will wait to monitor for cancellation requests. Exits once
    the flow run process has exited.

    Args:
        flow_run_id: The id of the flow run to execute.
        entrypoint: Optional entrypoint forwarded to the submission of the
            flow run.
    """
    # Executing a single run must not pause deployment schedules on exit.
    self.pause_on_shutdown = False
    # Enter the runner's own context only if it has not been started yet.
    context = self if not self.started else asyncnullcontext()

    async with context:
        # Nothing to do when no limit slot is available for this run.
        if not self._acquire_limit_slot(flow_run_id):
            return

        async with anyio.create_task_group() as tg:
            with anyio.CancelScope():
                self._submitting_flow_run_ids.add(flow_run_id)
                flow_run = await self._client.read_flow_run(flow_run_id)

                # `start` returns the value the submission reports once the
                # run has started.
                # NOTE(review): `_submit_run_and_capture_errors` can report an
                # exception instead of a process id; `_submit_run` filters
                # that case out, this method does not - confirm intended.
                pid = await self._runs_task_group.start(
                    partial(
                        self._submit_run_and_capture_errors,
                        flow_run=flow_run,
                        entrypoint=entrypoint,
                    ),
                )

                self._flow_run_process_map[flow_run.id] = dict(
                    pid=pid, flow_run=flow_run
                )

                # We want this loop to stop when the flow run process exits
                # so we'll check if the flow run process is still alive on
                # each iteration and cancel the task group if it is not.
                workload = partial(
                    self._check_for_cancelled_flow_runs,
                    should_stop=lambda: not self._flow_run_process_map,
                    on_stop=tg.cancel_scope.cancel,
                )

                tg.start_soon(
                    partial(
                        critical_service_loop,
                        workload=workload,
                        interval=self.query_seconds,
                        jitter_range=0.3,
                    )
                )

execute_in_background(func, *args, **kwargs)

Executes a function in the background.

Source code in src/prefect/runner/runner.py
415
416
417
418
419
420
def execute_in_background(self, func, *args, **kwargs):
    """
    Call `func` with the given arguments and submit the resulting coroutine
    to the runner's event loop from any thread.

    Returns:
        The future returned by `asyncio.run_coroutine_threadsafe`.
    """
    coroutine = func(*args, **kwargs)
    return asyncio.run_coroutine_threadsafe(coroutine, self._loop)

handle_sigterm(signum, frame)

Gracefully shuts down the runner when a SIGTERM is received.

Source code in src/prefect/runner/runner.py
311
312
313
314
315
316
317
318
def handle_sigterm(self, signum, frame):
    """
    Signal handler for SIGTERM: request the runner to stop, then exit the
    process with status 0.

    Args:
        signum: The signal number (unused).
        frame: The current stack frame (unused).
    """
    self._logger.info("SIGTERM received, initiating graceful shutdown...")

    stop_call = create_call(self.stop)
    from_sync.call_in_loop_thread(stop_call)

    sys.exit(0)

has_slots_available()

Determine if the flow run limit has been reached.

Returns:

Type Description
bool
  • bool: True if the limit has not been reached, False otherwise.
Source code in src/prefect/runner/runner.py
902
903
904
905
906
907
908
909
def has_slots_available(self) -> bool:
    """
    Report whether the runner's limiter still has capacity.

    Returns:
        - bool: True when `self._limiter` reports more than zero available
          tokens, False otherwise.
    """
    remaining = self._limiter.available_tokens
    return remaining > 0

start(run_once=False, webserver=None) async

Starts a runner.

The runner will begin monitoring for and executing any scheduled work for all added flows.

Parameters:

Name Type Description Default
run_once bool

If True, the runner will go through one query loop and then exit.

False
webserver Optional[bool]

a boolean for whether to start a webserver for this runner. If provided, overrides the default on the runner

None

Examples:

Initialize a Runner, add two flows, and serve them by starting the Runner:

# Example: serve two flows from a single Runner.
import asyncio
from prefect import flow, Runner

@flow
def hello_flow(name):
    print(f"hello {name}")

@flow
def goodbye_flow(name):
    print(f"goodbye {name}")

if __name__ == "__main__":
    runner = Runner(name="my-runner")

    # Will be runnable via the API
    runner.add_flow(hello_flow)

    # Run on a cron schedule
    runner.add_flow(goodbye_flow, schedule={"cron": "0 * * * *"})

    # `start` is a coroutine, so drive it with an event loop
    asyncio.run(runner.start())
Source code in src/prefect/runner/runner.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
async def start(
    self, run_once: bool = False, webserver: Optional[bool] = None
) -> None:
    """
    Starts a runner.

    Registers the SIGTERM handler, optionally launches the runner webserver
    in a daemon thread, starts the client metrics server, and then schedules
    the runner's service loops: one code-pull task per storage object, the
    scheduled-flow-run query loop, and the cancellation-check loop.

    Args:
        run_once: If True, the runner will go through one query loop and
            then exit.
        webserver: a boolean for whether to start a webserver for this
            runner. If provided, overrides the default on the runner

    Examples:
        Initialize a Runner, add two flows, and serve them by starting the
        Runner:

        ```python
        import asyncio
        from prefect import flow, Runner

        @flow
        def hello_flow(name):
            print(f"hello {name}")

        @flow
        def goodbye_flow(name):
            print(f"goodbye {name}")

        if __name__ == "__main__":
            runner = Runner(name="my-runner")

            # Will be runnable via the API
            runner.add_flow(hello_flow)

            # Run on a cron schedule
            runner.add_flow(goodbye_flow, schedule={"cron": "0 * * * *"})

            asyncio.run(runner.start())
        ```
    """
    from prefect.runner.server import start_webserver

    _register_signal(signal.SIGTERM, self.handle_sigterm)

    # An explicit argument wins; otherwise use the runner's own setting.
    if webserver is None:
        webserver = self.webserver

    if webserver or PREFECT_RUNNER_SERVER_ENABLE.value():
        # The ASGI server gets its own daemon thread so that uvicorn does
        # not block the main thread.
        server_target = partial(start_webserver, runner=self)
        server_thread = threading.Thread(
            name="runner-server-thread", target=server_target, daemon=True
        )
        server_thread.start()

    start_client_metrics_server()

    def _service_loop(workload, interval):
        # Every loop shares the same run_once flag and jitter; only the
        # workload and its polling interval differ.
        return partial(
            critical_service_loop,
            workload=workload,
            interval=interval,
            run_once=run_once,
            jitter_range=0.3,
        )

    async with self as runner, self._loops_task_group as tg:
        for storage in self._storage_objs:
            if not storage.pull_interval:
                # No interval configured: schedule a single pull.
                tg.start_soon(storage.pull_code)
                continue
            tg.start_soon(_service_loop(storage.pull_code, storage.pull_interval))

        # Poll for scheduled flow runs and submit them.
        tg.start_soon(
            _service_loop(runner._get_and_submit_flow_runs, self.query_seconds)
        )
        # Check for cancelled flow runs at twice the query interval.
        tg.start_soon(
            _service_loop(
                runner._check_for_cancelled_flow_runs, self.query_seconds * 2
            )
        )

stop() async

Stops the runner's polling cycle.

Source code in src/prefect/runner/runner.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
@sync_compatible
async def stop(self):
    """
    Stops the runner's polling cycle.

    Marks the runner as stopping, awaits `cancel_all`, and then cancels the
    cancel scope of the loops task group. An exception raised while
    cancelling that scope is logged rather than propagated.

    Raises:
        RuntimeError: if the runner has not been started.
    """
    if not self.started:
        raise RuntimeError(
            "Runner has not yet started. Please start the runner by calling .start()"
        )

    # Flip the lifecycle flags before any cancellation work begins.
    self.started, self.stopping = False, True

    await self.cancel_all()

    shutdown_error = "Exception encountered while shutting down"
    try:
        loops_scope = self._loops_task_group.cancel_scope
        loops_scope.cancel()
    except Exception:
        self._logger.exception(shutdown_error, exc_info=True)