Skip to content

RedisBroker

faststream.redis.RedisBroker #

RedisBroker(
    url: str = "redis://localhost:6379",
    **kwargs: Unpack[RedisBrokerParams],
)

Bases: RedisRegistrator, BrokerUsecase[UnifyRedisDict, 'Redis[bytes]']

Redis broker.

Initialized the RedisBroker.

Source code in faststream/redis/broker/broker.py
def __init__(
    self,
    url: str = "redis://localhost:6379",
    **kwargs: Unpack["RedisBrokerParams"],
) -> None:
    """Initialized the RedisBroker."""
    host = kwargs.pop("host", EMPTY)
    port = kwargs.pop("port", EMPTY)
    security = kwargs.pop("security", None)
    specification_url = kwargs.pop("specification_url", None)
    protocol = kwargs.pop("protocol", None)
    message_format = kwargs.pop("message_format", BinaryMessageFormatV1)

    self.message_format = message_format

    if specification_url is None:
        specification_url = url
    if protocol is None:
        protocol = urlparse(specification_url).scheme

    connection_kwargs = {
        k: v for k, v in kwargs.items() if k not in NON_CONNECTION_PARAMS
    }
    connection_options = self._resolve_url_options(
        url,
        security=security,
        host=host,
        port=port,
        **connection_kwargs,
    )

    connection_state = RedisConnectionState(connection_options)

    super().__init__(
        **connection_options,
        routers=kwargs.get("routers", ()),
        config=RedisBrokerConfig(
            connection=connection_state,
            producer=RedisFastProducer(
                connection=connection_state,
                parser=kwargs.get("parser"),
                decoder=kwargs.get("decoder"),
                message_format=self.message_format,
                serializer=kwargs.get("serializer"),
            ),
            message_format=self.message_format,
            broker_middlewares=kwargs.get("middlewares", ()),
            broker_parser=kwargs.get("parser"),
            broker_decoder=kwargs.get("decoder"),
            broker_codec=kwargs.get("codec"),
            logger=make_redis_logger_state(
                logger=kwargs.get("logger", EMPTY),
                log_level=kwargs.get("log_level", logging.INFO),
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=kwargs.get("apply_types", True),
                serializer=kwargs.get("serializer", EMPTY),
                provider=kwargs.get("provider") or dependency_provider,
                context=kwargs.get("context") or ContextRepo(),
            ),
            broker_dependencies=kwargs.get("dependencies", ()),
            graceful_timeout=kwargs.get("graceful_timeout", 15.0),
            ack_policy=kwargs.get("ack_policy", EMPTY),
            extra_context={"broker": self},
        ),
        specification=BrokerSpec(
            description=kwargs.get("description"),
            url=[specification_url],
            protocol=protocol,
            protocol_version=kwargs.get("protocol_version", "custom"),
            security=security,
            tags=kwargs.get("tags", ()),
        ),
    )

middlewares property #

middlewares: Sequence[BrokerMiddleware[MsgType]]

context property #

context: ContextRepo

config instance-attribute #

config: ConfigComposition[BrokerConfigType] = (
    ConfigComposition(config)
)

routers instance-attribute #

routers: list[Registrator[MsgType, Any]] = []

subscribers property #

subscribers: list[SubscriberUsecase[MsgType]]

publishers property #

publishers: list[PublisherUsecase]

parent property writable #

parent: Registrator[MsgType, Any] | None

specification instance-attribute #

specification = specification

running instance-attribute #

running = False

provider property #

provider: Provider

message_format instance-attribute #

message_format = message_format

add_middleware #

add_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of the stack.
    """
    self.config.add_middleware(middleware)

insert_middleware #

insert_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Insert BrokerMiddleware to the start of middlewares list.

Current middleware will be used as a most outer of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert BrokerMiddleware to the start of middlewares list.

    Current middleware will be used as a most outer of the stack.
    """
    self.config.insert_middleware(middleware)

subscriber #

