prefect.client.schemas.schedules

Schedule schemas

CronSchedule

Bases: PrefectBaseModel

Cron schedule

NOTE: If the timezone is a DST-observing one, then the schedule will adjust itself appropriately. Cron's rules for DST are based on schedule times, not intervals. This means that an hourly cron schedule will fire on every new schedule hour, not every elapsed hour; for example, when clocks are set back this will result in a two-hour pause as the schedule will fire the first time 1am is reached and the first time 2am is reached, 120 minutes later. Longer schedules, such as one that fires at 9am every morning, will automatically adjust for DST.
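The two-hour gap described above can be checked with the standard library alone. This is a minimal sketch, not Prefect's or croniter's implementation: it assumes the America/New_York fall-back on 2024-11-03 and simply measures the real time between the first 1am and the first 2am on that date.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")

# On 2024-11-03, clocks in New York go from 2:00 EDT back to 1:00 EST.
# fold=0 (the default) selects the first occurrence of an ambiguous local time.
first_1am = datetime(2024, 11, 3, 1, 0, tzinfo=NEW_YORK)
first_2am = datetime(2024, 11, 3, 2, 0, tzinfo=NEW_YORK)

# Subtracting two datetimes that share a tzinfo ignores UTC offsets,
# so convert both to UTC before measuring elapsed time.
elapsed = first_2am.astimezone(timezone.utc) - first_1am.astimezone(timezone.utc)
elapsed_minutes = int(elapsed.total_seconds() // 60)
print(elapsed_minutes)
```

The local clock readings are one hour apart, but the underlying instants are two hours apart, which is the pause an hourly cron schedule shows on that night.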

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cron | str | a valid cron string | required |
| timezone | str | a valid timezone string in IANA tzdata format (for example, America/New_York) | None |
| day_or | bool | Control how croniter handles day and day_of_week entries. Defaults to True, matching cron, which connects those values using OR. If set to False, the values are connected using AND. This behaves like fcron and lets you, for example, define a job that runs on the second Friday of each month by setting the days of month and the weekday. | True |

Source code in src/prefect/client/schemas/schedules.py
class CronSchedule(PrefectBaseModel):
    """
    Cron schedule

    NOTE: If the timezone is a DST-observing one, then the schedule will adjust
    itself appropriately. Cron's rules for DST are based on schedule times, not
    intervals. This means that an hourly cron schedule will fire on every new
    schedule hour, not every elapsed hour; for example, when clocks are set back
    this will result in a two-hour pause as the schedule will fire *the first
    time* 1am is reached and *the first time* 2am is reached, 120 minutes later.
    Longer schedules, such as one that fires at 9am every morning, will
    automatically adjust for DST.

    Args:
        cron (str): a valid cron string
        timezone (str): a valid timezone string in IANA tzdata format (for example,
            America/New_York).
        day_or (bool, optional): Control how croniter handles `day` and `day_of_week`
            entries. Defaults to True, matching cron which connects those values using
            OR. If the switch is set to False, the values are connected using AND. This
            behaves like fcron and enables you to e.g. define a job that executes each
            2nd friday of a month by setting the days of month and the weekday.

    """

    model_config = ConfigDict(extra="forbid")

    cron: str = Field(default=..., examples=["0 0 * * *"])
    timezone: Optional[str] = Field(default=None, examples=["America/New_York"])
    day_or: bool = Field(
        default=True,
        description=(
            "Control croniter behavior for handling day and day_of_week entries."
        ),
    )

    @field_validator("timezone")
    @classmethod
    def valid_timezone(cls, v):
        return default_timezone(v)

    @field_validator("cron")
    @classmethod
    def valid_cron_string(cls, v):
        return validate_cron_string(v)
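The difference between the OR and AND behavior of day_or can be sketched without croniter. The helper below, day_matches, is hypothetical and only models the case where both the day-of-month and day-of-week fields are restricted; it uses Python's weekday numbering (Monday is 0), not cron's.

```python
from datetime import date


def day_matches(d, days_of_month, weekdays, day_or=True):
    """Return True if `d` satisfies the day-of-month and day-of-week sets.

    Hypothetical helper for illustration; weekdays follow date.weekday()
    (Monday=0 ... Sunday=6).
    """
    in_month_days = d.day in days_of_month
    in_weekdays = d.weekday() in weekdays
    if day_or:
        # cron-style: either field matching is enough
        return in_month_days or in_weekdays
    # fcron-style: both fields must match
    return in_month_days and in_weekdays


SECOND_WEEK = set(range(8, 15))  # days 8 through 14
FRIDAY = {4}

second_friday = date(2024, 3, 8)   # a Friday inside days 8-14
saturday = date(2024, 3, 9)        # inside days 8-14, not a Friday
third_friday = date(2024, 3, 15)   # a Friday outside days 8-14

for d in (second_friday, saturday, third_friday):
    print(d, day_matches(d, SECOND_WEEK, FRIDAY, True),
          day_matches(d, SECOND_WEEK, FRIDAY, False))
```

With day_or=False only the second Friday matches, which is the "second Friday of the month" pattern the parameter description mentions.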

IntervalSchedule

Bases: PrefectBaseModel

A schedule formed by adding interval increments to an anchor_date. If no anchor_date is supplied, the current UTC time is used. If a timezone-naive datetime is provided for anchor_date, it is assumed to be in the schedule's timezone (or UTC). Even if supplied with an IANA timezone, anchor dates are always stored as UTC offsets, so a timezone can be provided to determine localization behaviors like DST boundary handling. If none is provided it will be inferred from the anchor date.

NOTE: If the IntervalSchedule anchor_date or timezone is provided in a DST-observing timezone, then the schedule will adjust itself appropriately. Intervals greater than 24 hours will follow DST conventions, while intervals of less than 24 hours will follow UTC intervals. For example, an hourly schedule will fire every UTC hour, even across DST boundaries. When clocks are set back, this will result in two runs that appear to both be scheduled for 1am local time, even though they are an hour apart in UTC time. For longer intervals, like a daily schedule, the interval schedule will adjust for DST boundaries so that the clock-hour remains constant. This means that a daily schedule that always fires at 9am will observe DST and continue to fire at 9am in the local time zone.
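The two behaviors in the note can be illustrated with the standard library. This sketch does not call Prefect; it assumes the America/New_York fall-back on 2024-11-03 and contrasts stepping in UTC (the sub-24-hour case) with stepping on the local wall clock (the daily case).

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")

# Hourly interval: step in UTC, then view each run in local time.
anchor_utc = datetime(2024, 11, 3, 4, 0, tzinfo=timezone.utc)  # midnight EDT
hourly_local = [
    (anchor_utc + timedelta(hours=n)).astimezone(NEW_YORK).strftime("%H:%M %Z")
    for n in range(4)
]

# Daily interval: step on the local wall clock so the clock hour stays fixed.
anchor_local = datetime(2024, 11, 2, 9, 0, tzinfo=NEW_YORK)
daily_local = [anchor_local + timedelta(days=n) for n in range(3)]
daily_utc_hours = [d.astimezone(timezone.utc).hour for d in daily_local]

print(hourly_local)
print([d.hour for d in daily_local], daily_utc_hours)
```

The hourly list contains 01:00 twice (once in EDT, once in EST), while the daily runs stay at 9am local time and their UTC hour shifts by one after the boundary.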

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| interval | timedelta | an interval to schedule on; must be greater than zero | required |
| anchor_date | DateTime | an anchor date to schedule increments against; if not provided, the current UTC time is used | current UTC time |
| timezone | str | a valid timezone string | None |

Source code in src/prefect/client/schemas/schedules.py
class IntervalSchedule(PrefectBaseModel):
    """
    A schedule formed by adding `interval` increments to an `anchor_date`. If no
    `anchor_date` is supplied, the current UTC time is used.  If a
    timezone-naive datetime is provided for `anchor_date`, it is assumed to be
    in the schedule's timezone (or UTC). Even if supplied with an IANA timezone,
    anchor dates are always stored as UTC offsets, so a `timezone` can be
    provided to determine localization behaviors like DST boundary handling. If
    none is provided it will be inferred from the anchor date.

    NOTE: If the `IntervalSchedule` `anchor_date` or `timezone` is provided in a
    DST-observing timezone, then the schedule will adjust itself appropriately.
    Intervals greater than 24 hours will follow DST conventions, while intervals
    of less than 24 hours will follow UTC intervals. For example, an hourly
    schedule will fire every UTC hour, even across DST boundaries. When clocks
    are set back, this will result in two runs that *appear* to both be
    scheduled for 1am local time, even though they are an hour apart in UTC
    time. For longer intervals, like a daily schedule, the interval schedule
    will adjust for DST boundaries so that the clock-hour remains constant. This
    means that a daily schedule that always fires at 9am will observe DST and
    continue to fire at 9am in the local time zone.

    Args:
        interval (datetime.timedelta): an interval to schedule on
        anchor_date (DateTime, optional): an anchor date to schedule increments against;
            if not provided, the current timestamp will be used
        timezone (str, optional): a valid timezone string
    """

    model_config = ConfigDict(extra="forbid", exclude_none=True)

    interval: datetime.timedelta = Field(gt=datetime.timedelta(0))
    anchor_date: Annotated[DateTime, AfterValidator(default_anchor_date)] = Field(
        default_factory=lambda: pendulum.now("UTC"),
        examples=["2020-01-01T00:00:00Z"],
    )
    timezone: Optional[str] = Field(default=None, examples=["America/New_York"])

    @model_validator(mode="after")
    def validate_timezone(self):
        self.timezone = default_timezone(self.timezone, self.model_dump())
        return self

RRuleSchedule

Bases: PrefectBaseModel

RRule schedule, based on the iCalendar standard (RFC 5545) as implemented in dateutil.rrule.

RRules are appropriate for any kind of calendar-date manipulation, including irregular intervals, repetition, exclusions, week day or day-of-month adjustments, and more.

Note that as a calendar-oriented standard, RRuleSchedules are sensitive to the initial timezone provided. A 9am daily schedule with a daylight saving time-aware start date will maintain a local 9am time through DST boundaries; a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rrule | str | a valid RRule string | required |
| timezone | str | a valid timezone string | "UTC" |

Source code in src/prefect/client/schemas/schedules.py
class RRuleSchedule(PrefectBaseModel):
    """
    RRule schedule, based on the iCalendar standard
    ([RFC 5545](https://datatracker.ietf.org/doc/html/rfc5545)) as
    implemented in `dateutil.rrule`.

    RRules are appropriate for any kind of calendar-date manipulation, including
    irregular intervals, repetition, exclusions, week day or day-of-month
    adjustments, and more.

    Note that as a calendar-oriented standard, `RRuleSchedules` are sensitive to
    the initial timezone provided. A 9am daily schedule with a daylight saving
    time-aware start date will maintain a local 9am time through DST boundaries;
    a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

    Args:
        rrule (str): a valid RRule string
        timezone (str, optional): a valid timezone string
    """

    model_config = ConfigDict(extra="forbid")

    rrule: str
    timezone: Optional[str] = Field(
        default="UTC", examples=["America/New_York"], validate_default=True
    )

    @field_validator("rrule")
    @classmethod
    def validate_rrule_str(cls, v):
        return validate_rrule_string(v)

    @classmethod
    def from_rrule(cls, rrule: dateutil.rrule.rrule):
        if isinstance(rrule, dateutil.rrule.rrule):
            if rrule._dtstart.tzinfo is not None:
                timezone = rrule._dtstart.tzinfo.name
            else:
                timezone = "UTC"
            return RRuleSchedule(rrule=str(rrule), timezone=timezone)
        elif isinstance(rrule, dateutil.rrule.rruleset):
            dtstarts = [rr._dtstart for rr in rrule._rrule if rr._dtstart is not None]
            unique_dstarts = set(pendulum.instance(d).in_tz("UTC") for d in dtstarts)
            unique_timezones = set(d.tzinfo for d in dtstarts if d.tzinfo is not None)

            if len(unique_timezones) > 1:
                raise ValueError(
                    f"rruleset has too many dtstart timezones: {unique_timezones}"
                )

            if len(unique_dstarts) > 1:
                raise ValueError(f"rruleset has too many dtstarts: {unique_dstarts}")

            if unique_dstarts and unique_timezones:
                timezone = dtstarts[0].tzinfo.name
            else:
                timezone = "UTC"

            rruleset_string = ""
            if rrule._rrule:
                rruleset_string += "\n".join(str(r) for r in rrule._rrule)
            if rrule._exrule:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "\n".join(str(r) for r in rrule._exrule).replace(
                    "RRULE", "EXRULE"
                )
            if rrule._rdate:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "RDATE:" + ",".join(
                    rd.strftime("%Y%m%dT%H%M%SZ") for rd in rrule._rdate
                )
            if rrule._exdate:
                rruleset_string += "\n" if rruleset_string else ""
                rruleset_string += "EXDATE:" + ",".join(
                    exd.strftime("%Y%m%dT%H%M%SZ") for exd in rrule._exdate
                )
            return RRuleSchedule(rrule=rruleset_string, timezone=timezone)
        else:
            raise ValueError(f"Invalid RRule object: {rrule}")

    def to_rrule(self) -> dateutil.rrule.rrule:
        """
        Since rrule doesn't properly serialize/deserialize timezones, we localize dates
        here
        """
        rrule = dateutil.rrule.rrulestr(
            self.rrule,
            dtstart=DEFAULT_ANCHOR_DATE,
            cache=True,
        )
        timezone = dateutil.tz.gettz(self.timezone)
        if isinstance(rrule, dateutil.rrule.rrule):
            kwargs = dict(dtstart=rrule._dtstart.replace(tzinfo=timezone))
            if rrule._until:
                kwargs.update(
                    until=rrule._until.replace(tzinfo=timezone),
                )
            return rrule.replace(**kwargs)
        elif isinstance(rrule, dateutil.rrule.rruleset):
            # update rrules
            localized_rrules = []
            for rr in rrule._rrule:
                kwargs = dict(dtstart=rr._dtstart.replace(tzinfo=timezone))
                if rr._until:
                    kwargs.update(
                        until=rr._until.replace(tzinfo=timezone),
                    )
                localized_rrules.append(rr.replace(**kwargs))
            rrule._rrule = localized_rrules

            # update exrules
            localized_exrules = []
            for exr in rrule._exrule:
                kwargs = dict(dtstart=exr._dtstart.replace(tzinfo=timezone))
                if exr._until:
                    kwargs.update(
                        until=exr._until.replace(tzinfo=timezone),
                    )
                localized_exrules.append(exr.replace(**kwargs))
            rrule._exrule = localized_exrules

            # update rdates
            localized_rdates = []
            for rd in rrule._rdate:
                localized_rdates.append(rd.replace(tzinfo=timezone))
            rrule._rdate = localized_rdates

            # update exdates
            localized_exdates = []
            for exd in rrule._exdate:
                localized_exdates.append(exd.replace(tzinfo=timezone))
            rrule._exdate = localized_exdates

            return rrule

    @field_validator("timezone")
    def valid_timezone(cls, v):
        """
        Validate that the provided timezone is a valid IANA timezone.

        Unfortunately this list is slightly different from the list of valid
        timezones in pendulum that we use for cron and interval timezone validation.
        """
        from prefect._internal.pytz import HAS_PYTZ

        if HAS_PYTZ:
            import pytz
        else:
            from prefect._internal import pytz

        if v and v not in pytz.all_timezones_set:
            raise ValueError(f'Invalid timezone: "{v}"')
        elif v is None:
            return "UTC"
        return v
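The RDATE and EXDATE lines that from_rrule builds for an rruleset follow a simple pattern: a prefix, then comma-joined timestamps. The sketch below reproduces only that formatting step with a hypothetical helper, format_date_line; it does not use dateutil. Note that the trailing Z is a literal character in the format string, so it is appended whatever timezone the datetime carries.

```python
from datetime import datetime


def format_date_line(prefix, dates):
    """Join datetimes into one line such as 'RDATE:20240101T090000Z,...'.

    Hypothetical helper mirroring the strftime pattern used in from_rrule.
    """
    return prefix + ":" + ",".join(d.strftime("%Y%m%dT%H%M%SZ") for d in dates)


rdates = [datetime(2024, 1, 1, 9, 0), datetime(2024, 2, 1, 9, 0)]
exdates = [datetime(2024, 1, 15, 9, 0)]

rdate_line = format_date_line("RDATE", rdates)
exdate_line = format_date_line("EXDATE", exdates)

print(rdate_line)
print(exdate_line)
```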

to_rrule()

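Convert the schedule to a dateutil rrule or rruleset. Because rrule does not properly serialize and deserialize timezones, the dtstart, until, rdate, and exdate values are localized to the schedule's timezone here.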

Source code in src/prefect/client/schemas/schedules.py
def to_rrule(self) -> dateutil.rrule.rrule:
    """
    Since rrule doesn't properly serialize/deserialize timezones, we localize dates
    here
    """
    rrule = dateutil.rrule.rrulestr(
        self.rrule,
        dtstart=DEFAULT_ANCHOR_DATE,
        cache=True,
    )
    timezone = dateutil.tz.gettz(self.timezone)
    if isinstance(rrule, dateutil.rrule.rrule):
        kwargs = dict(dtstart=rrule._dtstart.replace(tzinfo=timezone))
        if rrule._until:
            kwargs.update(
                until=rrule._until.replace(tzinfo=timezone),
            )
        return rrule.replace(**kwargs)
    elif isinstance(rrule, dateutil.rrule.rruleset):
        # update rrules
        localized_rrules = []
        for rr in rrule._rrule:
            kwargs = dict(dtstart=rr._dtstart.replace(tzinfo=timezone))
            if rr._until:
                kwargs.update(
                    until=rr._until.replace(tzinfo=timezone),
                )
            localized_rrules.append(rr.replace(**kwargs))
        rrule._rrule = localized_rrules

        # update exrules
        localized_exrules = []
        for exr in rrule._exrule:
            kwargs = dict(dtstart=exr._dtstart.replace(tzinfo=timezone))
            if exr._until:
                kwargs.update(
                    until=exr._until.replace(tzinfo=timezone),
                )
            localized_exrules.append(exr.replace(**kwargs))
        rrule._exrule = localized_exrules

        # update rdates
        localized_rdates = []
        for rd in rrule._rdate:
            localized_rdates.append(rd.replace(tzinfo=timezone))
        rrule._rdate = localized_rdates

        # update exdates
        localized_exdates = []
        for exd in rrule._exdate:
            localized_exdates.append(exd.replace(tzinfo=timezone))
        rrule._exdate = localized_exdates

        return rrule
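The localization in to_rrule relies on replace(tzinfo=...), which attaches a timezone without moving the clock reading. The following sketch, using zoneinfo in place of dateutil.tz as an assumption for illustration, contrasts that with astimezone, which keeps the instant and changes the clock reading.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")
naive_start = datetime(2020, 1, 1, 9, 0)

# replace() attaches a zone without shifting the clock reading: still 09:00.
localized = naive_start.replace(tzinfo=NEW_YORK)

# astimezone() keeps the instant and changes the clock reading instead.
converted = naive_start.replace(tzinfo=timezone.utc).astimezone(NEW_YORK)

print(localized.isoformat())
print(converted.isoformat())
```

This is why a 9am rule stays at 9am local time after localization rather than being shifted by the UTC offset.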

valid_timezone(v)

Validate that the provided timezone is a valid IANA timezone.

Unfortunately this list is slightly different from the list of valid timezones in pendulum that we use for cron and interval timezone validation.

Source code in src/prefect/client/schemas/schedules.py
@field_validator("timezone")
def valid_timezone(cls, v):
    """
    Validate that the provided timezone is a valid IANA timezone.

    Unfortunately this list is slightly different from the list of valid
    timezones in pendulum that we use for cron and interval timezone validation.
    """
    from prefect._internal.pytz import HAS_PYTZ

    if HAS_PYTZ:
        import pytz
    else:
        from prefect._internal import pytz

    if v and v not in pytz.all_timezones_set:
        raise ValueError(f'Invalid timezone: "{v}"')
    elif v is None:
        return "UTC"
    return v
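The branching in this validator can be exercised in isolation. In the sketch below, KNOWN_TIMEZONES is a small hypothetical stand-in for pytz.all_timezones_set and check_timezone is an illustrative copy of the logic, not the Prefect validator itself.

```python
# Hypothetical stand-in for pytz.all_timezones_set, kept tiny for illustration.
KNOWN_TIMEZONES = {"UTC", "America/New_York", "Europe/London"}


def check_timezone(value):
    """Mirror the validator's branches: reject unknown names, default None to UTC."""
    if value and value not in KNOWN_TIMEZONES:
        raise ValueError(f'Invalid timezone: "{value}"')
    elif value is None:
        return "UTC"
    return value


print(check_timezone(None))
print(check_timezone("America/New_York"))
print(repr(check_timezone("")))
```

One consequence of the `if v and ...` test is that an empty string is neither rejected nor replaced with "UTC"; it is returned unchanged.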

construct_schedule(interval=None, anchor_date=None, cron=None, rrule=None, timezone=None)

Construct a schedule from the provided arguments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| interval | Optional[Union[int, float, timedelta]] | An interval on which to schedule runs. Accepts either a number or a timedelta object. If a number is given, it will be interpreted as seconds. | None |
| anchor_date | Optional[Union[datetime, str]] | The start date for an interval schedule. | None |
| cron | Optional[str] | A cron schedule for runs. | None |
| rrule | Optional[str] | An rrule schedule of when to execute runs of this flow. | None |
| timezone | Optional[str] | A timezone to use for the schedule. Defaults to UTC. | None |

Source code in src/prefect/client/schemas/schedules.py
def construct_schedule(
    interval: Optional[Union[int, float, datetime.timedelta]] = None,
    anchor_date: Optional[Union[datetime.datetime, str]] = None,
    cron: Optional[str] = None,
    rrule: Optional[str] = None,
    timezone: Optional[str] = None,
) -> SCHEDULE_TYPES:
    """
    Construct a schedule from the provided arguments.

    Args:
        interval: An interval on which to schedule runs. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        anchor_date: The start date for an interval schedule.
        cron: A cron schedule for runs.
        rrule: An rrule schedule of when to execute runs of this flow.
        timezone: A timezone to use for the schedule. Defaults to UTC.
    """
    num_schedules = sum(1 for entry in (interval, cron, rrule) if entry is not None)
    if num_schedules > 1:
        raise ValueError("Only one of interval, cron, or rrule can be provided.")

    if anchor_date and not interval:
        raise ValueError(
            "An anchor date can only be provided with an interval schedule"
        )

    if timezone and not (interval or cron or rrule):
        raise ValueError(
            "A timezone can only be provided with interval, cron, or rrule"
        )

    schedule = None
    if interval:
        if isinstance(interval, (int, float)):
            interval = datetime.timedelta(seconds=interval)
        if not anchor_date:
            anchor_date = DateTime.now()
        schedule = IntervalSchedule(
            interval=interval, anchor_date=anchor_date, timezone=timezone
        )
    elif cron:
        schedule = CronSchedule(cron=cron, timezone=timezone)
    elif rrule:
        schedule = RRuleSchedule(rrule=rrule, timezone=timezone)

    if schedule is None:
        raise ValueError("Either interval, cron, or rrule must be provided")

    return schedule
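Two of the steps above are easy to check on their own: counting how many schedule arguments were supplied, and converting a numeric interval to a timedelta of seconds. The helper names below (count_schedule_args, normalize_interval) are hypothetical; the sketch copies the logic from the listing rather than importing Prefect.

```python
import datetime


def count_schedule_args(interval=None, cron=None, rrule=None):
    """Count how many of the mutually exclusive schedule arguments are set."""
    return sum(1 for entry in (interval, cron, rrule) if entry is not None)


def normalize_interval(interval):
    """Interpret a bare number as seconds; pass timedelta objects through."""
    if isinstance(interval, (int, float)):
        return datetime.timedelta(seconds=interval)
    return interval


print(count_schedule_args(cron="0 0 * * *"))
print(count_schedule_args(interval=60, cron="0 0 * * *"))
print(normalize_interval(90))
print(normalize_interval(datetime.timedelta(hours=1)))
```

A count greater than one corresponds to the "Only one of interval, cron, or rrule can be provided." error, and a count of zero leads to the final "Either interval, cron, or rrule must be provided" error.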