Skip to content

RedisBroker

faststream.redis.RedisBroker #

RedisBroker(
    url="redis://localhost:6379",
    *,
    host=EMPTY,
    port=EMPTY,
    db=EMPTY,
    connection_class=EMPTY,
    client_name=None,
    health_check_interval=0,
    max_connections=None,
    socket_timeout=None,
    socket_connect_timeout=None,
    socket_read_size=65536,
    socket_keepalive=False,
    socket_keepalive_options=None,
    socket_type=0,
    retry_on_timeout=False,
    encoding="utf-8",
    encoding_errors="strict",
    parser_class=DefaultParser,
    encoder_class=Encoder,
    graceful_timeout=15.0,
    decoder=None,
    parser=None,
    dependencies=(),
    middlewares=(),
    routers=(),
    message_format=BinaryMessageFormatV1,
    security=None,
    specification_url=None,
    protocol=None,
    protocol_version="custom",
    description=None,
    tags=(),
    logger=EMPTY,
    log_level=INFO,
    apply_types=True,
    serializer=EMPTY,
    provider=None,
    context=None,
)

Bases: RedisRegistrator, BrokerUsecase[UnifyRedisDict, 'Redis[bytes]']

Redis broker.

Initialized the RedisBroker.

PARAMETER DESCRIPTION
url

The Redis connection URL. Defaults to "redis://localhost:6379".

TYPE: str DEFAULT: 'redis://localhost:6379'

host

The Redis host to connect to. If not provided, it will be extracted from the URL.

TYPE: str DEFAULT: EMPTY

port

The Redis port to connect to. If not provided, it will be extracted from the URL.

TYPE: str | int DEFAULT: EMPTY

db

The Redis database to use. If not provided, it will be extracted from the URL.

TYPE: str | int DEFAULT: EMPTY

connection_class

The class to use for establishing connections. Defaults to EMPTY.

TYPE: type[Connection] DEFAULT: EMPTY

client_name

The name of the Redis client. Defaults to None.

TYPE: str | None DEFAULT: None

health_check_interval

The interval at which to perform health checks on the broker. Defaults to 0.

TYPE: float DEFAULT: 0

max_connections

The maximum number of connections to establish. Defaults to None.

TYPE: int | None DEFAULT: None

socket_timeout

The timeout for socket operations. Defaults to None.

TYPE: float | None DEFAULT: None

socket_connect_timeout

The timeout for connecting sockets. Defaults to None.

TYPE: float | None DEFAULT: None

socket_read_size

The size of the buffer used for reading from sockets. Defaults to 65536.

TYPE: int DEFAULT: 65536

socket_keepalive

Whether to enable keep-alive on sockets. Defaults to False.

TYPE: bool DEFAULT: False

socket_keepalive_options

Options for keep-alive on sockets. Defaults to None.

TYPE: Mapping[int, int | bytes] | None DEFAULT: None

socket_type

The type of socket to use (if supported by your platform). Defaults to 0.

TYPE: int DEFAULT: 0

retry_on_timeout

Whether to retry operations that timeout. Defaults to False.

TYPE: bool DEFAULT: False

encoding

The encoding used for sending and receiving data. Defaults to "utf-8".

TYPE: str DEFAULT: 'utf-8'

encoding_errors

How to handle encoding errors. Defaults to "strict".

TYPE: str DEFAULT: 'strict'

parser_class

The class to use for parsing messages. Defaults to DefaultParser.

TYPE: type[BaseParser] DEFAULT: DefaultParser

encoder_class

The class to use for encoding messages. Defaults to Encoder.

TYPE: type[Encoder] DEFAULT: Encoder

graceful_timeout

Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down. Defaults to 15.0.

TYPE: float | None DEFAULT: 15.0

decoder

Custom decoder object. Defaults to None.

TYPE: Optional[CustomCallable] DEFAULT: None

parser

Custom parser object. Defaults to None.

TYPE: Optional[CustomCallable] DEFAULT: None

dependencies

Dependencies to apply to all broker subscribers. Defaults to ().

TYPE: Iterable[Dependant] DEFAULT: ()

middlewares

Middlewares to apply to all broker publishers/subscribers. Defaults to ().

TYPE: Sequence[BrokerMiddleware[Any, Any]] DEFAULT: ()

