Skip to content

prefect.server.events.storage

process_time_based_counts(filter, time_unit, time_interval, counts)

Common logic for processing time-based counts across different event backends.

When doing time-based counting we want to do two things:

  1. Backfill any missing intervals with 0 counts.
  2. Update the emitted start/end times to match the beginning and end of each interval, rather than having them reflect the true min/max occurred times of the events themselves.
Source code in src/prefect/server/events/storage/__init__.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def process_time_based_counts(
    filter: "EventFilter",
    time_unit: TimeUnit,
    time_interval: float,
    counts: List[EventCount],
) -> List[EventCount]:
    """
    Common logic for processing time-based counts across different event backends.

    When doing time-based counting we want to do two things:

    1. Backfill any missing intervals with 0 counts.
    2. Update the start/end times that are emitted to match the beginning and
    end of the intervals rather than having them reflect the true min/max
    occurred time of the events themselves.

    Args:
        filter: the event filter; its `occurred.since` and `occurred.until`
            are handed to `time_unit.get_interval_spans` to bound the intervals.
        time_unit: provides `get_interval_spans`, which must yield an `int`
            first and then `(start_time, end_time)` pairs.
        time_interval: passed through unchanged to `get_interval_spans`.
        counts: the counts to merge in. Each `value` must parse as a number;
            after subtracting the leading `int` from the span generator it is
            used as the position of the interval to update.

    Returns:
        One `EventCount` per yielded interval, in generator order. Intervals
        with no matching entry in `counts` keep a count of 0.

    Raises:
        IndexError: if a count maps to a position outside the generated
            intervals (before the first one or after the last one).
    """

    span_generator = time_unit.get_interval_spans(
        filter.occurred.since, filter.occurred.until, time_interval
    )

    # The generator's first item is an integer offset, not a span; every item
    # after it is a (start_time, end_time) pair.
    # NOTE(review): presumably this is the number of spans between the time
    # unit's pivot and the first interval -- confirm against get_interval_spans.
    spans_since_pivot = next(span_generator)
    assert isinstance(spans_since_pivot, int)

    backfilled_counts = [
        EventCount(
            value=str(i),
            count=0,
            label=start_time.isoformat(),
            start_time=start_time,
            end_time=end_time,
        )
        for i, (start_time, end_time) in enumerate(span_generator)
    ]

    total_spans = len(backfilled_counts)
    for count in counts:
        index = int(float(count.value)) - spans_since_pivot
        # A negative index would not fail: Python would resolve it from the
        # end of the list and silently credit the count to the wrong interval.
        # Reject it with the same exception an overshooting index produces.
        if not 0 <= index < total_spans:
            raise IndexError(
                f"Event count bucket {count.value!r} falls outside the "
                f"{total_spans} interval(s) generated for this filter"
            )
        backfilled_counts[index].count = count.count

    return backfilled_counts