Skip to content

prefect.server.events.filters

AutomationFilterCreated

Bases: PrefectFilterBaseModel

Filter by Automation.created.

Source code in src/prefect/server/events/filters.py
28
29
30
31
32
33
34
35
36
37
38
39
40
class AutomationFilterCreated(PrefectFilterBaseModel):
    """Filter by `Automation.created`."""

    before_: Optional[DateTime] = Field(
        description="Only include automations created before this datetime",
        default=None,
    )

    def _get_filter_list(self) -> list:
        """Return the SQL criteria for this filter.

        Returns:
            An empty list when `before_` is unset; otherwise a single
            criterion comparing `Automation.created` against `before_`.
        """
        if self.before_ is None:
            return []
        # The comparison is `<=`, so a timestamp equal to `before_` matches too.
        return [orm_models.Automation.created <= self.before_]

AutomationFilterName

Bases: PrefectFilterBaseModel

Filter by Automation.name.

Source code in src/prefect/server/events/filters.py
43
44
45
46
47
48
49
50
51
52
53
54
55
class AutomationFilterName(PrefectFilterBaseModel):
    """Filter by `Automation.name`."""

    any_: Optional[List[str]] = Field(
        default=None,
        description="Only include automations with names that match any of these strings",
    )

    def _get_filter_list(self) -> List:
        """Return the SQL criteria for this filter.

        Returns:
            A list holding one `Automation.name IN (...)` criterion when
            `any_` is set, otherwise an empty list.
        """
        filters = []
        if self.any_ is not None:
            filters.append(orm_models.Automation.name.in_(self.any_))
        return filters

EventDataFilter

Bases: PrefectBaseModel

A base class for filtering event data.

Source code in src/prefect/server/events/filters.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class EventDataFilter(PrefectBaseModel, extra="forbid"):
    """A base class for filtering event data."""

    # Top-level filter shared with nested filters; `get_filters` copies this
    # value onto every nested filter it returns.
    _top_level_filter: Optional["EventFilter"] = PrivateAttr(None)

    def get_filters(self) -> List["EventDataFilter"]:
        """Return the nested filters stored in this model's fields.

        Only field values that are `EventDataFilter` instances are returned;
        any other field value is skipped.  Each returned filter has its
        `_top_level_filter` set to this filter's own `_top_level_filter`.
        """
        # Only the field names are needed, and they are read from the class:
        # Pydantic 2.11 deprecates reaching `model_fields` through an instance.
        field_values = [getattr(self, name) for name in type(self).model_fields]
        nested: List[EventDataFilter] = [
            value for value in field_values if isinstance(value, EventDataFilter)
        ]
        for child in nested:
            child._top_level_filter = self._top_level_filter
        return nested

    def includes(self, event: Event) -> bool:
        """Does the given event match the criteria of this filter?

        True when every nested filter includes the event (and therefore also
        when there are no nested filters).
        """
        return all(child.includes(event) for child in self.get_filters())

    def excludes(self, event: Event) -> bool:
        """Would the given filter exclude this event?

        This is the exact negation of `includes`.
        """
        return not self.includes(event)

    def build_where_clauses(self) -> Sequence["ColumnExpressionArgument[bool]"]:
        """Convert the criteria to a WHERE clause.

        Returns:
            The clauses of all nested filters, concatenated in the order the
            nested filters are returned by `get_filters`.
        """
        clauses: List["ColumnExpressionArgument[bool]"] = []
        for child in self.get_filters():
            clauses.extend(child.build_where_clauses())
        return clauses

build_where_clauses()

Convert the criteria to a WHERE clause.

Source code in src/prefect/server/events/filters.py
102
103
104
105
106
107
def build_where_clauses(self) -> Sequence["ColumnExpressionArgument[bool]"]:
    """Collect the WHERE clauses of every nested filter into one flat list.

    Clauses keep the order in which `get_filters` yields the nested filters.
    """
    return [
        clause
        for child in self.get_filters()
        for clause in child.build_where_clauses()
    ]

excludes(event)

Would the given filter exclude this event?

Source code in src/prefect/server/events/filters.py
 98
 99
100
def excludes(self, event: Event) -> bool:
    """Report whether this filter rejects the event (the negation of `includes`)."""
    matched = self.includes(event)
    return not matched

includes(event)

Does the given event match the criteria of this filter?

