Skip to content

ConnectionState

faststream.redis.configs.ConnectionState #

ConnectionState(options=None)
Source code in faststream/redis/configs/state.py
def __init__(self, options: dict[str, Any] | None = None) -> None:
    self._options = options or {}

    self._connected = False
    self._client: Redis[bytes] | None = None

client property #

client

connect async #

connect()
Source code in faststream/redis/configs/state.py
async def connect(self) -> "Redis[bytes]":
    """Build a pool-backed Redis client and mark the state as connected.

    The pool is created from the stored options plus ``lib_name`` and
    ``lib_version`` identifying FastStream. Because those two are passed
    as explicit keywords next to ``**self._options``, supplying either of
    them in the options raises ``TypeError``.

    Returns:
        The newly created client, which is also stored on the instance.
    """
    connection_pool = ConnectionPool(
        **self._options,
        lib_name="faststream",
        lib_version=__version__,
    )

    # State is only touched once the client has been built successfully.
    self._client = Redis.from_pool(connection_pool)  # type: ignore[attr-defined]
    self._connected = True

    return self._client

disconnect async #

disconnect()
Source code in faststream/redis/configs/state.py
async def disconnect(self) -> None:
    """Close the current client, if any, and reset to the disconnected state.

    The state is reset even when closing fails or the task is cancelled,
    so the instance never keeps pointing at a client whose shutdown was
    attempted. Any exception raised by ``aclose()`` still propagates to
    the caller.
    """
    try:
        if self._client is not None:
            await self._client.aclose()  # type: ignore[attr-defined]
    finally:
        # Always drop the reference: a half-closed client must not be
        # reported as a live connection.
        self._client = None
        self._connected = False