Skip to content

prefect_redis.blocks

Redis credentials handling

RedisDatabase

Bases: WritableFileSystem

Block used to manage authentication with a Redis database

Attributes:

Name Type Description
host str

The host of the Redis server

port int

The port the Redis server is running on

db int

The database to write to and read from

username Optional[SecretStr]

The username to use when connecting to the Redis server

password Optional[SecretStr]

The password to use when connecting to the Redis server

ssl bool

Whether to use SSL when connecting to the Redis server

Example

Create a new block from hostname, username and password: ```python from prefect_redis import RedisDatabase

block = RedisDatabase(
    host="myredishost.com", username="redis", password="SuperSecret")
block.save("BLOCK_NAME")
```

Create a new block from a connection string python from prefect_redis import RedisBlock block = RedisBlock.from_url(""redis://redis:SuperSecret@myredishost.com:6379") block.save("BLOCK_NAME")

Get Redis client in order to interact directly with Redis python from prefect_redis import RedisBlock block = RedisBlock.load("BLOCK_NAME") redis_client = block.get_client()

Source code in prefect_redis/blocks.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class RedisDatabase(WritableFileSystem):
    """
    Block used to manage authentication with a Redis database

    Attributes:
        host: The host of the Redis server
        port: The port the Redis server is running on
        db: The database to write to and read from
        username: The username to use when connecting to the Redis server
        password: The password to use when connecting to the Redis server
        ssl: Whether to use SSL when connecting to the Redis server

    Example:
        Create a new block from hostname, username and password:
            ```python
            from prefect_redis import RedisDatabase

            block = RedisDatabase(
                host="myredishost.com", username="redis", password="SuperSecret")
            block.save("BLOCK_NAME")
            ```

        Create a new block from a connection string
            ```python
            from prefect_redis import RedisBlock
            block = RedisBlock.from_url(""redis://redis:SuperSecret@myredishost.com:6379")
            block.save("BLOCK_NAME")
            ```

        Get Redis client in order to interact directly with Redis
            ```python
            from prefect_redis import RedisBlock
            block = RedisBlock.load("BLOCK_NAME")
            redis_client = block.get_client()
            ```
    """

    _logo_url = "https://stprododpcmscdnendpoint.azureedge.net/assets/icons/redis.png"
    _block_type_name = "Redis Database"

    host: str = Field(default="localhost", description="Redis hostname")
    port: int = Field(default=DEFAULT_PORT, description="Redis port")
    db: int = Field(default=0, description="Redis DB index")
    username: Optional[SecretStr] = Field(default=None, description="Redis username")
    password: Optional[SecretStr] = Field(default=None, description="Redis password")
    ssl: bool = Field(default=False, description="Whether to use SSL")

    def block_initialization(self) -> None:
        """Validate parameters"""

        if not self.host:
            raise ValueError("Missing hostname")
        if self.username and not self.password:
            raise ValueError("Missing password")

    async def read_path(self, path: str) -> bytes:
        """Read a redis key

        Args:
            path: Redis key to read from

        Returns:
            Contents at key as bytes
        """
        client = self.get_client()
        ret = await client.get(path)

        await client.close()
        return ret

    async def write_path(self, path: str, content: bytes) -> None:
        """Write to a redis key

        Args:
            path: Redis key to write to
            content: Binary object to write
        """
        client = self.get_client()
        ret = await client.set(path, content)

        await client.close()
        return ret

    def get_client(self) -> redis.Redis:
        """Get Redis Client

        Returns:
            An initialized Redis async client
        """
        return redis.Redis(
            host=self.host,
            port=self.port,
            username=self.username.get_secret_value() if self.username else None,
            password=self.password.get_secret_value() if self.password else None,
            db=self.db,
            ssl=self.ssl,
        )

    def get_async_client(self) -> redis.asyncio.Redis:
        """Get Redis Client

        Returns:
            An initialized Redis async client
        """
        return redis.asyncio.Redis(
            host=self.host,
            port=self.port,
            username=self.username.get_secret_value() if self.username else None,
            password=self.password.get_secret_value() if self.password else None,
            db=self.db,
            ssl=self.ssl,
        )

    @classmethod
    def from_connection_string(
        cls, connection_string: Union[str, SecretStr]
    ) -> "RedisDatabase":
        """Create block from a Redis connection string

        Supports the following URL schemes:
        - `redis://` creates a TCP socket connection
        - `rediss://` creates a SSL wrapped TCP socket connection

        Args:
            connection_string: Redis connection string

        Returns:
            `RedisCredentials` instance
        """
        connection_kwargs = parse_url(
            connection_string
            if isinstance(connection_string, str)
            else connection_string.get_secret_value()
        )
        ssl = connection_kwargs.get("connection_class") == redis.asyncio.SSLConnection
        return cls(
            host=connection_kwargs.get("host", "localhost"),
            port=connection_kwargs.get("port", DEFAULT_PORT),
            db=connection_kwargs.get("db", 0),
            username=connection_kwargs.get("username"),
            password=connection_kwargs.get("password"),
            ssl=ssl,
        )

    def as_connection_params(self) -> Dict[str, Any]:
        """
        Return a dictionary suitable for unpacking
        """
        data = self.model_dump()
        data.pop("block_type_slug", None)
        # Unwrap SecretStr fields
        if self.username is not None:
            data["username"] = self.username.get_secret_value()
        else:
            data.pop("username", None)

        if self.password is not None:
            data["password"] = self.password.get_secret_value()
        else:
            data.pop("password", None)

        return data

as_connection_params()

Return a dictionary suitable for unpacking

Source code in prefect_redis/blocks.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def as_connection_params(self) -> Dict[str, Any]:
    """
    Return a dictionary suitable for unpacking
    """
    data = self.model_dump()
    data.pop("block_type_slug", None)
    # Unwrap SecretStr fields
    if self.username is not None:
        data["username"] = self.username.get_secret_value()
    else:
        data.pop("username", None)

    if self.password is not None:
        data["password"] = self.password.get_secret_value()
    else:
        data.pop("password", None)

    return data

block_initialization()

Validate parameters

Source code in prefect_redis/blocks.py
63
64
65
66
67
68
69
def block_initialization(self) -> None:
    """Validate parameters"""

    if not self.host:
        raise ValueError("Missing hostname")
    if self.username and not self.password:
        raise ValueError("Missing password")

from_connection_string(connection_string) classmethod

Create block from a Redis connection string

Supports the following URL schemes: - redis:// creates a TCP socket connection - rediss:// creates a SSL wrapped TCP socket connection

Parameters:

Name Type Description Default
connection_string Union[str, SecretStr]

Redis connection string

required

Returns:

Type Description
RedisDatabase

RedisCredentials instance

Source code in prefect_redis/blocks.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@classmethod
def from_connection_string(
    cls, connection_string: Union[str, SecretStr]
) -> "RedisDatabase":
    """Create block from a Redis connection string

    Supports the following URL schemes:
    - `redis://` creates a TCP socket connection
    - `rediss://` creates a SSL wrapped TCP socket connection

    Args:
        connection_string: Redis connection string

    Returns:
        `RedisCredentials` instance
    """
    connection_kwargs = parse_url(
        connection_string
        if isinstance(connection_string, str)
        else connection_string.get_secret_value()
    )
    ssl = connection_kwargs.get("connection_class") == redis.asyncio.SSLConnection
    return cls(
        host=connection_kwargs.get("host", "localhost"),
        port=connection_kwargs.get("port", DEFAULT_PORT),
        db=connection_kwargs.get("db", 0),
        username=connection_kwargs.get("username"),
        password=connection_kwargs.get("password"),
        ssl=ssl,
    )

get_async_client()

Get Redis Client

Returns:

Type Description
Redis

An initialized Redis async client

Source code in prefect_redis/blocks.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def get_async_client(self) -> redis.asyncio.Redis:
    """Get Redis Client

    Returns:
        An initialized Redis async client
    """
    return redis.asyncio.Redis(
        host=self.host,
        port=self.port,
        username=self.username.get_secret_value() if self.username else None,
        password=self.password.get_secret_value() if self.password else None,
        db=self.db,
        ssl=self.ssl,
    )

get_client()

Get Redis Client

Returns:

Type Description
Redis

An initialized Redis async client

Source code in prefect_redis/blocks.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def get_client(self) -> redis.Redis:
    """Get Redis Client

    Returns:
        An initialized Redis async client
    """
    return redis.Redis(
        host=self.host,
        port=self.port,
        username=self.username.get_secret_value() if self.username else None,
        password=self.password.get_secret_value() if self.password else None,
        db=self.db,
        ssl=self.ssl,
    )

read_path(path) async

Read a redis key

Parameters:

Name Type Description Default
path str

Redis key to read from

required

Returns:

Type Description
bytes

Contents at key as bytes

Source code in prefect_redis/blocks.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
async def read_path(self, path: str) -> bytes:
    """Read a redis key

    Args:
        path: Redis key to read from

    Returns:
        Contents at key as bytes
    """
    client = self.get_client()
    ret = await client.get(path)

    await client.close()
    return ret

write_path(path, content) async

Write to a redis key

Parameters:

Name Type Description Default
path str

Redis key to write to

required
content bytes

Binary object to write

required
Source code in prefect_redis/blocks.py
86
87
88
89
90
91
92
93
94
95
96
97
async def write_path(self, path: str, content: bytes) -> None:
    """Write to a redis key

    Args:
        path: Redis key to write to
        content: Binary object to write
    """
    client = self.get_client()
    ret = await client.set(path, content)

    await client.close()
    return ret