Source code in src/prefect/server/events/filters.py
94
95
96
def includes(self, event: Event) -> bool:
    """Report whether the event satisfies every nested filter.

    Stops at the first nested filter that does not include the event; with no
    nested filters the result is True.
    """
    for child in self.get_filters():
        if not child.includes(event):
            return False
    return True

EventFilter

Bases: EventDataFilter

Source code in src/prefect/server/events/filters.py
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
class EventFilter(EventDataFilter):
    # Top-level event filter: groups the per-aspect filters below together
    # with the ordering of the results.  Field order is kept as-is because
    # the inherited `get_filters` walks the fields in declaration order.
    occurred: EventOccurredFilter = Field(
        description="Filter criteria for when the events occurred",
        default_factory=lambda: EventOccurredFilter(),
    )
    event: Optional[EventNameFilter] = Field(
        default=None,
        description="Filter criteria for the event name",
    )
    any_resource: Optional[EventAnyResourceFilter] = Field(
        default=None,
        description="Filter criteria for any resource involved in the event",
    )
    resource: Optional[EventResourceFilter] = Field(
        default=None,
        description="Filter criteria for the resource of the event",
    )
    related: Optional[EventRelatedFilter] = Field(
        default=None,
        description="Filter criteria for the related resources of the event",
    )
    id: EventIDFilter = Field(
        description="Filter criteria for the events' ID",
        default_factory=lambda: EventIDFilter(),
    )

    order: EventOrder = Field(
        default=EventOrder.DESC,
        description="The order to return filtered events",
    )

    def build_where_clauses(self) -> Sequence["ColumnExpressionArgument[bool]"]:
        """Build the WHERE clauses with this filter registered as the top-level filter."""
        # Mark ourselves as the root first so the nested filters receive it
        # when the base implementation walks them.
        self._top_level_filter = self
        clauses = super().build_where_clauses()
        return clauses

    def _scoped_event_resources(self) -> Select:
        """Select `EventResource.event_id` values restricted to this filter's `occurred` range."""
        window = self.occurred
        return sa.select(orm_models.EventResource.event_id).where(
            orm_models.EventResource.occurred >= window.since,
            orm_models.EventResource.occurred <= window.until,
        )

    @property
    def logical_limit(self) -> int:
        """Upper bound on the number of rows this query _could_ return,
        independent of any limit the caller asks for.  May be used as an
        optimization for DB queries."""
        requested_ids = self.id.id if self.id else None
        if requested_ids:
            # Filtering on explicit IDs can never yield more rows than IDs.
            return len(requested_ids)

        return sys.maxsize

logical_limit: int property

The logical limit for this query: the maximum number of rows that it could return, regardless of what the caller has requested. May be used as an optimization for DB queries.

EventOccurredFilter

Bases: EventDataFilter

Source code in src/prefect/server/events/filters.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class EventOccurredFilter(EventDataFilter):
    # Restricts events to the inclusive time range [since, until].
    since: DateTime = Field(
        description="Only include events after this time (inclusive)",
        default_factory=lambda: cast(
            DateTime,
            pendulum.now("UTC").start_of("day").subtract(days=180),
        ),
    )
    until: DateTime = Field(
        description="Only include events prior to this time (inclusive)",
        default_factory=lambda: cast(DateTime, pendulum.now("UTC")),
    )

    def clamp(self, max_duration: timedelta):
        """Raise `since` so it is no earlier than `max_duration` before the current UTC time."""
        oldest_allowed = pendulum.now("UTC") - max_duration
        current_since = cast(pendulum.DateTime, self.since)
        self.since = max(oldest_allowed, current_since)

    def includes(self, event: Event) -> bool:
        """Return whether `event.occurred` falls within `since`..`until`, both ends inclusive."""
        occurred = event.occurred
        return self.since <= occurred and occurred <= self.until

    def build_where_clauses(self) -> Sequence["ColumnExpressionArgument[bool]"]:
        """Return the lower- and upper-bound criteria on `Event.occurred`."""
        bounds: List["ColumnExpressionArgument[bool]"] = [
            orm_models.Event.occurred >= self.since,
            orm_models.Event.occurred <= self.until,
        ]
        return bounds

clamp(max_duration)

Limit how far the query can look back based on the given duration

Source code in src/prefect/server/events/filters.py
123
124
125
126
def clamp(self, max_duration: timedelta):
    """Raise `since` so it is no earlier than `max_duration` before the current UTC time."""
    oldest_allowed = pendulum.now("UTC") - max_duration
    current_since = cast(pendulum.DateTime, self.since)
    self.since = max(oldest_allowed, current_since)