Skip to content

prefect.server.api.server

Defines the Prefect REST API FastAPI app.

RequestLimitMiddleware

A middleware that limits the number of concurrent requests handled by the API.

This is a blunt tool for limiting concurrent SQLite writes, which cause failures at high volume. Ideally, we would only apply the limit to routes that perform writes.

Source code in src/prefect/server/api/server.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class RequestLimitMiddleware:
    """
    ASGI middleware that caps how many calls the wrapped app handles at once.

    Each call takes a slot from a shared `anyio.CapacityLimiter` before it is
    forwarded to the wrapped app, and gives the slot back once the app returns.

    The cap is a blunt way of reducing concurrent SQLite writes, which fail at high
    volume. Ideally only the routes that perform writes would be limited.
    """

    def __init__(self, app, limit: float):
        """
        Args:
            app: the ASGI application to wrap
            limit: the capacity handed to the `anyio.CapacityLimiter`
        """
        self._limiter = anyio.CapacityLimiter(limit)
        self.app = app

    async def __call__(self, scope, receive, send) -> None:
        """Acquire a slot from the limiter, then forward the call to the app."""
        slot = self._limiter
        async with slot:
            await self.app(scope, receive, send)

SPAStaticFiles

Bases: StaticFiles

Implementation of StaticFiles for serving single page applications.

Adds get_response handling to ensure that when a resource isn't found the application still returns the index.

Source code in src/prefect/server/api/server.py
120
121
122
123
124
125
126
127
128
129
130
131
132
class SPAStaticFiles(StaticFiles):
    """
    `StaticFiles` variant suited to serving single page applications.

    Overrides `get_response` so that a lookup which fails with `HTTPException`
    is answered with the application's index page instead.
    """

    async def get_response(self, path: str, scope):
        """
        Return the response for `path`, or the one for `./index.html` when the
        parent implementation raises `HTTPException`.
        """
        parent = super()
        try:
            response = await parent.get_response(path, scope)
        except HTTPException:
            response = await parent.get_response("./index.html", scope)
        return response

SubprocessASGIServer

