Skip to content

RedisClusterBroker

faststream.redis.RedisClusterBroker #

RedisClusterBroker(
    url: str = "redis://localhost:6379",
    **kwargs: Unpack[RedisClusterParams],
)

Bases: RedisBroker

A Redis Cluster broker.

Source code in faststream/redis/broker/cluster_broker.py
def __init__(
    self,
    url: str = "redis://localhost:6379",
    **kwargs: Unpack["RedisClusterParams"],
) -> None:
    startup_nodes = kwargs.get("startup_nodes") or ()
    message_format = kwargs.pop("message_format", BinaryMessageFormatV1)
    specification_url = kwargs.pop("specification_url", None)
    protocol = kwargs.pop("protocol", None)
    self.message_format = message_format

    if specification_url is None:
        specification_url = url
    if protocol is None:
        protocol = urlparse(specification_url).scheme

    connection_options = self._resolve_url_options(
        url,
        startup_nodes=startup_nodes,
        host=kwargs.get("host", EMPTY),
        port=kwargs.get("port", EMPTY),
        security=kwargs.get("security"),
        **{
            k: v
            for k, v in kwargs.items()
            if k
            not in NON_CONNECTION_PARAMS
            | {"startup_nodes", "host", "port", "security"}
        },
    )

    connection_state = RedisClusterConnectionState(connection_options)

    super(RedisBroker, self).__init__(
        **connection_options,
        routers=kwargs.get("routers", ()),
        config=RedisBrokerConfig(
            connection=connection_state,
            producer=RedisClusterFastProducer(
                connection=connection_state,
                cluster_state=connection_state,
                parser=kwargs.get("parser"),
                decoder=kwargs.get("decoder"),
                message_format=self.message_format,
                serializer=kwargs.get("serializer"),
            ),
            message_format=self.message_format,
            broker_middlewares=kwargs.get("middlewares", ()),
            broker_parser=kwargs.get("parser"),
            broker_decoder=kwargs.get("decoder"),
            broker_codec=kwargs.get("codec"),
            logger=make_redis_logger_state(
                logger=kwargs.get("logger", EMPTY),
                log_level=kwargs.get("log_level", logging.INFO),
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=kwargs.get("apply_types", True),
                serializer=kwargs.get("serializer", EMPTY),
                provider=kwargs.get("provider") or dependency_provider,
                context=kwargs.get("context") or ContextRepo(),
            ),
            broker_dependencies=kwargs.get("dependencies", ()),
            graceful_timeout=kwargs.get("graceful_timeout", 15.0),
            ack_policy=kwargs.get("ack_policy", EMPTY),
            extra_context={"broker": self},
        ),
        specification=BrokerSpec(
            description=kwargs.get("description"),
            url=[specification_url],
            protocol=protocol,
            protocol_version=kwargs.get("protocol_version", "custom"),
            security=kwargs.get("security"),
            tags=kwargs.get("tags", ()),
        ),
    )

middlewares property #

middlewares: Sequence[BrokerMiddleware[MsgType]]

context property #

context: ContextRepo

config instance-attribute #

config: ConfigComposition[BrokerConfigType] = (
    ConfigComposition(config)
)

routers instance-attribute #

routers: list[Registrator[MsgType, Any]] = []

subscribers property #

subscribers: list[SubscriberUsecase[MsgType]]

publishers property #

publishers: list[PublisherUsecase]

parent property writable #

parent: Registrator[MsgType, Any] | None

specification instance-attribute #

specification = specification

running instance-attribute #

running = False

provider property #

provider: Provider

message_format instance-attribute #

message_format = message_format

request async #

request(
    message: SendableMessage,
    channel: str | None = None,
    *,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
    timeout: float | None = 30.0,
) -> RedisChannelMessage
Source code in faststream/redis/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    channel: str | None = None,
    *,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
    timeout: float | None = 30.0,
) -> "RedisChannelMessage":
    cmd = RedisPublishCommand(
        message,
        correlation_id=correlation_id or gen_cor_id(),
        channel=channel,
        list=list,
        stream=stream,
        maxlen=maxlen,
        headers=headers,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
        message_format=self.message_format,
    )
    msg: RedisChannelMessage = await super()._basic_request(
        cmd,
        producer=self.config.producer,
    )
    return msg

add_middleware #

add_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of the stack.
    """
    self.config.add_middleware(middleware)

insert_middleware #

insert_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Insert BrokerMiddleware to the start of middlewares list.

Current middleware will be used as a most outer of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert BrokerMiddleware to the start of middlewares list.

    Current middleware will be used as a most outer of the stack.
    """
    self.config.insert_middleware(middleware)

publisher #

