Skip to content

prefect.records.base

RecordStore

Bases: ABC

Source code in src/prefect/records/base.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class RecordStore(abc.ABC):
    @abc.abstractmethod
    def read(
        self, key: str, holder: Optional[str] = None
    ) -> Optional[TransactionRecord]:
        """
        Read the transaction record with the given key.

        Args:
            key: Unique identifier for the transaction record.
            holder: Unique identifier for the holder of the lock. If a lock exists on
                the record being written, the read will be blocked until the lock is
                released if the provided holder does not match the holder of the lock.
                If not provided, a default holder based on the current host, process,
                and thread will be used.

        Returns:
            TransactionRecord: The transaction record with the given key.
        """
        ...

    @abc.abstractmethod
    def write(self, key: str, result: "BaseResult", holder: Optional[str] = None):
        """
        Write the transaction record with the given key.

        Args:
            key: Unique identifier for the transaction record.
            record: The transaction record to write.
            holder: Unique identifier for the holder of the lock. If a lock exists on
                the record being written, the write will be rejected if the provided
                holder does not match the holder of the lock. If not provided,
                a default holder based on the current host, process, and thread will
                be used.
        """
        ...

    @abc.abstractmethod
    def exists(self, key: str) -> bool:
        """
        Check if the transaction record with the given key exists.

        Args:
            key: Unique identifier for the transaction record.

        Returns:
            bool: True if the record exists; False otherwise.
        """
        ...

    @abc.abstractmethod
    def supports_isolation_level(self, isolation_level: "IsolationLevel") -> bool:
        """
        Check if the record store supports the given isolation level.

        Args:
            isolation_level: The isolation level to check.

        Returns:
            bool: True if the record store supports the isolation level; False otherwise.
        """
        ...

    def acquire_lock(
        self,
        key: str,
        holder: Optional[str] = None,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ) -> bool:
        """
        Acquire a lock for a transaction record with the given key. Will block other
        actors from updating this transaction record until the lock is
        released.

        Args:
            key: Unique identifier for the transaction record.
            holder: Unique identifier for the holder of the lock. If not provided,
                a default holder based on the current host, process, and thread will
                be used.
            acquire_timeout: Max number of seconds to wait for the record to become
                available if it is locked while attempting to acquire a lock. Pass 0
                to attempt to acquire a lock without waiting. Blocks indefinitely by
                default.
            hold_timeout: Max number of seconds to hold the lock for. Holds the lock
                indefinitely by default.

        Returns:
            bool: True if the lock was successfully acquired; False otherwise.
        """
        raise NotImplementedError

    def release_lock(self, key: str, holder: Optional[str] = None):
        """
        Releases the lock on the corresponding transaction record.

        Args:
            key: Unique identifier for the transaction record.
            holder: Unique identifier for the holder of the lock. Must match the
                holder provided when acquiring the lock.
        """
        raise NotImplementedError

    def is_locked(self, key: str) -> bool:
        """
        Simple check to see if the corresponding record is currently locked.

        Args:
            key: Unique identifier for the transaction record.

        Returns:
            True is the record is locked; False otherwise.
        """
        raise NotImplementedError

    def is_lock_holder(self, key: str, holder: Optional[str] = None) -> bool:
        """
        Check if the current holder is the lock holder for the transaction record.

        Args:
            key: Unique identifier for the transaction record.
            holder: Unique identifier for the holder of the lock. If not provided,
                a default holder based on the current host, process, and thread will
                be used.

        Returns:
            bool: True if the current holder is the lock holder; False otherwise.
        """
        raise NotImplementedError

    def wait_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
        """
        Wait for the corresponding transaction record to become free.

        Args:
            key: Unique identifier for the transaction record.
            timeout: Maximum time to wait. None means to wait indefinitely.

        Returns:
            bool: True if the lock becomes free within the timeout; False
                otherwise.
        """
        ...

    @staticmethod
    def generate_default_holder() -> str:
        """
        Generate a default holder string using hostname, PID, and thread ID.

        Returns:
            str: A unique identifier string.
        """
        hostname = socket.gethostname()
        pid = os.getpid()
        thread_name = threading.current_thread().name
        thread_id = threading.get_ident()
        return f"{hostname}:{pid}:{thread_id}:{thread_name}"

    @contextmanager
    def lock(
        self,
        key: str,
        holder: Optional[str] = None,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ):
        """
        Context manager to lock the transaction record during the execution
        of the nested code block.

        Args:
            key: Unique identifier for the transaction record.
            holder: Unique identifier for the holder of the lock. If not provided,
                a default holder based on the current host, process, and thread will
                be used.
            acquire_timeout: Max number of seconds to wait for the record to become
                available if it is locked while attempting to acquire a lock. Pass 0
                to attempt to acquire a lock without waiting. Blocks indefinitely by
                default.
            hold_timeout: Max number of seconds to hold the lock for. Holds the lock
                indefinitely by default.

        Example:
            Hold a lock while during an operation:
                ```python
                    with TransactionRecord(key="my-transaction-record-key").lock():
                        do_stuff()
                ```
        """
        self.acquire_lock(
            key=key,
            holder=holder,
            acquire_timeout=acquire_timeout,
            hold_timeout=hold_timeout,
        )

        try:
            yield
        finally:
            self.release_lock(key=key, holder=holder)