Source code in src/prefect/server/api/server.py
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
class SubprocessASGIServer:
    """
    Runs the Prefect API app with uvicorn in a child process bound to 127.0.0.1.

    Instances are cached per requested port (see `__new__`), so constructing the
    class twice with the same `port` argument yields the same object. The class
    can be used as a context manager: the server is started on enter and stopped
    on exit.
    """

    # Registry of instances, keyed by the `port` argument given at construction
    # (`None` when the port is to be picked automatically by `start`).
    _instances: Dict[Union[int, None], "SubprocessASGIServer"] = {}
    # Candidate ports tried by `find_available_port`
    _port_range = range(8000, 9000)

    def __new__(cls, port: Optional[int] = None, *args, **kwargs):
        """
        Return an instance of the server associated with the provided port.
        Prevents multiple instances from being created for the same port.
        """
        if port not in cls._instances:
            instance = super().__new__(cls)
            cls._instances[port] = instance
        return cls._instances[port]

    def __init__(self, port: Optional[int] = None):
        """
        Args:
            port: the port to serve on; when `None`, `start` picks a free one
        """
        # `__init__` runs every time the class is called, even when `__new__`
        # returned a cached instance. This ensures initialization happens only once.
        if not hasattr(self, "_initialized"):
            self.port: Optional[int] = port
            self.server_process = None
            self.server = None
            self.running = False
            self._initialized = True

    def find_available_port(self):
        """
        Pick a random port from `_port_range` that can currently be bound.

        Returns:
            a port number that `is_port_available` reported as free

        Raises:
            RuntimeError: if none of the 10 randomly chosen ports was free
        """
        max_attempts = 10
        for _ in range(max_attempts):
            port = random.choice(self._port_range)
            if self.is_port_available(port):
                return port
            time.sleep(random.uniform(0.1, 0.5))  # Random backoff
        raise RuntimeError("Unable to find an available port after multiple attempts")

    @staticmethod
    def is_port_available(port: int):
        """
        Report whether `port` can be bound on 127.0.0.1 right now.

        The probe socket is closed again before returning, so the result is only a
        snapshot; another process may claim the port afterwards.
        """
        with contextlib.closing(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ) as sock:
            try:
                sock.bind(("127.0.0.1", port))
                return True
            except socket.error:
                return False

    @property
    def address(self) -> str:
        """Base URL of the server, built from the current `port`."""
        return f"http://127.0.0.1:{self.port}"

    @property
    def api_url(self) -> str:
        """URL of the API root, i.e. `address` followed by `/api`."""
        return f"{self.address}/api"

    def start(self, timeout: Optional[int] = None):
        """
        Start the server in a separate process. Safe to call multiple times; only starts
        the server once.

        Blocks until the server's `/health` endpoint answers with status 200 or the
        wait budget is used up.

        Args:
            timeout: The maximum time to wait for the server to start; when falsy,
                `PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS` is used

        Raises:
            RuntimeError: if no response was received before the budget ran out
        """
        if not self.running:
            if self.port is None:
                self.port = self.find_available_port()
            assert self.port is not None, "Port must be provided or available"
            subprocess_server_logger.info(f"Starting server on {self.address}")
            try:
                self.running = True
                self.server_process = self._run_uvicorn_command()
                atexit.register(self.stop)
                with httpx.Client() as client:
                    response = None
                    elapsed_time = 0
                    max_wait_time = (
                        timeout
                        or PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS.value()
                    )
                    # Only the sleeps below are counted towards `max_wait_time`
                    while elapsed_time < max_wait_time:
                        # NOTE(review): exit code 3 is presumably uvicorn's
                        # startup-failure code (e.g. port already taken) - confirm.
                        # In that case retry on a different port.
                        if self.server_process.poll() == 3:
                            self.port = self.find_available_port()
                            self.server_process = self._run_uvicorn_command()
                            continue
                        try:
                            response = client.get(f"{self.api_url}/health")
                        except httpx.ConnectError:
                            # Server is not accepting connections yet; keep polling
                            pass
                        else:
                            if response.status_code == 200:
                                break
                        time.sleep(0.1)
                        elapsed_time += 0.1
                    if response:
                        response.raise_for_status()
                    if not response:
                        error_message = "Timed out while attempting to connect to ephemeral Prefect API server."
                        if self.server_process.poll() is not None:
                            error_message += f" Ephemeral server process exited with code {self.server_process.returncode}."
                        if self.server_process.stdout:
                            error_message += (
                                f" stdout: {self.server_process.stdout.read()}"
                            )
                        if self.server_process.stderr:
                            error_message += (
                                f" stderr: {self.server_process.stderr.read()}"
                            )
                        raise RuntimeError(error_message)
            except Exception:
                self.running = False
                raise

    def _run_uvicorn_command(self) -> subprocess.Popen:
        """
        Spawn `python -m uvicorn` serving `prefect.server.api.server:create_app`
        on 127.0.0.1 at the current `port`, and return the process handle.
        """
        # Overrides applied on top of the inherited environment. Variables produced
        # from the current settings are merged last and therefore take precedence.
        server_env = {
            "PREFECT_UI_ENABLED": "0",
        }
        return subprocess.Popen(
            args=[
                get_sys_executable(),
                "-m",
                "uvicorn",
                "--app-dir",
                # quote wrapping needed for windows paths with spaces
                f'"{prefect.__module_path__.parent}"',
                "--factory",
                "prefect.server.api.server:create_app",
                "--host",
                "127.0.0.1",
                "--port",
                str(self.port),
                "--log-level",
                "error",
                "--lifespan",
                "on",
            ],
            env={
                **os.environ,
                **server_env,
                **get_current_settings().to_environment_variables(exclude_unset=True),
            },
        )

    def stop(self):
        """
        Stop the server process, if any, and remove this instance from the registry.

        The process is asked to terminate and is killed if it has not exited within
        5 seconds.
        """
        if self.server_process:
            subprocess_server_logger.info(f"Stopping server on {self.address}")
            self.server_process.terminate()
            try:
                self.server_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.server_process.kill()
            finally:
                self.server_process = None
        # Deregister by identity rather than by `self.port`: the registry key is the
        # constructor argument (possibly `None`), while `self.port` may since have
        # been reassigned by `start` and could equal another instance's key.
        for key in [k for k, v in self._instances.items() if v is self]:
            del self._instances[key]
        if self.running:
            self.running = False

    def __enter__(self):
        """Start the server and return this instance."""
        self.start()
        return self

    def __exit__(self, *args):
        """Stop the server when leaving the `with` block."""
        self.stop()