publisher(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> StreamPublisher
publisher(
    channel: None = None,
    *,
    list: str = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ListPublisher
publisher(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> Union[ListPublisher, ListBatchPublisher]
publisher(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ChannelPublisher
publisher(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> LogicPublisher
publisher(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> LogicPublisher

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
channel

Redis PubSub object name to send message.

TYPE: Union[PubSub, str, None] DEFAULT: None

list

Redis List object name to send message.

TYPE: Union[ListSub, str, None] DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: Union[StreamSub, str, None] DEFAULT: None

headers

Message headers to store meta-information. Can be overridden by publish.headers if specified.

TYPE: dict[str, Any] | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

message_format

Which format to use when parsing messages.

TYPE: type[MessageFormat] | None DEFAULT: None

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/redis/broker/registrator.py
@override
def publisher(
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type["MessageFormat"] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "LogicPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.

    Args:
        channel: Redis PubSub object name to send message.
        list: Redis List object name to send message.
        stream: Redis Stream object name to send message.
        headers: Message headers to store meta-information. Can be overridden
            by `publish.headers` if specified.
        reply_to: Reply message destination PubSub object name.
        message_format: Which format to use when parsing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type. Should be any python-native
            object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.
        persistent: Whether to make the publisher persistent or not.
    """
    publisher = create_publisher(
        channel=channel,
        list=list,
        stream=stream,
        headers=headers,
        reply_to=reply_to,
        # Specific
        config=cast("RedisBrokerConfig", self.config),
        message_format=message_format,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )
    super().publisher(publisher, persistent=persistent)
    return publisher

include_router #

include_router(
    router: RedisRegistrator,
    *,
    prefix: str = "",
    dependencies: Iterable[Dependant] = (),
    middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
    include_in_schema: bool | None = None,
) -> None
Source code in faststream/redis/broker/registrator.py
@override
def include_router(
    self,
    router: "RedisRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    if not isinstance(router, RedisRegistrator):
        msg = (
            f"Router must be an instance of RedisRegistrator, "
            f"got {type(router).__name__} instead"
        )
        raise SetupError(msg)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(
    *routers: Registrator[MsgType, Any],
) -> None

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

connect async #

connect() -> ConnectionType

Connect to a remote server.

Source code in faststream/_internal/broker/broker.py
async def connect(self) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        self._connection = await self._connect()
        self._setup_logger()

    return self._connection

ping async #

ping(timeout: float | None = 3) -> bool
Source code in faststream/redis/broker/broker.py
@override
async def ping(self, timeout: float | None = 3) -> bool:
    sleep_time = (timeout or 10) / 10

    with move_on_after(timeout) as cancel_scope:
        if self._connection is None:
            return False

        while True:
            if cancel_scope.cancel_called:
                return False

            try:
                if await self._connection.ping():
                    return True

            except ConnectionError:
                pass

            await anyio.sleep(sleep_time)

    return False

subscriber #

subscriber(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    **kwargs: Any,
) -> LogicSubscriber
Source code in faststream/redis/broker/cluster_broker.py
def subscriber(  # type: ignore[override]
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    **kwargs: Any,
) -> "LogicSubscriber":
    if channel is not None:
        return self._make_channel_subscriber(channel, **kwargs)
    return super().subscriber(
        channel=None,
        list=list,
        stream=stream,
        **kwargs,
    )

publish async #

publish(
    message: SendableMessage = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    pipeline: Optional[Pipeline[bytes]] = EMPTY,
) -> int | bytes
Source code in faststream/redis/broker/cluster_broker.py
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage" = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    pipeline: Optional["Pipeline[bytes]"] = EMPTY,
) -> int | bytes:
    if pipeline is not EMPTY:
        warnings.warn(
            "Pipeline is not supported in Redis Cluster and will be ignored.",
            category=RuntimeWarning,
            stacklevel=2,
        )

    publish_kwargs: dict[str, Any] = {}
    if stream is not None:
        publish_kwargs["stream"] = stream
    if maxlen is not None:
        publish_kwargs["maxlen"] = maxlen

    return cast(
        "int | bytes",
        await super().publish(
            message,
            channel,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
            list=list,
            **publish_kwargs,
        ),
    )

stop async #

stop(
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional[TracebackType] = None,
) -> None
Source code in faststream/redis/broker/cluster_broker.py
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await super().stop(exc_type, exc_val, exc_tb)
    await self.config.disconnect()
    self._connection = None

start async #

start() -> None
Source code in faststream/redis/broker/cluster_broker.py
async def start(self) -> None:
    await self.connect()
    await super().start()

publish_batch async #

publish_batch(
    *messages: SendableMessage,
    list: str,
    correlation_id: str | None = None,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    pipeline: Optional[Pipeline[bytes]] = EMPTY,
) -> int
Source code in faststream/redis/broker/cluster_broker.py
async def publish_batch(  # type: ignore[override]
    self,
    *messages: "SendableMessage",
    list: str,
    correlation_id: str | None = None,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    pipeline: Optional["Pipeline[bytes]"] = EMPTY,
) -> int:
    if pipeline is not EMPTY:
        warnings.warn(
            "Pipeline is not supported in Redis Cluster and will be ignored.",
            category=RuntimeWarning,
            stacklevel=2,
        )

    if not self._cluster_state:
        await self._connect()

    return await super().publish_batch(
        *messages,
        list=list,
        correlation_id=correlation_id,
        reply_to=reply_to,
        headers=headers,
    )