Skip to content

prefect.blocks.redis

RedisStorageContainer

Bases: WritableFileSystem

Block used to interact with Redis as a filesystem

Attributes:

Name Type Description
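host Optional[str]

Redis hostname. Required unless connection_string is set.

port int

Redis port (default 6379).

db int

Redis DB index (default 0).

username Optional[SecretStr]

Redis username. When set, password must be set as well.

password Optional[SecretStr]

Redis password.

connection_string Optional[SecretStr]

Redis connection string. When set, it is used instead of the host-based settings.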

Example

Create a new block from hostname, username and password:

from prefect.blocks.redis import RedisStorageContainer

# Build the block from discrete connection settings, then persist it by name.
block = RedisStorageContainer.from_host(
    host="myredishost.com",
    username="redis",
    password="SuperSecret",
)
block.save("BLOCK_NAME")

Create a new block from a connection string:

from prefect.blocks.redis import RedisStorageContainer

# `from_connection_string` is the classmethod the block actually defines;
# the URL is passed as a single, properly quoted string.
block = RedisStorageContainer.from_connection_string(
    "redis://redis:SuperSecret@myredishost.com:6379"
)
block.save("BLOCK_NAME")
Source code in src/prefect/blocks/redis.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class RedisStorageContainer(WritableFileSystem):
    """
    Block used to interact with Redis as a filesystem

    Attributes:
        host (Optional[str]): Redis hostname. Required unless
            `connection_string` is set.
        port (int): Redis port. Defaults to 6379.
        db (int): Redis DB index. Defaults to 0.
        username (Optional[SecretStr]): Redis username. When set, `password`
            must be set as well.
        password (Optional[SecretStr]): Redis password.
        connection_string (Optional[SecretStr]): Redis connection string. When
            set, it is used instead of the host-based settings.

    Example:
        Create a new block from hostname, username and password:
        ```python
        from prefect.blocks.redis import RedisStorageContainer

        block = RedisStorageContainer.from_host(
            host="myredishost.com", username="redis", password="SuperSecret")
        block.save("BLOCK_NAME")
        ```

        Create a new block from a connection string:
        ```python
        from prefect.blocks.redis import RedisStorageContainer

        block = RedisStorageContainer.from_connection_string(
            "redis://redis:SuperSecret@myredishost.com:6379"
        )
        block.save("BLOCK_NAME")
        ```
    """

    _logo_url = "https://stprododpcmscdnendpoint.azureedge.net/assets/icons/redis.png"

    host: Optional[str] = Field(default=None, description="Redis hostname")
    port: int = Field(default=6379, description="Redis port")
    db: int = Field(default=0, description="Redis DB index")
    username: Optional[SecretStr] = Field(default=None, description="Redis username")
    password: Optional[SecretStr] = Field(default=None, description="Redis password")
    connection_string: Optional[SecretStr] = Field(
        default=None, description="Redis connection string"
    )

    def block_initialization(self) -> None:
        """Validate that the block carries enough information to connect.

        A connection string is sufficient on its own. Otherwise `host` is
        mandatory, and a `username` must be accompanied by a `password`.

        Raises:
            ValueError: If `host` is missing, or if `username` is given
                without `password`, and no connection string is set.
        """
        if self.connection_string:
            return
        if not self.host:
            raise ValueError("Initialization error: 'host' is required but missing.")
        if self.username and not self.password:
            raise ValueError(
                "Initialization error: 'username' is provided, but 'password' is missing. Both are required."
            )

    @sync_compatible
    async def read_path(self, path: Union[Path, str]):
        """Read the redis content at `path`

        Args:
            path: Redis key to read from

        Returns:
            Contents at key as bytes
        """
        async with self._client() as client:
            return await client.get(str(path))

    @sync_compatible
    async def write_path(self, path: Union[Path, str], content: bytes):
        """Write `content` to the redis at `path`

        Args:
            path: Redis key to write to
            content: Binary object to write

        Returns:
            The result of the client's `set` call
        """

        async with self._client() as client:
            return await client.set(str(path), content)

    @asynccontextmanager
    async def _client(self) -> AsyncGenerator[redis.Redis, None]:
        """Yield a Redis client for this block and close it on exit.

        The client is built from `connection_string` when one is set;
        otherwise from `host`, `port`, `db`, `username` and `password`.

        Raises:
            ValueError: If neither `connection_string` nor `host` is set.
        """
        if self.connection_string:
            client = redis.Redis.from_url(self.connection_string.get_secret_value())
        else:
            # Explicit check rather than `assert`: assertions are stripped
            # under `python -O`, which would let a missing host slip through.
            if not self.host:
                raise ValueError(
                    "'host' is required when no 'connection_string' is set."
                )
            client = redis.Redis(
                host=self.host,
                port=self.port,
                username=self.username.get_secret_value() if self.username else None,
                password=self.password.get_secret_value() if self.password else None,
                db=self.db,
            )

        try:
            yield client
        finally:
            # Always release the connection, even if the caller's body raised.
            await client.aclose()

    @classmethod
    def from_host(
        cls,
        host: str,
        port: int = 6379,
        db: int = 0,
        username: Union[None, str, SecretStr] = None,
        password: Union[None, str, SecretStr] = None,
    ) -> Self:
        """Create block from hostname, username and password

        Args:
            host: Redis hostname
            port: Redis port
            db: Redis DB index
            username: Redis username
            password: Redis password

        Returns:
            `RedisStorageContainer` instance
        """

        # Plain strings are wrapped so the credentials are stored as secrets.
        username = SecretStr(username) if isinstance(username, str) else username
        password = SecretStr(password) if isinstance(password, str) else password

        return cls(host=host, port=port, db=db, username=username, password=password)

    @classmethod
    def from_connection_string(cls, connection_string: Union[str, SecretStr]) -> Self:
        """Create block from a Redis connection string

        Supports the following URL schemes:
        - `redis://` creates a TCP socket connection
        - `rediss://` creates a SSL wrapped TCP socket connection
        - `unix://` creates a Unix Domain Socket connection

        See [Redis docs](https://redis.readthedocs.io/en/stable/examples
        /connection_examples.html#Connecting-to-Redis-instances-by-specifying-a-URL
        -scheme.) for more info.

        Args:
            connection_string: Redis connection string

        Returns:
            `RedisStorageContainer` instance
        """

        # A plain string is wrapped so the URL (which may embed credentials)
        # is stored as a secret.
        connection_string = (
            SecretStr(connection_string)
            if isinstance(connection_string, str)
            else connection_string
        )

        return cls(connection_string=connection_string)

from_connection_string(connection_string) classmethod

Create block from a Redis connection string

Supports the following URL schemes:

- redis:// creates a TCP socket connection
- rediss:// creates an SSL-wrapped TCP socket connection
- unix:// creates a Unix Domain Socket connection

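See the Redis docs (https://redis.readthedocs.io/en/stable/examples/connection_examples.html#Connecting-to-Redis-instances-by-specifying-a-URL-scheme.) for more info.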

Parameters:

Name Type Description Default
connection_string Union[str, SecretStr]

Redis connection string

required

Returns:

Type Description
Self

RedisStorageContainer instance

Source code in src/prefect/blocks/redis.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@classmethod
def from_connection_string(cls, connection_string: Union[str, SecretStr]) -> Self:
    """Build a block that connects through a Redis URL.

    The URL scheme selects the transport:
    - `redis://` creates a TCP socket connection
    - `rediss://` creates a SSL wrapped TCP socket connection
    - `unix://` creates a Unix Domain Socket connection

    See [Redis docs](https://redis.readthedocs.io/en/stable/examples
    /connection_examples.html#Connecting-to-Redis-instances-by-specifying-a-URL
    -scheme.) for more info.

    Args:
        connection_string: Redis connection string, either a plain `str`
            or an existing `SecretStr`

    Returns:
        `RedisStorageContainer` instance
    """
    secret_url = connection_string
    if isinstance(secret_url, str):
        # Plain strings are wrapped; an existing SecretStr passes through as-is.
        secret_url = SecretStr(secret_url)

    return cls(connection_string=secret_url)

from_host(host, port=6379, db=0, username=None, password=None) classmethod

Create block from hostname, username and password

Parameters:

Name Type Description Default
host str

Redis hostname

required
username Union[None, str, SecretStr]

Redis username

None
password Union[None, str, SecretStr]

Redis password

None
port int

Redis port

6379
db int

Redis DB index

0

Returns:

Type Description
Self

RedisStorageContainer instance

Source code in src/prefect/blocks/redis.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@classmethod
def from_host(
    cls,
    host: str,
    port: int = 6379,
    db: int = 0,
    username: Union[None, str, SecretStr] = None,
    password: Union[None, str, SecretStr] = None,
) -> Self:
    """Build a block from discrete host-based connection settings.

    Args:
        host: Redis hostname
        port: Redis port
        db: Redis DB index
        username: Redis username; a plain `str` is wrapped in `SecretStr`
        password: Redis password; a plain `str` is wrapped in `SecretStr`

    Returns:
        `RedisStorageContainer` instance
    """
    # Only plain strings need wrapping; None and SecretStr pass through as-is.
    if isinstance(username, str):
        username = SecretStr(username)
    if isinstance(password, str):
        password = SecretStr(password)

    return cls(
        host=host,
        port=port,
        db=db,
        username=username,
        password=password,
    )

read_path(path) async

Read the redis content at path

Parameters:

Name Type Description Default
path Union[Path, str]

Redis key to read from

required

Returns:

Type Description

Contents at key as bytes

Source code in src/prefect/blocks/redis.py
72
73
74
75
76
77
78
79
80
81
82
83
@sync_compatible
async def read_path(self, path: Union[Path, str]):
    """Fetch the value stored in Redis under `path`.

    Args:
        path: Redis key to read from; it is converted with `str()` before
            the lookup

    Returns:
        Contents at key as bytes
    """
    async with self._client() as redis_client:
        key = str(path)
        stored = await redis_client.get(key)
        return stored

write_path(path, content) async

Write content to the redis at path

Parameters:

Name Type Description Default
path Union[Path, str]

Redis key to write to

required
content bytes

Binary object to write

required
Source code in src/prefect/blocks/redis.py
85
86
87
88
89
90
91
92
93
94
95
@sync_compatible
async def write_path(self, path: Union[Path, str], content: bytes):
    """Store `content` in Redis under `path`.

    Args:
        path: Redis key to write to; it is converted with `str()` before
            the write
        content: Binary object to write

    Returns:
        The result of the client's `set` call
    """
    async with self._client() as redis_client:
        key = str(path)
        outcome = await redis_client.set(key, content)
        return outcome