Skip to content

prefect.server.events.stream

run_distributor(started) async

Runs the distributor consumer forever until it is cancelled

Source code in src/prefect/server/events/stream.py
137
138
139
140
141
142
143
144
145
146
147
148
async def run_distributor(started: asyncio.Event):
    """Runs the distributor consumer forever until it is cancelled"""
    global _distributor_started
    async with messaging.ephemeral_subscription(
        topic="events",
    ) as create_consumer_kwargs:
        started.set()
        async with distributor() as handler:
            consumer = messaging.create_consumer(**create_consumer_kwargs)
            await consumer.run(
                handler=handler,
            )

start_distributor() async

Starts the distributor consumer as a global background task

Source code in src/prefect/server/events/stream.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def start_distributor():
    """Starts the distributor consumer as a global background task"""
    global _distributor_task
    global _distributor_started
    if _distributor_task:
        return

    _distributor_started = asyncio.Event()
    _distributor_task = asyncio.create_task(run_distributor(_distributor_started))
    await _distributor_started.wait()

stop_distributor() async

Stops the distributor consumer global background task

Source code in src/prefect/server/events/stream.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
async def stop_distributor():
    """Stops the distributor consumer global background task"""
    global _distributor_task
    global _distributor_started
    if not _distributor_task:
        return

    task = _distributor_task
    _distributor_task = None
    _distributor_started = None

    task.cancel()
    try:
        await asyncio.shield(task)
    except asyncio.CancelledError:
        pass