__new__(port=None, *args, **kwargs)

Return an instance of the server associated with the provided port. Prevents multiple instances from being created for the same port.

Source code in src/prefect/server/api/server.py
713
714
715
716
717
718
719
720
721
def __new__(cls, port: Optional[int] = None, *args, **kwargs):
    """
    Look up the server instance registered for `port`, creating and registering
    one first if there is none. This keeps a single instance per port.
    """
    registry = cls._instances
    if port in registry:
        return registry[port]
    registry[port] = super().__new__(cls)
    return registry[port]

start(timeout=None)

Start the server in a separate process. Safe to call multiple times; only starts the server once.

Parameters:

Name Type Description Default
timeout Optional[int]

The maximum time to wait for the server to start

None
Source code in src/prefect/server/api/server.py
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
def start(self, timeout: Optional[int] = None):
    """
    Launch the server in a child process and wait until it reports healthy.

    Calling this while the server is already marked as running does nothing.

    Args:
        timeout: The maximum time to wait for the server to start; when falsy,
            `PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS` is used instead

    Raises:
        RuntimeError: if no response was received from the health endpoint
            before the wait budget ran out
    """
    if self.running:
        return

    if self.port is None:
        self.port = self.find_available_port()
    assert self.port is not None, "Port must be provided or available"
    subprocess_server_logger.info(f"Starting server on {self.address}")
    try:
        self.running = True
        self.server_process = self._run_uvicorn_command()
        atexit.register(self.stop)
        with httpx.Client() as http:
            budget = (
                timeout or PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS.value()
            )
            waited = 0
            response = None
            # Only the 0.1s sleeps are counted against the budget
            while waited < budget:
                # NOTE(review): exit code 3 is presumably uvicorn's startup-failure
                # code (e.g. port already taken) - confirm. Retry on another port.
                if self.server_process.poll() == 3:
                    self.port = self.find_available_port()
                    self.server_process = self._run_uvicorn_command()
                    continue
                try:
                    response = http.get(f"{self.api_url}/health")
                except httpx.ConnectError:
                    # Nothing is listening yet; fall through to the sleep
                    pass
                else:
                    if response.status_code == 200:
                        break
                time.sleep(0.1)
                waited += 0.1
            if response:
                response.raise_for_status()
            if not response:
                process = self.server_process
                parts = [
                    "Timed out while attempting to connect to ephemeral Prefect API server."
                ]
                if process.poll() is not None:
                    parts.append(
                        f" Ephemeral server process exited with code {process.returncode}."
                    )
                if process.stdout:
                    parts.append(f" stdout: {process.stdout.read()}")
                if process.stderr:
                    parts.append(f" stderr: {process.stderr.read()}")
                raise RuntimeError("".join(parts))
    except Exception:
        # Startup failed, so allow a later call to try again
        self.running = False
        raise

create_api_app(dependencies=None, health_check_path='/health', version_check_path='/version', fast_api_app_kwargs=None)

Create a FastAPI app that includes the Prefect REST API

Parameters:

Name Type Description Default
dependencies Optional[List[Depends]]

a list of global dependencies to add to each Prefect REST API router

None
health_check_path str

the health check route path

'/health'
version_check_path str

the version check route path

'/version'
fast_api_app_kwargs Optional[Dict[str, Any]]

kwargs to pass to the FastAPI constructor

None

Returns:

Type Description
FastAPI

a FastAPI app that serves the Prefect REST API