acquire_lock(key, holder=None, acquire_timeout=None, hold_timeout=None)

Acquire a lock for a transaction record with the given key. Will block other actors from updating this transaction record until the lock is released.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required
holder Optional[str]

Unique identifier for the holder of the lock. If not provided, a default holder based on the current host, process, and thread will be used.

None
acquire_timeout Optional[float]

Max number of seconds to wait for the record to become available if it is locked while attempting to acquire a lock. Pass 0 to attempt to acquire a lock without waiting. Blocks indefinitely by default.

None
hold_timeout Optional[float]

Max number of seconds to hold the lock for. Holds the lock indefinitely by default.

None

Returns:

Name Type Description
bool bool

True if the lock was successfully acquired; False otherwise.

Source code in src/prefect/records/base.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def acquire_lock(
    self,
    key: str,
    holder: Optional[str] = None,
    acquire_timeout: Optional[float] = None,
    hold_timeout: Optional[float] = None,
) -> bool:
    """
    Acquire a lock for a transaction record with the given key. Will block other
    actors from updating this transaction record until the lock is
    released.

    Args:
        key: Unique identifier for the transaction record.
        holder: Unique identifier for the holder of the lock. If not provided,
            a default holder based on the current host, process, and thread will
            be used.
        acquire_timeout: Max number of seconds to wait for the record to become
            available if it is locked while attempting to acquire a lock. Pass 0
            to attempt to acquire a lock without waiting. Blocks indefinitely by
            default.
        hold_timeout: Max number of seconds to hold the lock for. Holds the lock
            indefinitely by default.

    Returns:
        bool: True if the lock was successfully acquired; False otherwise.
    """
    raise NotImplementedError

exists(key) abstractmethod

Check if the transaction record with the given key exists.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required

Returns:

Name Type Description
bool bool

True if the record exists; False otherwise.

Source code in src/prefect/records/base.py
61
62
63
64
65
66
67
68
69
70
71
72
@abc.abstractmethod
def exists(self, key: str) -> bool:
    """
    Check if the transaction record with the given key exists.

    Args:
        key: Unique identifier for the transaction record.

    Returns:
        bool: True if the record exists; False otherwise.
    """
    ...

generate_default_holder() staticmethod

Generate a default holder string using hostname, PID, and thread ID.

Returns:

Name Type Description
str str

A unique identifier string.

