Skip to content

prefect_redis.tasks

Prebuilt Prefect tasks for reading and writing data to Redis

redis_execute(credentials, cmd) async

Execute Redis command

Parameters:

Name Type Description Default
credentials RedisDatabase

Redis credential block

required
cmd str

Command to be executed

required

Returns:

Type Description
Any

Command response

Source code in prefect_redis/tasks.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@task
async def redis_execute(
    credentials: "RedisDatabase",
    cmd: str,
) -> Any:
    """Run a single Redis command and hand back whatever the server replies.

    Args:
        credentials: Redis credential block used to open the async client
        cmd: Command to be executed, passed as-is to the client's
            `execute_command`

    Returns:
        Command response
    """
    async with credentials.get_async_client() as conn:
        response = await conn.execute_command(cmd)

    # Returned only after the client context has been exited.
    return response

redis_get(credentials, key) async

Get an object stored at a redis key. Will use cloudpickle to reconstruct the object.

Parameters:

Name Type Description Default
credentials RedisDatabase

Redis credential block

required
key str

Key to get

required

Returns:

Type Description
Any

Fully reconstructed object, decoded from bytes in redis

Source code in prefect_redis/tasks.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@task
async def redis_get(
    credentials: "RedisDatabase",
    key: str,
) -> Any:
    """Fetch the value stored at a Redis key and rebuild the original object.

    The raw bytes are read through `redis_get_binary` and then deserialized
    with `cloudpickle`.

    Note:
        Unpickling executes code embedded in the payload, so only read keys
        whose contents were written by a trusted producer.

    Args:
        credentials: Redis credential block
        key: Key to get

    Returns:
        Fully reconstructed object, decoded from bytes in redis
    """
    pickled = await redis_get_binary.fn(credentials=credentials, key=key)
    return cloudpickle.loads(pickled)

redis_get_binary(credentials, key) async

Get the bytes stored at a redis key

Parameters:

Name Type Description Default
credentials RedisDatabase

Redis credential block

required
key str

Key to get

required

Returns:

Type Description
bytes

Bytes from key in Redis

Source code in prefect_redis/tasks.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@task
async def redis_get_binary(
    credentials: "RedisDatabase",
    key: str,
) -> bytes:
    """Get the bytes stored at a redis key

    Args:
        credentials: Redis credential block used to open the async client
        key: Key to get

    Returns:
        Whatever the client's `get` returns for `key` (the stored bytes)
    """
    async with credentials.get_async_client() as conn:
        return await conn.get(key)

redis_set(credentials, key, value, ex=None, px=None, nx=False, xx=False) async

Set a Redis key to any value.

Will use cloudpickle to convert value to binary representation.

Parameters:

Name Type Description Default
credentials RedisDatabase

Redis credential block

required
key str

Key to be set

required
value Any

Value to be set to key. Does not accept open connections such as database-connections

required
ex Optional[float]

If provided, sets an expire flag in seconds on key set

None
px Optional[float]

If provided, sets an expire flag in milliseconds on key set

None
nx bool

If set to True, set the value at key to value only if it does not already exist

False
xx bool

If set to True, set the value at key to value only if it already exists

False
Source code in prefect_redis/tasks.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@task
async def redis_set(
    credentials: "RedisDatabase",
    key: str,
    value: Any,
    ex: Optional[float] = None,
    px: Optional[float] = None,
    nx: bool = False,
    xx: bool = False,
) -> None:
    """
    Set a Redis key to any value.

    The value is serialized with `cloudpickle` and the resulting bytes are
    written through `redis_set_binary`.

    Args:
        credentials: Redis credential block
        key: Key to be set
        value: Value to be set to `key`. Does not accept open connections such as
            database-connections
        ex: If provided, sets an expire flag in seconds on `key` set
        px: If provided, sets an expire flag in milliseconds on `key` set
        nx: If set to `True`, set the value at `key` to `value` only if it does not
            already exist
        xx: If set to `True`, set the value at `key` to `value` only if it already
            exists
    """
    payload = cloudpickle.dumps(value)
    return await redis_set_binary.fn(
        credentials,
        key,
        payload,
        ex=ex,
        px=px,
        nx=nx,
        xx=xx,
    )

redis_set_binary(credentials, key, value, ex=None, px=None, nx=False, xx=False) async

Set a Redis key to a binary value

Parameters:

Name Type Description Default
credentials RedisDatabase

Redis credential block

required
key str

Key to be set

required
value bytes

Value to be set to key. Must be bytes

required
ex Optional[float]

If provided, sets an expire flag in seconds on key set

None
px Optional[float]

If provided, sets an expire flag in milliseconds on key set

None
nx bool

If set to True, set the value at key to value only if it does not already exist

False
xx bool

If set to True, set the value at key to value only if it already exists

False
Source code in prefect_redis/tasks.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@task
async def redis_set_binary(
    credentials: "RedisDatabase",
    key: str,
    value: bytes,
    ex: Optional[float] = None,
    px: Optional[float] = None,
    nx: bool = False,
    xx: bool = False,
) -> None:
    """
    Set a Redis key to a binary value

    Args:
        credentials: Redis credential block used to open the async client
        key: Key to be set
        value: Value to be set to `key`. Must be bytes
        ex: If provided, sets an expire flag in seconds on `key` set
        px: If provided, sets an expire flag in milliseconds on `key` set
        nx: If set to `True`, set the value at `key` to `value` only if it does not
            already exist
        xx: If set to `True`, set the value at `key` to `value` only if it already
            exists
    """
    # NOTE(review): ex/px are annotated as floats here; confirm the Redis
    # client in use accepts non-integer expiry values.
    set_options = {"ex": ex, "px": px, "nx": nx, "xx": xx}
    async with credentials.get_async_client() as conn:
        await conn.set(key, value, **set_options)