Skip to content

prefect.records.result_store

ResultRecordStore dataclass

Bases: RecordStore

A record store for result records.

Collocates result metadata with result data.

Source code in src/prefect/records/result_store.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class ResultRecordStore(RecordStore):
    """
    A record store for result records.

    Collocates result metadata with result data.

    Attributes:
        result_store: Supplies the serializer type and storage block id used
            to build result references in `read`, and creates results for
            non-result values in `write`.
        cache: The result most recently checked by `exists`. `read` reuses it
            only for the key it was stored under.
    """

    result_store: ResultStore
    cache: Optional[PersistedResult] = None

    def exists(self, key: str) -> bool:
        """
        Report whether a retrievable, unexpired result is stored under `key`.

        The result reference for `key` is obtained through `read` and then
        fetched with `result.get(_sync=True)`. When that succeeds the result
        is kept in `self.cache` so a following `read` of the same key can
        reuse it.

        Args:
            key: The storage key to check.

        Returns:
            True if the result could be fetched and either has no expiration
            or its expiration is still in the future; False otherwise,
            including when any exception is raised along the way.
        """
        try:
            record = self.read(key)
            if not record:
                return False
            result = record.result
            # presumably raises when the stored data cannot be retrieved,
            # which the handler below turns into `False` -- TODO confirm
            result.get(_sync=True)
            if result.expiration:
                # if the result has an expiration,
                # check if it is still in the future
                is_live = result.expiration > pendulum.now("utc")
            else:
                is_live = True
            # NOTE(review): an expired result is cached as well, so a later
            # `read` of this key returns it -- confirm this is intended.
            self.cache = result
            return is_live
        except Exception:
            # best-effort check: any failure is reported as "does not exist"
            return False

    def read(self, key: str, holder: Optional[str] = None) -> TransactionRecord:
        """
        Build the transaction record for `key`.

        Args:
            key: The storage key of the result to read.
            holder: Unused by this store.

        Returns:
            A `TransactionRecord` for `key` wrapping either the cached result
            (when it was stored under the same key) or a new
            `PersistedResult` reference built from the result store's
            serializer type and storage block id.

        Raises:
            ValueError: If the result reference could not be constructed; the
                underlying exception is attached as the cause.
        """
        cached = self.cache
        # The cache is a single slot: only reuse it when it belongs to the
        # requested key, otherwise another key's result would be returned.
        if cached is not None and cached.storage_key == key:
            return TransactionRecord(key=key, result=cached)
        try:
            result = PersistedResult(
                serializer_type=self.result_store.serializer.type,
                storage_block_id=self.result_store.result_storage_block_id,
                storage_key=key,
            )
            return TransactionRecord(key=key, result=result)
        except Exception as exc:
            # this is a bit of a bandaid for functionality
            raise ValueError("Result could not be read") from exc

    def write(self, key: str, result: Any, holder: Optional[str] = None) -> None:
        """
        Persist `result` under `key`.

        Args:
            key: The storage key to write to. Only used when `result` is not
                already a result object.
            result: A `PersistedResult` (written as-is), any other
                `BaseResult` (ignored), or an arbitrary object (a result is
                created for it through the result store).
            holder: Unused by this store.
        """
        if isinstance(result, PersistedResult):
            # if the value is already a persisted result, write it
            result.write(_sync=True)
        elif not isinstance(result, BaseResult):
            # `create_result` is awaited synchronously via `run_coro_as_sync`
            run_coro_as_sync(self.result_store.create_result(obj=result, key=key))

    def supports_isolation_level(self, isolation_level: IsolationLevel) -> bool:
        """Return True only for `IsolationLevel.READ_COMMITTED`."""
        return isolation_level == IsolationLevel.READ_COMMITTED