Source code in src/prefect/records/base.py
168
169
170
171
172
173
174
175
176
177
178
179
180
@staticmethod
def generate_default_holder() -> str:
    """
    Generate a default holder string using hostname, PID, and thread ID.

    Returns:
        str: A unique identifier string.
    """
    hostname = socket.gethostname()
    pid = os.getpid()
    thread_name = threading.current_thread().name
    thread_id = threading.get_ident()
    return f"{hostname}:{pid}:{thread_id}:{thread_name}"

is_lock_holder(key, holder=None)

Check if the current holder is the lock holder for the transaction record.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required
holder Optional[str]

Unique identifier for the holder of the lock. If not provided, a default holder based on the current host, process, and thread will be used.

None

Returns:

Name Type Description
bool bool

True if the current holder is the lock holder; False otherwise.

Source code in src/prefect/records/base.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def is_lock_holder(self, key: str, holder: Optional[str] = None) -> bool:
    """
    Check if the current holder is the lock holder for the transaction record.

    Args:
        key: Unique identifier for the transaction record.
        holder: Unique identifier for the holder of the lock. If not provided,
            a default holder based on the current host, process, and thread will
            be used.

    Returns:
        bool: True if the current holder is the lock holder; False otherwise.
    """
    raise NotImplementedError

is_locked(key)

Simple check to see if the corresponding record is currently locked.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required

Returns:

Type Description
bool

True is the record is locked; False otherwise.

Source code in src/prefect/records/base.py
127
128
129
130
131
132
133
134
135
136
137
def is_locked(self, key: str) -> bool:
    """
    Simple check to see if the corresponding record is currently locked.

    Args:
        key: Unique identifier for the transaction record.

    Returns:
        True is the record is locked; False otherwise.
    """
    raise NotImplementedError

lock(key, holder=None, acquire_timeout=None, hold_timeout=None)

Context manager to lock the transaction record during the execution of the nested code block.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required
holder Optional[str]

Unique identifier for the holder of the lock. If not provided, a default holder based on the current host, process, and thread will be used.

None
acquire_timeout Optional[float]

Max number of seconds to wait for the record to become available if it is locked while attempting to acquire a lock. Pass 0 to attempt to acquire a lock without waiting. Blocks indefinitely by default.

None
hold_timeout Optional[float]

Max number of seconds to hold the lock for. Holds the lock indefinitely by default.

None
Example

Hold a lock while during an operation: python with TransactionRecord(key="my-transaction-record-key").lock(): do_stuff()

Source code in src/prefect/records/base.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@contextmanager
def lock(
    self,
    key: str,
    holder: Optional[str] = None,
    acquire_timeout: Optional[float] = None,
    hold_timeout: Optional[float] = None,
):
    """
    Context manager to lock the transaction record during the execution
    of the nested code block.

    Args:
        key: Unique identifier for the transaction record.
        holder: Unique identifier for the holder of the lock. If not provided,
            a default holder based on the current host, process, and thread will
            be used.
        acquire_timeout: Max number of seconds to wait for the record to become
            available if it is locked while attempting to acquire a lock. Pass 0
            to attempt to acquire a lock without waiting. Blocks indefinitely by
            default.
        hold_timeout: Max number of seconds to hold the lock for. Holds the lock
            indefinitely by default.

    Example:
        Hold a lock while during an operation:
            ```python
                with TransactionRecord(key="my-transaction-record-key").lock():
                    do_stuff()
            ```
    """
    self.acquire_lock(
        key=key,
        holder=holder,
        acquire_timeout=acquire_timeout,
        hold_timeout=hold_timeout,
    )

    try:
        yield
    finally:
        self.release_lock(key=key, holder=holder)

read(key, holder=None) abstractmethod

Read the transaction record with the given key.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required
holder Optional[str]

Unique identifier for the holder of the lock. If a lock exists on the record being written, the read will be blocked until the lock is released if the provided holder does not match the holder of the lock. If not provided, a default holder based on the current host, process, and thread will be used.