routers

Routers to apply to broker. Defaults to ().

TYPE: Iterable[RedisRegistrator] DEFAULT: ()

message_format

What format to use when parsing messages. Defaults to BinaryMessageFormatV1.

TYPE: type[MessageFormat] DEFAULT: BinaryMessageFormatV1

security

Security options to connect broker and generate AsyncAPI server security information. Defaults to None.

TYPE: Optional[BaseSecurity] DEFAULT: None

specification_url

AsyncAPI hardcoded server addresses. Use servers if not specified. Defaults to None.

TYPE: str | None DEFAULT: None

protocol

AsyncAPI server protocol. Defaults to None.

TYPE: str | None DEFAULT: None

protocol_version

AsyncAPI server protocol version. Defaults to "custom".

TYPE: str | None DEFAULT: 'custom'

description

AsyncAPI server description. Defaults to None.

TYPE: str | None DEFAULT: None

tags

AsyncAPI server tags. Defaults to ().

TYPE: Iterable[Union[Tag, TagDict]] DEFAULT: ()

logger

User specified logger to pass into Context and log service messages. Defaults to EMPTY.

TYPE: Optional[LoggerProto] DEFAULT: EMPTY

log_level

Service messages log level. Defaults to logging.INFO.

TYPE: int DEFAULT: INFO

apply_types

Whether to use FastDepends or not. Defaults to True.

TYPE: bool DEFAULT: True

serializer

Serializer object. Defaults to EMPTY.

TYPE: Optional[SerializerProto] DEFAULT: EMPTY

provider

Provider for FastDepends library. Defaults to None.

TYPE: Optional[Provider] DEFAULT: None

context

Context repository for FastDepends library. Defaults to None.

TYPE: Optional[ContextRepo] DEFAULT: None