Source code in src/prefect/server/api/server.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def create_api_app(
    dependencies: Optional[List[Depends]] = None,
    health_check_path: str = "/health",
    version_check_path: str = "/version",
    fast_api_app_kwargs: Optional[Dict[str, Any]] = None,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API

    Args:
        dependencies: a list of global dependencies to add to each Prefect REST API
            router; the list itself is left unmodified
        health_check_path: the health check route path
        version_check_path: the path of the route that returns the server API version
        fast_api_app_kwargs: kwargs to pass to the FastAPI constructor

    Returns:
        a FastAPI app that serves the Prefect REST API
    """
    fast_api_app_kwargs = fast_api_app_kwargs or {}
    api_app = FastAPI(title=API_TITLE, **fast_api_app_kwargs)
    api_app.add_middleware(GZipMiddleware)

    @api_app.get(health_check_path, tags=["Root"])
    async def health_check():
        return True

    @api_app.get(version_check_path, tags=["Root"])
    async def server_version():
        return SERVER_API_VERSION

    # always include version checking. A new list is built instead of appending to
    # the argument so that a list reused by the caller does not grow on every call.
    router_dependencies = [*(dependencies or []), Depends(enforce_minimum_version)]

    for router in API_ROUTERS:
        api_app.include_router(router, dependencies=router_dependencies)

    return api_app

create_app(settings=None, ephemeral=False, ignore_cache=False)

Create a FastAPI app that includes the Prefect REST API and UI

Parameters:

Name Type Description Default
settings Optional[Settings]

The settings to use to create the app. If not set, settings are pulled from the context.

None
ignore_cache bool

If set, a new application will be created even if the settings match. Otherwise, an application is returned from the cache.

False
ephemeral bool

If set, the application will be treated as ephemeral. The UI and services will be disabled.

False
Source code in src/prefect/server/api/server.py
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
def create_app(
    settings: Optional[prefect.settings.Settings] = None,
    ephemeral: bool = False,
    ignore_cache: bool = False,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API and UI

    The API app is mounted at `/api` and the UI app at `/`. Created apps are stored
    in `APP_CACHE` under a key built from the settings hash and `ephemeral`.

    Args:
        settings: The settings to use to create the app. If not set, settings are pulled
            from the context.
        ignore_cache: If set, a new application will be created even if the settings
            match. Otherwise, an application is returned from the cache.
        ephemeral: If set, the application will be treated as ephemeral. The UI
            and services will be disabled.

    Returns:
        the cached or newly created FastAPI app
    """
    settings = settings or prefect.settings.get_current_settings()
    cache_key = (settings.hash_key(), ephemeral)

    from prefect.logging.configuration import setup_logging

    # Runs on every call, including calls that are answered from the cache below
    setup_logging()

    if cache_key in APP_CACHE and not ignore_cache:
        return APP_CACHE[cache_key]

    # TODO: Move these startup functions out of this closure into the top-level or
    #       another dedicated location
    async def run_migrations():
        """Ensure the database is created and up to date with the current migrations"""
        if prefect.settings.PREFECT_API_DATABASE_MIGRATE_ON_START:
            from prefect.server.database.dependencies import provide_database_interface

            db = provide_database_interface()
            await db.create_db()

    @_memoize_block_auto_registration
    async def add_block_types():
        """Add all registered blocks to the database"""
        if not prefect.settings.PREFECT_API_BLOCKS_REGISTER_ON_START:
            return

        from prefect.server.database.dependencies import provide_database_interface
        from prefect.server.models.block_registration import run_block_auto_registration

        db = provide_database_interface()
        session = await db.session()

        async with session:
            await run_block_auto_registration(session=session)

    async def start_services():
        """Start additional services when the Prefect REST API starts up."""

        # Ephemeral apps run no services at all
        if ephemeral:
            app.state.services = None
            return

        # Each service is instantiated only when its setting enables it
        service_instances = []
        if prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED.value():
            service_instances.append(services.scheduler.Scheduler())
            service_instances.append(services.scheduler.RecentDeploymentsScheduler())

        if prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED.value():
            service_instances.append(services.late_runs.MarkLateRuns())

        if prefect.settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED.value():
            service_instances.append(services.pause_expirations.FailExpiredPauses())

        if prefect.settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED.value():
            service_instances.append(
                services.cancellation_cleanup.CancellationCleanup()
            )

        if prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED.value():
            service_instances.append(services.telemetry.Telemetry())

        if prefect.settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED.value():
            service_instances.append(
                services.flow_run_notifications.FlowRunNotifications()
            )

        if prefect.settings.PREFECT_API_SERVICES_FOREMAN_ENABLED.value():
            service_instances.append(services.foreman.Foreman())

        if prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED.value():
            service_instances.append(ReactiveTriggers())
            service_instances.append(ProactiveTriggers())
            service_instances.append(Actions())

        if prefect.settings.PREFECT_API_SERVICES_TASK_RUN_RECORDER_ENABLED:
            service_instances.append(TaskRunRecorder())

        if prefect.settings.PREFECT_API_SERVICES_EVENT_PERSISTER_ENABLED:
            service_instances.append(EventPersister())

        if prefect.settings.PREFECT_API_EVENTS_STREAM_OUT_ENABLED:
            service_instances.append(stream.Distributor())

        loop = asyncio.get_running_loop()

        # Maps each service instance to the task that runs its `start()` coroutine
        app.state.services = {
            service: loop.create_task(service.start()) for service in service_instances
        }

        for service, task in app.state.services.items():
            logger.info(f"{service.name} service scheduled to start in-app")
            task.add_done_callback(partial(on_service_exit, service))

    async def stop_services():
        """Ensure services are stopped before the Prefect REST API shuts down."""
        if hasattr(app.state, "services") and app.state.services:
            await asyncio.gather(*[service.stop() for service in app.state.services])
            try:
                # NOTE(review): the values of `app.state.services` are the tasks
                # returned by `loop.create_task` above, and `asyncio.Task` does not
                # define `stop()`. This comprehension therefore looks like it raises
                # AttributeError, which the `except` below swallows, so the tasks
                # are never awaited here. Confirm the intent (awaiting the tasks
                # themselves?) before changing it.
                await asyncio.gather(
                    *[task.stop() for task in app.state.services.values()]
                )
            except Exception:
                # `on_service_exit` should handle logging exceptions on exit
                pass

    @asynccontextmanager
    async def lifespan(app):
        # Startup work runs once per app object; `LIFESPAN_RAN_FOR_APP` records the
        # apps it has already run for, and later entries just yield.
        if app not in LIFESPAN_RAN_FOR_APP:
            try:
                await run_migrations()
                await add_block_types()
                await start_services()
                LIFESPAN_RAN_FOR_APP.add(app)
                yield
            finally:
                # Also reached when one of the startup steps above raised
                await stop_services()
        else:
            yield

    def on_service_exit(service, task):
        """
        Added as a callback for completion of services to log exit
        """
        try:
            # Retrieving the result will raise the exception
            task.result()
        except Exception:
            logger.error(f"{service.name} service failed!", exc_info=True)
        else:
            logger.info(f"{service.name} service stopped!")

    app = FastAPI(
        title=TITLE,
        version=API_VERSION,
        lifespan=lifespan,
    )
    api_app = create_api_app(
        fast_api_app_kwargs={
            "exception_handlers": {
                # NOTE: FastAPI special cases the generic `Exception` handler and
                #       registers it as a separate middleware from the others
                Exception: custom_internal_exception_handler,
                RequestValidationError: validation_exception_handler,
                sa.exc.IntegrityError: integrity_exception_handler,
                ObjectNotFoundError: prefect_object_not_found_exception_handler,
            }
        },
    )
    ui_app = create_ui_app(ephemeral)

    # middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Limit the number of concurrent requests when using a SQLite database to reduce
    # chance of errors where the database cannot be opened due to a high number of
    # concurrent writes
    if (
        get_dialect(prefect.settings.PREFECT_API_DATABASE_CONNECTION_URL.value()).name
        == "sqlite"
    ):
        app.add_middleware(RequestLimitMiddleware, limit=100)

    if prefect.settings.PREFECT_SERVER_CSRF_PROTECTION_ENABLED.value():
        app.add_middleware(api.middleware.CsrfMiddleware)

    # The metrics route is only registered (and prometheus_client only imported)
    # when metrics are enabled
    if prefect.settings.PREFECT_API_ENABLE_METRICS:
        from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

        @api_app.get("/metrics")
        async def metrics():
            return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

    # Serve the `static` directory that sits next to this module under the API app
    api_app.mount(
        "/static",
        StaticFiles(
            directory=os.path.join(
                os.path.dirname(os.path.realpath(__file__)), "static"
            )
        ),
        name="static",
    )
    app.api_app = api_app
    app.mount("/api", app=api_app, name="api")
    app.mount("/", app=ui_app, name="ui")

    def openapi():
        """
        Convenience method for extracting the user facing OpenAPI schema from the API app.

        This method is attached to the global public app for easy access.
        """
        partial_schema = get_openapi(
            title=API_TITLE,
            version=API_VERSION,
            routes=api_app.routes,
        )
        new_schema = partial_schema.copy()
        # Re-key every path with the `/api` prefix the API app is mounted under
        new_schema["paths"] = {}
        for path, value in partial_schema["paths"].items():
            new_schema["paths"][f"/api{path}"] = value

        new_schema["info"]["x-logo"] = {"url": "static/prefect-logo-mark-gradient.png"}
        return new_schema

    app.openapi = openapi

    APP_CACHE[cache_key] = app
    return app

custom_internal_exception_handler(request, exc) async

Log a detailed exception for internal server errors before returning.

Send 503 for errors clients can retry on.

Source code in src/prefect/server/api/server.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
async def custom_internal_exception_handler(request: Request, exc: Exception):
    """
    Turn an unhandled exception into a JSON error response, logging it first.

    Exceptions that clients can retry on are answered with 503 and are only logged
    when `PREFECT_API_LOG_RETRYABLE_ERRORS` is enabled. Everything else is always
    logged and answered with 500.
    """
    retryable = is_client_retryable_exception(exc)

    if not retryable:
        logger.error("Encountered exception in request:", exc_info=True)
        return JSONResponse(
            content={"exception_message": "Internal Server Error"},
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    if PREFECT_API_LOG_RETRYABLE_ERRORS.value():
        logger.error("Encountered retryable exception in request:", exc_info=True)

    return JSONResponse(
        content={"exception_message": "Service Unavailable"},
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
    )

integrity_exception_handler(request, exc) async

Capture database integrity errors.

Source code in src/prefect/server/api/server.py
167
168
169
170
171
172
173
174
175
176
177
178
179
async def integrity_exception_handler(request: Request, exc: Exception):
    """Log a database integrity error and answer with a 409 Conflict response."""
    logger.error("Encountered exception in request:", exc_info=True)
    detail = (
        "Data integrity conflict. This usually means a "
        "unique or foreign key constraint was violated. "
        "See server logs for details."
    )
    body = {"detail": detail}
    return JSONResponse(content=body, status_code=status.HTTP_409_CONFLICT)

prefect_object_not_found_exception_handler(request, exc) async

Return 404 status code on object not found exceptions.

Source code in src/prefect/server/api/server.py
272
273
274
275
276
277
278
async def prefect_object_not_found_exception_handler(
    request: Request, exc: ObjectNotFoundError
):
    """Answer object-not-found errors with a 404 carrying the exception text."""
    body = {"exception_message": str(exc)}
    return JSONResponse(content=body, status_code=status.HTTP_404_NOT_FOUND)

replace_placeholder_string_in_files(directory, placeholder, replacement, allowed_extensions=None)

Recursively loops through all files in the given directory and replaces a placeholder string.

Source code in src/prefect/server/api/server.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def replace_placeholder_string_in_files(
    directory, placeholder, replacement, allowed_extensions=None
):
    """
    Recursively loops through all files in the given directory and replaces
    a placeholder string.

    Files are read and written as UTF-8. Only files that actually contain the
    placeholder are rewritten, and their line endings are left as they were.

    Args:
        directory: root of the directory tree to walk
        placeholder: the text to search for
        replacement: the text substituted for every occurrence of `placeholder`
        allowed_extensions: file name suffixes to process; defaults to
            `.txt`, `.html`, `.css`, `.js` and `.json`
    """
    if allowed_extensions is None:
        allowed_extensions = [".txt", ".html", ".css", ".js", ".json"]
    # `str.endswith` accepts a tuple, so one call covers every suffix
    suffixes = tuple(allowed_extensions)

    for root, _dirs, file_names in os.walk(directory):
        for file_name in file_names:
            if not file_name.endswith(suffixes):
                continue

            file_path = os.path.join(root, file_name)

            # newline="" turns off newline translation, so "\r\n" sequences are
            # read (and later written back) unchanged on every platform
            with open(file_path, "r", encoding="utf-8", newline="") as handle:
                original = handle.read()

            updated = original.replace(placeholder, replacement)
            if updated == original:
                # Nothing to substitute; leave the file untouched
                continue

            with open(file_path, "w", encoding="utf-8", newline="") as handle:
                handle.write(updated)

validation_exception_handler(request, exc) async

Provide a detailed message for request validation errors.

Source code in src/prefect/server/api/server.py
153
154
155
156
157
158
159
160
161
162
163
164
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Answer request validation errors with a 422 that details what was invalid."""
    payload = {
        "exception_message": "Invalid request received.",
        "exception_detail": exc.errors(),
        "request_body": exc.body,
    }
    return JSONResponse(
        content=jsonable_encoder(payload),
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
    )