None

Returns:

Name Type Description
TransactionRecord Optional[TransactionRecord]

The transaction record with the given key.

Source code in src/prefect/records/base.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@abc.abstractmethod
def read(
    self, key: str, holder: Optional[str] = None
) -> Optional[TransactionRecord]:
    """
    Read the transaction record with the given key.

    Args:
        key: Unique identifier for the transaction record.
        holder: Unique identifier for the holder of the lock. If a lock exists on
            the record being written, the read will be blocked until the lock is
            released if the provided holder does not match the holder of the lock.
            If not provided, a default holder based on the current host, process,
            and thread will be used.

    Returns:
        TransactionRecord: The transaction record with the given key.
    """
    ...

release_lock(key, holder=None)

Releases the lock on the corresponding transaction record.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required
holder Optional[str]

Unique identifier for the holder of the lock. Must match the holder provided when acquiring the lock.

None
Source code in src/prefect/records/base.py
116
117
118
119
120
121
122
123
124
125
def release_lock(self, key: str, holder: Optional[str] = None):
    """
    Releases the lock on the corresponding transaction record.

    Args:
        key: Unique identifier for the transaction record.
        holder: Unique identifier for the holder of the lock. Must match the
            holder provided when acquiring the lock.
    """
    raise NotImplementedError

supports_isolation_level(isolation_level) abstractmethod

Check if the record store supports the given isolation level.

Parameters:

Name Type Description Default
isolation_level IsolationLevel

The isolation level to check.

required

Returns:

Name Type Description
bool bool

True if the record store supports the isolation level; False otherwise.

Source code in src/prefect/records/base.py
74
75
76
77
78
79
80
81
82
83
84
85
@abc.abstractmethod
def supports_isolation_level(self, isolation_level: "IsolationLevel") -> bool:
    """
    Check if the record store supports the given isolation level.

    Args:
        isolation_level: The isolation level to check.

    Returns:
        bool: True if the record store supports the isolation level; False otherwise.
    """
    ...

wait_for_lock(key, timeout=None)

Wait for the corresponding transaction record to become free.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required
timeout Optional[float]

Maximum time to wait. None means to wait indefinitely.

None

Returns:

Name Type Description
bool bool

True if the lock becomes free within the timeout; False otherwise.

Source code in src/prefect/records/base.py
154
155
156
157
158
159
160
161
162
163
164
165
166
def wait_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
    """
    Wait for the corresponding transaction record to become free.

    Args:
        key: Unique identifier for the transaction record.
        timeout: Maximum time to wait. None means to wait indefinitely.

    Returns:
        bool: True if the lock becomes free within the timeout; False
            otherwise.
    """
    ...

write(key, result, holder=None) abstractmethod

Write the transaction record with the given key.

Parameters:

Name Type Description Default
key str

Unique identifier for the transaction record.

required
record

The transaction record to write.

required
holder Optional[str]

Unique identifier for the holder of the lock. If a lock exists on the record being written, the write will be rejected if the provided holder does not match the holder of the lock. If not provided, a default holder based on the current host, process, and thread will be used.

None
Source code in src/prefect/records/base.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@abc.abstractmethod
def write(self, key: str, result: "BaseResult", holder: Optional[str] = None):
    """
    Write the transaction record with the given key.

    Args:
        key: Unique identifier for the transaction record.
        record: The transaction record to write.
        holder: Unique identifier for the holder of the lock. If a lock exists on
            the record being written, the write will be rejected if the provided
            holder does not match the holder of the lock. If not provided,
            a default holder based on the current host, process, and thread will
            be used.
    """
    ...

TransactionRecord dataclass

A dataclass representation of a transaction record.

Source code in src/prefect/records/base.py
14
15
16
17
18
19
20
21
@dataclass
class TransactionRecord:
    """
    A dataclass representation of a transaction record.
    """

    key: str
    result: "BaseResult"