Source code in faststream/redis/broker/broker.py
def __init__(
    self,
    url: str = "redis://localhost:6379",
    *,
    host: str = EMPTY,
    port: str | int = EMPTY,
    db: str | int = EMPTY,
    connection_class: type["Connection"] = EMPTY,
    client_name: str | None = None,
    health_check_interval: float = 0,
    max_connections: int | None = None,
    socket_timeout: float | None = None,
    socket_connect_timeout: float | None = None,
    socket_read_size: int = 65536,
    socket_keepalive: bool = False,
    socket_keepalive_options: Mapping[int, int | bytes] | None = None,
    socket_type: int = 0,
    retry_on_timeout: bool = False,
    encoding: str = "utf-8",
    encoding_errors: str = "strict",
    parser_class: type["BaseParser"] = DefaultParser,
    encoder_class: type["Encoder"] = Encoder,
    graceful_timeout: float | None = 15.0,
    decoder: Optional["CustomCallable"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[RedisRegistrator] = (),
    message_format: type["MessageFormat"] = BinaryMessageFormatV1,
    security: Optional["BaseSecurity"] = None,
    specification_url: str | None = None,
    protocol: str | None = None,
    protocol_version: str | None = "custom",
    description: str | None = None,
    tags: Iterable[Union["Tag", "TagDict"]] = (),
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    apply_types: bool = True,
    serializer: Optional["SerializerProto"] = EMPTY,
    provider: Optional["Provider"] = None,
    context: Optional["ContextRepo"] = None,
) -> None:
    """Initialized the RedisBroker.

    Args:
        url:
            The Redis connection URL. Defaults to "redis://localhost:6379".
        host:
            The Redis host to connect to. If not provided, it will be extracted from the URL.
        port:
            The Redis port to connect to. If not provided, it will be extracted from the URL.
        db:
            The Redis database to use. If not provided, it will be extracted from the URL.
        connection_class:
            The class to use for establishing connections. Defaults to EMPTY.
        client_name:
            The name of the Redis client. Defaults to None.
        health_check_interval:
            The interval at which to perform health checks on the broker. Defaults to 0.
        max_connections:
            The maximum number of connections to establish. Defaults to None.
        socket_timeout:
            The timeout for socket operations. Defaults to None.
        socket_connect_timeout:
            The timeout for connecting sockets. Defaults to None.
        socket_read_size:
            The size of the buffer used for reading from sockets. Defaults to 65536.
        socket_keepalive:
            Whether to enable keep-alive on sockets. Defaults to False.
        socket_keepalive_options:
            Options for keep-alive on sockets. Defaults to None.
        socket_type:
            The type of socket to use (if supported by your platform). Defaults to 0.
        retry_on_timeout:
            Whether to retry operations that timeout. Defaults to False.
        encoding:
            The encoding used for sending and receiving data. Defaults to "utf-8".
        encoding_errors:
            How to handle encoding errors. Defaults to "strict".
        parser_class:
            The class to use for parsing messages. Defaults to DefaultParser.
        encoder_class:
            The class to use for encoding messages. Defaults to Encoder.
        graceful_timeout:
            Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down. Defaults to 15.0.
        decoder:
            Custom decoder object. Defaults to None.
        parser:
            Custom parser object. Defaults to None.
        dependencies:
            Dependencies to apply to all broker subscribers. Defaults to ().
        middlewares:
            Middlewares to apply to all broker publishers/subscribers. Defaults to ().
        routers:
            Routers to apply to broker. Defaults to ().
        message_format:
            What format to use when parsing messages. Defaults to BinaryMessageFormatV1.
        security:
            Security options to connect broker and generate AsyncAPI server security information. Defaults to None.
        specification_url:
            AsyncAPI hardcoded server addresses. Use `servers` if not specified. Defaults to None.
        protocol:
            AsyncAPI server protocol. Defaults to None.
        protocol_version:
            AsyncAPI server protocol version. Defaults to "custom".
        description:
            AsyncAPI server description. Defaults to None.
        tags:
            AsyncAPI server tags. Defaults to ().
        logger:
            User specified logger to pass into Context and log service messages. Defaults to EMPTY.
        log_level:
            Service messages log level. Defaults to logging.INFO.
        apply_types:
            Whether to use FastDepends or not. Defaults to True.
        serializer:
            Serializer object. Defaults to EMPTY.
        provider:
            Provider for FastDepends library. Defaults to None.
        context:
            Context repository for FastDepends library. Defaults to None.
    """
    if message_format == JSONMessageFormat:
        warnings.warn(
            "JSONMessageFormat has been deprecated and will be removed in version 0.7. "
            "Instead, use BinaryMessageFormatV1 when initializing broker",
            category=DeprecationWarning,
            stacklevel=2,
        )

    self.message_format = message_format

    if specification_url is None:
        specification_url = url

    if protocol is None:
        url_kwargs = urlparse(specification_url)
        protocol = url_kwargs.scheme

    connection_options = _resolve_url_options(
        url,
        security=security,
        host=host,
        port=port,
        db=db,
        client_name=client_name,
        health_check_interval=health_check_interval,
        max_connections=max_connections,
        socket_timeout=socket_timeout,
        socket_connect_timeout=socket_connect_timeout,
        socket_read_size=socket_read_size,
        socket_keepalive=socket_keepalive,
        socket_keepalive_options=socket_keepalive_options,
        socket_type=socket_type,
        retry_on_timeout=retry_on_timeout,
        encoding=encoding,
        encoding_errors=encoding_errors,
        parser_class=parser_class,
        connection_class=connection_class,
        encoder_class=encoder_class,
    )

    connection_state = ConnectionState(connection_options)

    super().__init__(
        **connection_options,
        routers=routers,
        config=RedisBrokerConfig(
            connection=connection_state,
            producer=RedisFastProducer(
                connection=connection_state,
                parser=parser,
                decoder=decoder,
                message_format=self.message_format,
                serializer=serializer,
            ),
            message_format=self.message_format,
            # both args
            broker_middlewares=middlewares,
            broker_parser=parser,
            broker_decoder=decoder,
            logger=make_redis_logger_state(
                logger=logger,
                log_level=log_level,
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=apply_types,
                serializer=serializer,
                provider=provider or dependency_provider,
                context=context or ContextRepo(),
            ),
            # subscriber args
            broker_dependencies=dependencies,
            graceful_timeout=graceful_timeout,
            extra_context={
                "broker": self,
            },
        ),
        specification=BrokerSpec(
            description=description,
            url=[specification_url],
            protocol=protocol,
            protocol_version=protocol_version,
            security=security,
            tags=tags,
        ),
    )

middlewares property #

middlewares

context property #

context

config instance-attribute #

config = ConfigComposition(config)

routers instance-attribute #

routers = []

subscribers property #

subscribers

publishers property #

publishers

specification instance-attribute #

specification = specification

running instance-attribute #

running = False

provider property #

provider

message_format instance-attribute #

message_format = message_format

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of the stack.
    """
    self.config.add_middleware(middleware)

insert_middleware #

insert_middleware(middleware)

Insert BrokerMiddleware to the start of middlewares list.

Current middleware will be used as a most outer of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert BrokerMiddleware to the start of middlewares list.

    Current middleware will be used as a most outer of the stack.
    """
    self.config.insert_middleware(middleware)

subscriber #

subscriber(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> ChannelSubscriber
subscriber(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> ChannelConcurrentSubscriber
subscriber(
    channel: None = None,
    *,
    list: str = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> ListSubscriber
subscriber(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> Union[ListSubscriber, ListBatchSubscriber]
subscriber(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> ListConcurrentSubscriber
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: str = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> StreamSubscriber
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> Union[StreamSubscriber, StreamBatchSubscriber]
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> StreamConcurrentSubscriber
subscriber(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int | None = None,
) -> LogicSubscriber
subscriber(
    channel=None,
    *,
    list=None,
    stream=None,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    no_ack=EMPTY,
    ack_policy=EMPTY,
    no_reply=False,
    message_format=None,
    persistent=True,
    title=None,
    description=None,
    include_in_schema=True,
    max_workers=None,
)

Subscribe a handler to a RabbitMQ queue.

PARAMETER DESCRIPTION
channel

Redis PubSub object name to send message.

TYPE: Union[PubSub, str, None] DEFAULT: None

list

Redis List object name to send message.

TYPE: Union[ListSub, str, None] DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: Union[StreamSub, str, None] DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not.

TYPE: bool DEFAULT: EMPTY

ack_policy

Acknowledgement policy for message processing.

TYPE: AckPolicy DEFAULT: EMPTY

dependencies

Dependencies list ([Depends(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

message_format

Which format to use when parsing messages.

TYPE: type[MessageFormat] | None DEFAULT: None

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

max_workers

Number of workers to process messages concurrently.

TYPE: int | None DEFAULT: None

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: str | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
SubscriberType

The subscriber object.

TYPE: LogicSubscriber

Source code in faststream/redis/broker/registrator.py
@override
def subscriber(
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    # broker arguments
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    no_ack: Annotated[
        bool,
        deprecated(
            "This option was deprecated in 0.6.0 to prior to **ack_policy=AckPolicy.MANUAL**. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type["MessageFormat"] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int | None = None,
) -> "LogicSubscriber":
    """Subscribe a handler to a RabbitMQ queue.

    Args:
        channel: Redis PubSub object name to send message.
        list: Redis List object name to send message.
        stream: Redis Stream object name to send message.
        no_ack: Whether to disable **FastStream** auto acknowledgement logic or not.
        ack_policy: Acknowledgement policy for message processing.
        dependencies: Dependencies list (`[Depends(),]`) to apply to the subscriber.
        parser: Parser to map original **IncomingMessage** Msg to FastStream one.
        decoder: Function to decode FastStream msg bytes body to python objects.
        middlewares: Subscriber middlewares to wrap incoming message processing.
        no_reply: Whether to disable **FastStream** RPC and Reply To auto responses or not.
        message_format: Which format to use when parsing messages.
        persistent: Whether to make the subscriber persistent or not.
        max_workers: Number of workers to process messages concurrently.
        title: AsyncAPI subscriber object title.
        description: AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.

    Returns:
        SubscriberType: The subscriber object.
    """
    subscriber = create_subscriber(
        channel=channel,
        list=list,
        stream=stream,
        # subscriber args
        max_workers=max_workers or 1,
        no_ack=no_ack,
        no_reply=no_reply,
        ack_policy=ack_policy,
        message_format=message_format,
        config=cast("RedisBrokerConfig", self.config),
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    super().subscriber(subscriber, persistent=persistent)

    return subscriber.add_call(
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> StreamPublisher
publisher(
    channel: None = None,
    *,
    list: str = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ListPublisher
publisher(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> Union[ListPublisher, ListBatchPublisher]
publisher(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ChannelPublisher
publisher(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> LogicPublisher
publisher(
    channel=None,
    *,
    list=None,
    stream=None,
    headers=None,
    reply_to="",
    middlewares=(),
    message_format=None,
    persistent=True,
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
channel

Redis PubSub object name to send message.

TYPE: Union[PubSub, str, None] DEFAULT: None

list

Redis List object name to send message.

TYPE: Union[ListSub, str, None] DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: Union[StreamSub, str, None] DEFAULT: None

headers

Message headers to store meta-information. Can be overridden by publish.headers if specified.

TYPE: dict[str, Any] | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

message_format

Which format to use when parsing messages.

TYPE: type[MessageFormat] | None DEFAULT: None

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/redis/broker/registrator.py
@override
def publisher(
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    message_format: type["MessageFormat"] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "LogicPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.

    Args:
        channel: Redis PubSub object name to send message.
        list: Redis List object name to send message.
        stream: Redis Stream object name to send message.
        headers: Message headers to store meta-information. Can be overridden
            by `publish.headers` if specified.
        reply_to: Reply message destination PubSub object name.
        middlewares: Publisher middlewares to wrap outgoing messages.
        message_format: Which format to use when parsing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type. Should be any python-native
            object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.
        persistent: Whether to make the publisher persistent or not.
    """
    publisher = create_publisher(
        channel=channel,
        list=list,
        stream=stream,
        headers=headers,
        reply_to=reply_to,
        # Specific
        config=cast("RedisBrokerConfig", self.config),
        middlewares=middlewares,
        message_format=message_format,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )
    super().publisher(publisher, persistent=persistent)
    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)
Source code in faststream/redis/broker/registrator.py
@override
def include_router(
    self,
    router: "RedisRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    if not isinstance(router, RedisRegistrator):
        msg = (
            f"Router must be an instance of RedisRegistrator, "
            f"got {type(router).__name__} instead"
        )
        raise SetupError(msg)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

connect async #

connect()

Connect to a remote server.

Source code in faststream/_internal/broker/broker.py
async def connect(self) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        self._connection = await self._connect()
        self._setup_logger()

    return self._connection

stop async #

stop(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/redis/broker/broker.py
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await super().stop(exc_type, exc_val, exc_tb)
    await self.config.disconnect()
    self._connection = None

close async #

close(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/redis/broker/broker.py
@deprecated(
    "Deprecated in **FastStream 0.5.44**. "
    "Please, use `stop` method instead. "
    "Method `close` will be removed in **FastStream 0.7.0**.",
    category=DeprecationWarning,
    stacklevel=1,
)
async def close(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await self.stop(exc_type, exc_val, exc_tb)

start async #

start()
Source code in faststream/redis/broker/broker.py
async def start(self) -> None:
    await self.connect()
    await super().start()

publish async #

publish(
    message: SendableMessage = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: None = None,
    maxlen: int | None = None,
    pipeline: Optional[Pipeline[bytes]] = None,
) -> int
publish(
    message: SendableMessage = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: str = ...,
    maxlen: int | None = None,
    pipeline: Optional[Pipeline[bytes]] = None,
) -> bytes
publish(
    message=None,
    channel=None,
    *,
    reply_to="",
    headers=None,
    correlation_id=None,
    list=None,
    stream=None,
    maxlen=None,
    pipeline=None,
)

Publish message directly.

This method allows you to publish a message in a non-AsyncAPI-documented way. It can be used in other frameworks or to publish messages at specific intervals.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: SendableMessage DEFAULT: None

channel

Redis PubSub object name to send message.

TYPE: str | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

headers

Message headers to store metainformation.

TYPE: dict[str, Any] | None DEFAULT: None

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

list

Redis List object name to send message.

TYPE: str | None DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: str | None DEFAULT: None

maxlen

Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded.

TYPE: int | None DEFAULT: None

pipeline

Redis pipeline to use for publishing messages.

TYPE: Optional[Pipeline[bytes]] DEFAULT: None

RETURNS DESCRIPTION
int

The result of the publish operation, typically the number of messages published.

TYPE: int | bytes

Source code in faststream/redis/broker/broker.py
@override
async def publish(
    self,
    message: "SendableMessage" = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    pipeline: Optional["Pipeline[bytes]"] = None,
) -> int | bytes:
    """Publish message directly.

    This method allows you to publish a message in a non-AsyncAPI-documented way.
    It can be used in other frameworks or to publish messages at specific intervals.

    Args:
        message:
            Message body to send.
        channel:
            Redis PubSub object name to send message.
        reply_to:
            Reply message destination PubSub object name.
        headers:
            Message headers to store metainformation.
        correlation_id:
            Manual message correlation_id setter. correlation_id is a useful option to trace messages.
        list:
            Redis List object name to send message.
        stream:
            Redis Stream object name to send message.
        maxlen:
            Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded.
        pipeline:
            Redis pipeline to use for publishing messages.

    Returns:
        int: The result of the publish operation, typically the number of messages published.
    """
    cmd = RedisPublishCommand(
        message,
        correlation_id=correlation_id or gen_cor_id(),
        channel=channel,
        list=list,
        stream=stream,
        maxlen=maxlen,
        reply_to=reply_to,
        headers=headers,
        pipeline=pipeline,
        _publish_type=PublishType.PUBLISH,
        message_format=self.message_format,
    )

    result: int | bytes = await super()._basic_publish(
        cmd,
        producer=self.config.producer,
    )
    return result

request async #

request(
    message,
    channel=None,
    *,
    list=None,
    stream=None,
    maxlen=None,
    correlation_id=None,
    headers=None,
    timeout=30.0,
)
Source code in faststream/redis/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    channel: str | None = None,
    *,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
    timeout: float | None = 30.0,
) -> "RedisChannelMessage":
    cmd = RedisPublishCommand(
        message,
        correlation_id=correlation_id or gen_cor_id(),
        channel=channel,
        list=list,
        stream=stream,
        maxlen=maxlen,
        headers=headers,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
        message_format=self.message_format,
    )
    msg: RedisChannelMessage = await super()._basic_request(
        cmd,
        producer=self.config.producer,
    )
    return msg

publish_batch async #

publish_batch(
    *messages,
    list,
    correlation_id=None,
    reply_to="",
    headers=None,
    pipeline=None,
)

Publish multiple messages to Redis List by one request.

PARAMETER DESCRIPTION
*messages

Messages bodies to send.

TYPE: SendableMessage DEFAULT: ()

list

Redis List object name to send messages.

TYPE: str

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

headers

Message headers to store metainformation.

TYPE: dict[str, Any] | None DEFAULT: None

pipeline

Redis pipeline to use for publishing messages.

TYPE: Optional[Pipeline[bytes]] DEFAULT: None

RETURNS DESCRIPTION
int

The result of the batch publish operation.

TYPE: int

Source code in faststream/redis/broker/broker.py
@override
async def publish_batch(  # type: ignore[override]
    self,
    *messages: "SendableMessage",
    list: str,
    correlation_id: str | None = None,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    pipeline: Optional["Pipeline[bytes]"] = None,
) -> int:
    """Publish multiple messages to Redis List by one request.

    Args:
        *messages: Messages bodies to send.
        list: Redis List object name to send messages.
        correlation_id: Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
        reply_to: Reply message destination PubSub object name.
        headers: Message headers to store metainformation.
        pipeline: Redis pipeline to use for publishing messages.

    Returns:
        int: The result of the batch publish operation.
    """
    cmd = RedisPublishCommand(
        *messages,
        list=list,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id or gen_cor_id(),
        pipeline=pipeline,
        _publish_type=PublishType.PUBLISH,
        message_format=self.message_format,
    )

    result: int = await self._basic_publish_batch(
        cmd,
        producer=self.config.producer,
    )
    return result

ping async #

ping(timeout=3)
Source code in faststream/redis/broker/broker.py
@override
async def ping(self, timeout: float | None = 3) -> bool:
    sleep_time = (timeout or 10) / 10

    with move_on_after(timeout) as cancel_scope:
        if self._connection is None:
            return False

        while True:
            if cancel_scope.cancel_called:
                return False

            try:
                if await self._connection.ping():
                    return True

            except ConnectionError:
                pass

            await anyio.sleep(sleep_time)

    return False