subscriber(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> ChannelSubscriber
subscriber(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> ChannelConcurrentSubscriber
subscriber(
    channel: None = None,
    *,
    list: str = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> ListSubscriber
subscriber(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> Union[ListSubscriber, ListBatchSubscriber]
subscriber(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> ListConcurrentSubscriber
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: str = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> StreamSubscriber
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> Union[StreamSubscriber, StreamBatchSubscriber]
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> StreamConcurrentSubscriber
subscriber(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int | None = None,
) -> LogicSubscriber
subscriber(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    codec: Optional[CodecProto] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int | None = None,
) -> LogicSubscriber

Subscribe a handler to a RabbitMQ queue.

PARAMETER DESCRIPTION
channel

Redis PubSub object name to send message.

TYPE: Union[PubSub, str, None] DEFAULT: None

list

Redis List object name to send message.

TYPE: Union[ListSub, str, None] DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: Union[StreamSub, str, None] DEFAULT: None

ack_policy

Acknowledgement policy for message processing.

TYPE: AckPolicy DEFAULT: EMPTY

dependencies

Dependencies list ([Depends(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

codec

Custom codec object.

TYPE: Optional[CodecProto] DEFAULT: None

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

message_format

Which format to use when parsing messages.

TYPE: type[MessageFormat] | None DEFAULT: None

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

max_workers

Number of workers to process messages concurrently.

TYPE: int | None DEFAULT: None

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: str | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
SubscriberType

The subscriber object.

TYPE: LogicSubscriber

Source code in faststream/redis/broker/registrator.py
@override
def subscriber(
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    # broker arguments
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    codec: Optional["CodecProto"] = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type["MessageFormat"] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int | None = None,
) -> "LogicSubscriber":
    """Subscribe a handler to a RabbitMQ queue.

    Args:
        channel: Redis PubSub object name to send message.
        list: Redis List object name to send message.
        stream: Redis Stream object name to send message.
        ack_policy: Acknowledgement policy for message processing.
        dependencies: Dependencies list (`[Depends(),]`) to apply to the subscriber.
        parser: Parser to map original **IncomingMessage** Msg to FastStream one.
        decoder: Function to decode FastStream msg bytes body to python objects.
        codec: Custom codec object.
        no_reply: Whether to disable **FastStream** RPC and Reply To auto responses or not.
        message_format: Which format to use when parsing messages.
        persistent: Whether to make the subscriber persistent or not.
        max_workers: Number of workers to process messages concurrently.
        title: AsyncAPI subscriber object title.
        description: AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.

    Returns:
        SubscriberType: The subscriber object.
    """
    subscriber = create_subscriber(
        channel=channel,
        list=list,
        stream=stream,
        # subscriber args
        max_workers=max_workers or 1,
        no_reply=no_reply,
        ack_policy=ack_policy,
        message_format=message_format,
        config=cast("RedisBrokerConfig", self.config),
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    super().subscriber(subscriber, persistent=persistent)

    return subscriber.add_call(
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        codec_=codec,
        dependencies_=dependencies,
    )

publisher #

publisher(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> StreamPublisher
publisher(
    channel: None = None,
    *,
    list: str = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ListPublisher
publisher(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> Union[ListPublisher, ListBatchPublisher]
publisher(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ChannelPublisher
publisher(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> LogicPublisher
publisher(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> LogicPublisher

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
channel

Redis PubSub object name to send message.

TYPE: Union[PubSub, str, None] DEFAULT: None

list

Redis List object name to send message.

TYPE: Union[ListSub, str, None] DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: Union[StreamSub, str, None] DEFAULT: None

headers

Message headers to store meta-information. Can be overridden by publish.headers if specified.

TYPE: dict[str, Any] | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

message_format

Which format to use when parsing messages.

TYPE: type[MessageFormat] | None DEFAULT: None

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/redis/broker/registrator.py
@override
def publisher(
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    message_format: type["MessageFormat"] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "LogicPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.

    Args:
        channel: Redis PubSub object name to send message.
        list: Redis List object name to send message.
        stream: Redis Stream object name to send message.
        headers: Message headers to store meta-information. Can be overridden
            by `publish.headers` if specified.
        reply_to: Reply message destination PubSub object name.
        message_format: Which format to use when parsing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type. Should be any python-native
            object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.
        persistent: Whether to make the publisher persistent or not.
    """
    publisher = create_publisher(
        channel=channel,
        list=list,
        stream=stream,
        headers=headers,
        reply_to=reply_to,
        # Specific
        config=cast("RedisBrokerConfig", self.config),
        message_format=message_format,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )
    super().publisher(publisher, persistent=persistent)
    return publisher

include_router #

include_router(
    router: RedisRegistrator,
    *,
    prefix: str = "",
    dependencies: Iterable[Dependant] = (),
    middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
    include_in_schema: bool | None = None,
) -> None
Source code in faststream/redis/broker/registrator.py
@override
def include_router(
    self,
    router: "RedisRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    if not isinstance(router, RedisRegistrator):
        msg = (
            f"Router must be an instance of RedisRegistrator, "
            f"got {type(router).__name__} instead"
        )
        raise SetupError(msg)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(
    *routers: Registrator[MsgType, Any],
) -> None

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

connect async #

connect() -> ConnectionType

Connect to a remote server.

Source code in faststream/_internal/broker/broker.py
async def connect(self) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        self._connection = await self._connect()
        self._setup_logger()

    return self._connection

stop async #

stop(
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional[TracebackType] = None,
) -> None
Source code in faststream/redis/broker/broker.py
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await super().stop(exc_type, exc_val, exc_tb)
    await self.config.disconnect()
    self._connection = None

start async #

start() -> None
Source code in faststream/redis/broker/broker.py
async def start(self) -> None:
    await self.connect()
    await super().start()

publish async #

publish(
    message: SendableMessage = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: None = None,
    maxlen: int | None = None,
    pipeline: Optional[Pipeline[bytes]] = None,
) -> int
publish(
    message: SendableMessage = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: str = ...,
    maxlen: int | None = None,
    pipeline: Optional[Pipeline[bytes]] = None,
) -> bytes
publish(
    message: SendableMessage = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    pipeline: Optional[Pipeline[bytes]] = None,
) -> int | bytes

Publish message directly.

This method allows you to publish a message in a non-AsyncAPI-documented way. It can be used in other frameworks or to publish messages at specific intervals.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: SendableMessage DEFAULT: None

channel

Redis PubSub object name to send message.

TYPE: str | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

headers

Message headers to store metainformation.

TYPE: dict[str, Any] | None DEFAULT: None

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

list

Redis List object name to send message.

TYPE: str | None DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: str | None DEFAULT: None

maxlen

Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded.

TYPE: int | None DEFAULT: None

pipeline

Redis pipeline to use for publishing messages.

TYPE: Optional[Pipeline[bytes]] DEFAULT: None

RETURNS DESCRIPTION
int

The result of the publish operation, typically the number of messages published.

TYPE: int | bytes

Source code in faststream/redis/broker/broker.py
@override
async def publish(
    self,
    message: "SendableMessage" = None,
    channel: str | None = None,
    *,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    pipeline: Optional["Pipeline[bytes]"] = None,
) -> int | bytes:
    """Publish message directly.

    This method allows you to publish a message in a non-AsyncAPI-documented way.
    It can be used in other frameworks or to publish messages at specific intervals.

    Args:
        message:
            Message body to send.
        channel:
            Redis PubSub object name to send message.
        reply_to:
            Reply message destination PubSub object name.
        headers:
            Message headers to store metainformation.
        correlation_id:
            Manual message correlation_id setter. correlation_id is a useful option to trace messages.
        list:
            Redis List object name to send message.
        stream:
            Redis Stream object name to send message.
        maxlen:
            Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded.
        pipeline:
            Redis pipeline to use for publishing messages.

    Returns:
        int: The result of the publish operation, typically the number of messages published.
    """
    cmd = RedisPublishCommand(
        message,
        correlation_id=correlation_id or gen_cor_id(),
        channel=channel,
        list=list,
        stream=stream,
        maxlen=maxlen,
        reply_to=reply_to,
        headers=headers,
        pipeline=pipeline,
        _publish_type=PublishType.PUBLISH,
        message_format=self.message_format,
    )

    result: int | bytes = await super()._basic_publish(
        cmd,
        producer=self.config.producer,
    )
    return result

request async #

request(
    message: SendableMessage,
    channel: str | None = None,
    *,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
    timeout: float | None = 30.0,
) -> RedisChannelMessage
Source code in faststream/redis/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    channel: str | None = None,
    *,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
    timeout: float | None = 30.0,
) -> "RedisChannelMessage":
    cmd = RedisPublishCommand(
        message,
        correlation_id=correlation_id or gen_cor_id(),
        channel=channel,
        list=list,
        stream=stream,
        maxlen=maxlen,
        headers=headers,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
        message_format=self.message_format,
    )
    msg: RedisChannelMessage = await super()._basic_request(
        cmd,
        producer=self.config.producer,
    )
    return msg

publish_batch async #

publish_batch(
    *messages: SendableMessage,
    list: str,
    correlation_id: str | None = None,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    pipeline: Optional[Pipeline[bytes]] = None,
) -> int

Publish multiple messages to Redis List by one request.

PARAMETER DESCRIPTION
*messages

Messages bodies to send.

TYPE: SendableMessage DEFAULT: ()

list

Redis List object name to send messages.

TYPE: str

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

headers

Message headers to store metainformation.

TYPE: dict[str, Any] | None DEFAULT: None

pipeline

Redis pipeline to use for publishing messages.

TYPE: Optional[Pipeline[bytes]] DEFAULT: None

RETURNS DESCRIPTION
int

The result of the batch publish operation.

TYPE: int

Source code in faststream/redis/broker/broker.py
@override
async def publish_batch(  # type: ignore[override]
    self,
    *messages: "SendableMessage",
    list: str,
    correlation_id: str | None = None,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    pipeline: Optional["Pipeline[bytes]"] = None,
) -> int:
    """Publish multiple messages to Redis List by one request.

    Args:
        *messages: Messages bodies to send.
        list: Redis List object name to send messages.
        correlation_id: Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
        reply_to: Reply message destination PubSub object name.
        headers: Message headers to store metainformation.
        pipeline: Redis pipeline to use for publishing messages.

    Returns:
        int: The result of the batch publish operation.
    """
    cmd = RedisPublishCommand(
        *messages,
        list=list,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id or gen_cor_id(),
        pipeline=pipeline,
        _publish_type=PublishType.PUBLISH,
        message_format=self.message_format,
    )

    result: int = await self._basic_publish_batch(
        cmd,
        producer=self.config.producer,
    )
    return result

ping async #

ping(timeout: float | None = 3) -> bool
Source code in faststream/redis/broker/broker.py
@override
async def ping(self, timeout: float | None = 3) -> bool:
    sleep_time = (timeout or 10) / 10

    with move_on_after(timeout) as cancel_scope:
        if self._connection is None:
            return False

        while True:
            if cancel_scope.cancel_called:
                return False

            try:
                if await self._connection.ping():
                    return True

            except ConnectionError:
                pass

            await anyio.sleep(sleep_time)

    return False