Skip to content

RedisClusterConnectionState

faststream.redis.configs.RedisClusterConnectionState #

RedisClusterConnectionState(
    options: dict[str, Any] | None = None,
)

Bases: ConnectionState['RedisCluster[bytes]']

Manages a Redis Cluster connection lifecycle.

Uses an async RedisCluster for List/Stream/KV commands and a sync redis.cluster.RedisCluster (wrapped via run_in_executor) for Pub/Sub, because the async client does not expose publish / pubsub in redis-py versions earlier than 8.0.0.

Source code in faststream/redis/configs/state.py
def __init__(self, options: dict[str, Any] | None = None) -> None:
    self._options = options or {}

    self._connected = False
    self._client: RedisCluster[bytes] | None = None
    self._sync_cluster: Any = None
    self._thread_pool: ThreadPoolExecutor | None = None

client property #

client: RedisCluster[bytes]

connect async #

connect() -> RedisCluster[bytes]
Source code in faststream/redis/configs/state.py
async def connect(self) -> "RedisCluster[bytes]":
    """Build the async cluster client on first call and return it.

    While the state is marked as connected, the stored client is returned
    and no new client is constructed.
    """
    if not self._connected:
        # Options whose value is None are not forwarded to RedisCluster.
        kwargs = {
            name: value
            for name, value in self._options.items()
            if value is not None
        }
        # Library identification is always set, replacing any supplied values.
        kwargs.update(lib_name="faststream", lib_version=__version__)

        created: RedisCluster[bytes] = RedisCluster(**kwargs)
        self._client = created
        self._connected = True
        return created

    return self._client  # type: ignore[return-value]

disconnect async #

disconnect() -> None
Source code in faststream/redis/configs/state.py
async def disconnect(self) -> None:
    """Release the executor and clients and mark the state as disconnected.

    All bookkeeping attributes are reset before the async client is closed,
    so the state ends up disconnected even when ``aclose()`` raises. Any such
    exception still propagates to the caller.
    """
    pool, self._thread_pool = self._thread_pool, None
    # Drop the sync cluster whether or not a pool exists: sync_publish() can
    # obtain one without a pool, and it must not outlive this connection.
    self._sync_cluster = None
    if pool is not None:
        # wait=False returns immediately instead of waiting for queued work.
        pool.shutdown(wait=False)

    # Detach the client first so a failing close cannot leave stale state.
    client, self._client = self._client, None
    self._connected = False
    if client is not None:
        await client.aclose()  # type: ignore[attr-defined]

sync_publish async #

sync_publish(channel: str, body: bytes) -> int
Source code in faststream/redis/configs/state.py
async def sync_publish(self, channel: str, body: bytes) -> int:
    """Publish ``body`` to ``channel`` through the sync cluster client.

    The sync client's ``publish`` is handed to ``run_in_executor`` together
    with this state's thread pool, and the awaited result is returned.
    """
    cluster = self._get_sync_cluster()
    # Read the pool only after the sync cluster has been obtained.
    executor = self._thread_pool
    published = await run_in_executor(executor, cluster.publish, channel, body)
    return published

pubsub #

pubsub() -> _SyncPubSubProxy
Source code in faststream/redis/configs/state.py
def pubsub(self) -> "_SyncPubSubProxy":
    """Return a Pub/Sub proxy built from the sync cluster and an executor.

    The stored thread pool is reused when there is one. When only the async
    client exists, a single-worker pool is created and kept on the instance.

    Raises:
        IncorrectState: If neither a thread pool nor a client is present.
    """
    executor = self._thread_pool
    if executor is None:
        if self._client is None:
            msg = "Pub/Sub proxy is not available"
            raise IncorrectState(msg)
        # Test mode: keep the new pool on the instance so disconnect() cleans it up
        executor = ThreadPoolExecutor(max_workers=1)
        self._thread_pool = executor

    cluster = self._get_sync_cluster()
    return _SyncPubSubProxy(cluster, executor)