class RedisLockManager(LockManager):
    """
    A lock manager that uses Redis as a backend.

    Attributes:
        host: The host of the Redis server
        port: The port the Redis server is running on
        db: The database to write to and read from
        username: The username to use when connecting to the Redis server
        password: The password to use when connecting to the Redis server
        ssl: Whether to use SSL when connecting to the Redis server
        client: The Redis client used to communicate with the Redis server
        async_client: The asynchronous Redis client used to communicate
            with the Redis server

    Example:
        Use with a cache policy:
        ```python
        from prefect import task
        from prefect.cache_policies import TASK_SOURCE, INPUTS
        from prefect.isolation_levels import SERIALIZABLE

        from prefect_redis import RedisLockManager

        cache_policy = (INPUTS + TASK_SOURCE).configure(
            isolation_level=SERIALIZABLE,
            lock_manager=RedisLockManager(host="my-redis-host"),
        )

        @task(cache_policy=cache_policy)
        def my_cached_task(x: int):
            return x + 42
        ```

        Configure with a `RedisDatabase` block:
        ```python
        from prefect_redis import RedisDatabase, RedisLockManager

        block = RedisDatabase(host="my-redis-host")
        lock_manager = RedisLockManager(**block.as_connection_params())
        ```
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        username: Optional[str] = None,
        password: Optional[str] = None,
        ssl: bool = False,
    ) -> None:
        """
        Store the connection settings and build the sync and async clients.

        Args:
            host: The host of the Redis server.
            port: The port the Redis server is running on.
            db: The database to write to and read from.
            username: The username to use when connecting.
            password: The password to use when connecting.
            ssl: Whether to use SSL when connecting.
        """
        self.host = host
        self.port = port
        self.db = db
        self.username = username
        self.password = password
        self.ssl = ssl
        # `ssl` must be forwarded to both clients; storing it on the instance
        # alone leaves the connections unencrypted even when ssl=True.
        self.client = Redis(
            host=self.host,
            port=self.port,
            db=self.db,
            username=self.username,
            password=self.password,
            ssl=self.ssl,
        )
        self.async_client = AsyncRedis(
            host=self.host,
            port=self.port,
            db=self.db,
            username=self.username,
            password=self.password,
            ssl=self.ssl,
        )
        # Locks acquired through this manager, keyed by Redis lock name.
        # Values are `Lock` (from acquire_lock) or `AsyncLock` (from
        # aacquire_lock) instances.
        self._locks = {}

    @staticmethod
    def _lock_name_for_key(key: str) -> str:
        """Return the Redis key name used for the lock guarding `key`."""
        return f"lock:{key}"

    def acquire_lock(
        self,
        key: str,
        holder: str,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ) -> bool:
        """
        Acquire the lock for `key` on behalf of `holder`.

        Args:
            key: The key whose lock should be acquired.
            holder: Identifier of the holder; used as the lock token.
            acquire_timeout: Passed to the Redis lock as `blocking_timeout`.
            hold_timeout: Passed to the Redis lock as `timeout`.

        Returns:
            True if `holder` already holds the lock through this manager or
            the lock was acquired; otherwise the falsy result of the acquire
            attempt.
        """
        # Re-entrant fast path: this manager already tracks a lock for the
        # key and its token matches `holder`.
        if self.is_lock_holder(key, holder):
            return True

        lock_name = self._lock_name_for_key(key)
        lock = Lock(self.client, lock_name, timeout=hold_timeout, thread_local=False)
        lock_acquired = lock.acquire(token=holder, blocking_timeout=acquire_timeout)
        if lock_acquired:
            self._locks[lock_name] = lock
        return lock_acquired

    async def aacquire_lock(
        self,
        key: str,
        holder: str,
        acquire_timeout: Optional[float] = None,
        hold_timeout: Optional[float] = None,
    ) -> bool:
        """
        Asynchronously acquire the lock for `key` on behalf of `holder`.

        Args:
            key: The key whose lock should be acquired.
            holder: Identifier of the holder; used as the lock token.
            acquire_timeout: Passed to the Redis lock as `blocking_timeout`.
            hold_timeout: Passed to the Redis lock as `timeout`.

        Returns:
            True if `holder` already holds the lock through this manager or
            the lock was acquired; otherwise the falsy result of the acquire
            attempt.
        """
        if self.is_lock_holder(key, holder):
            return True

        lock_name = self._lock_name_for_key(key)
        lock = AsyncLock(
            self.async_client, lock_name, timeout=hold_timeout, thread_local=False
        )
        lock_acquired = await lock.acquire(
            token=holder, blocking_timeout=acquire_timeout
        )
        if lock_acquired:
            self._locks[lock_name] = lock
        return lock_acquired

    def release_lock(self, key: str, holder: str) -> None:
        """
        Release the lock for `key` held by `holder`.

        Args:
            key: The key whose lock should be released.
            holder: Identifier of the holder releasing the lock.

        Raises:
            ValueError: If this manager tracks no lock for `key`, or the
                tracked lock's token does not match `holder`.
        """
        lock_name = self._lock_name_for_key(key)
        lock = self._locks.get(lock_name)
        if lock is None or not self.is_lock_holder(key, holder):
            raise ValueError(
                f"No lock held by {holder} for transaction with key {key}"
            )

        # NOTE(review): if this lock was stored by `aacquire_lock` it is an
        # AsyncLock, whose `release()` presumably returns an awaitable that is
        # not awaited here — confirm whether mixed async-acquire/sync-release
        # needs to be supported.
        lock.release()
        del self._locks[lock_name]

    def wait_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
        """
        Block until the lock for `key` can be acquired, then release it.

        Args:
            key: The key whose lock to wait on.
            timeout: Passed to the Redis lock as `blocking_timeout`.

        Returns:
            The result of the acquire attempt (truthy if the lock was
            obtained, and then immediately released).
        """
        lock_name = self._lock_name_for_key(key)
        # A throwaway lock object: acquiring it succeeds only once the
        # named lock is free, and it is released again right away.
        lock = Lock(self.client, lock_name)
        lock_freed = lock.acquire(blocking_timeout=timeout)
        if lock_freed:
            lock.release()
        return lock_freed

    async def await_for_lock(self, key: str, timeout: Optional[float] = None) -> bool:
        """
        Asynchronously wait until the lock for `key` can be acquired, then
        release it.

        Args:
            key: The key whose lock to wait on.
            timeout: Passed to the Redis lock as `blocking_timeout`.

        Returns:
            The result of the acquire attempt (truthy if the lock was
            obtained, and then immediately released).
        """
        lock_name = self._lock_name_for_key(key)
        lock = AsyncLock(self.async_client, lock_name)
        lock_freed = await lock.acquire(blocking_timeout=timeout)
        if lock_freed:
            # The async lock's release must be awaited; otherwise the probe
            # lock is never actually released and stays held in Redis.
            await lock.release()
        return lock_freed

    def is_locked(self, key: str) -> bool:
        """Return whether the Redis lock for `key` is currently held."""
        lock_name = self._lock_name_for_key(key)
        lock = Lock(self.client, lock_name)
        return lock.locked()

    def is_lock_holder(self, key: str, holder: str) -> bool:
        """
        Return whether `holder` holds the lock for `key` via this manager.

        Only locks tracked by this manager instance are considered; the
        tracked lock's local token is compared against `holder`.
        """
        lock_name = self._lock_name_for_key(key)
        lock = self._locks.get(lock_name)
        if lock is None:
            return False
        if (token := getattr(lock.local, "token", None)) is None:
            return False
        # The token is stored as bytes on the lock; decode before comparing.
        return token.decode() == holder