Bases: ConnectionState['RedisCluster[bytes]']
Manages a Redis Cluster connection lifecycle.
Uses an async RedisCluster for List/Stream/KV commands and a sync redis.cluster.RedisCluster (wrapped via run_in_executor) for Pub/Sub — the async client doesn't expose publish / pubsub until redis-py >= 8.0.0.
Source code in faststream/redis/configs/state.py
| def __init__(self, options: dict[str, Any] | None = None) -> None:
self._options = options or {}
self._connected = False
self._client: RedisCluster[bytes] | None = None
self._sync_cluster: Any = None
self._thread_pool: ThreadPoolExecutor | None = None
|
client property
client: RedisCluster[bytes]
connect async
connect() -> RedisCluster[bytes]
Source code in faststream/redis/configs/state.py
| async def connect(self) -> "RedisCluster[bytes]":
if self._connected:
return self._client # type: ignore[return-value]
opts = {k: v for k, v in self._options.items() if v is not None}
opts["lib_name"] = "faststream"
opts["lib_version"] = __version__
client: RedisCluster[bytes] = RedisCluster(**opts)
self._client = client
self._connected = True
return client
|
disconnect async
Source code in faststream/redis/configs/state.py
| async def disconnect(self) -> None:
if self._thread_pool is not None:
self._thread_pool.shutdown(wait=False)
self._thread_pool = None
self._sync_cluster = None
if self._client:
await self._client.aclose() # type: ignore[attr-defined]
self._client = None
self._connected = False
|
sync_publish async
sync_publish(channel: str, body: bytes) -> int
Source code in faststream/redis/configs/state.py
| async def sync_publish(self, channel: str, body: bytes) -> int:
sync = self._get_sync_cluster()
return await run_in_executor(self._thread_pool, sync.publish, channel, body)
|
pubsub
pubsub() -> _SyncPubSubProxy
Source code in faststream/redis/configs/state.py
| def pubsub(self) -> "_SyncPubSubProxy":
if self._thread_pool is not None:
pool = self._thread_pool
elif self._client is not None:
# Test mode: store the pool so disconnect() cleans it up
pool = self._thread_pool = ThreadPoolExecutor(max_workers=1)
else:
msg = "Pub/Sub proxy is not available"
raise IncorrectState(msg)
sync = self._get_sync_cluster()
return _SyncPubSubProxy(sync, pool)
|