Bases: ConnectionState['Redis[bytes]']
Source code in faststream/redis/configs/state.py
| def __init__(self, options: dict[str, Any] | None = None) -> None:
self._options = options or {}
self._connected = False
self._client: ClientT | None = None
self._sync_cluster: Any = None
self._thread_pool: Any = None
|
disconnect async
Source code in faststream/redis/configs/state.py
async def disconnect(self) -> None:
    """Close the current client, if there is one, and reset the state.

    ``_client`` is cleared and ``_connected`` is set to ``False`` on every
    call, including when there was no client and when closing fails.

    Raises:
        Any exception raised by the client's ``aclose()``; it propagates
        to the caller after the state has been reset.
    """
    client = self._client
    try:
        if client is not None:
            await client.aclose()  # type: ignore[attr-defined]
    finally:
        # Reset even when aclose() raises, so the holder never keeps
        # reporting "connected" while pointing at a client whose close
        # was already attempted.
        self._client = None
        self._connected = False
connect async
connect() -> Redis[bytes]
Source code in faststream/redis/configs/state.py
async def connect(self) -> "Redis[bytes]":
    """Build a pool-backed Redis client and record it as the active one.

    The stored options are unpacked into ``ConnectionPool`` together with
    ``lib_name="faststream"`` and ``lib_version=__version__``. If the
    stored options already contain either of those two keys, the call
    raises ``TypeError`` for the duplicate keyword argument.

    Returns:
        The newly created client; the same object is stored on
        ``self._client`` and ``self._connected`` is set to ``True``.
    """
    # NOTE(review): a second call replaces ``self._client`` without
    # closing the previous client; presumably callers guard against
    # double-connect. Confirm at the call sites.
    pool = ConnectionPool(
        **self._options,
        lib_name="faststream",
        lib_version=__version__,
    )
    client: Redis[bytes] = Redis.from_pool(pool)  # type: ignore[attr-defined]
    self._client = client
    self._connected = True
    return client