Skip to content

RedisRegistrator

faststream.redis.broker.registrator.RedisRegistrator #

RedisRegistrator(*, config, routers)

Bases: Registrator[UnifyRedisDict, RedisBrokerConfig]

A router that can be included in a RedisBroker.

Source code in faststream/_internal/broker/registrator.py
def __init__(
    self,
    *,
    config: BrokerConfigType,
    routers: Iterable["Registrator[MsgType]"],
) -> None:
    """Set up the registrator state and include the given routers.

    Args:
        config: Broker configuration. Its ``broker_parser`` and
            ``broker_decoder`` are kept as the registrator-level defaults, and
            the object itself is wrapped into a ``ConfigComposition``.
        routers: Routers passed on to ``include_routers`` once the rest of the
            state has been initialised.
    """
    # Registrator-level parser/decoder defaults taken from the config.
    self._parser, self._decoder = config.broker_parser, config.broker_decoder

    self.config: ConfigComposition[BrokerConfigType] = ConfigComposition(config)

    # Weakly referenced registries of every subscriber / publisher.
    self._subscribers: WeakSet[SubscriberUsecase[MsgType]] = WeakSet()
    self._publishers: WeakSet[PublisherUsecase] = WeakSet()

    # NOTE(review): presumably these lists hold strong references for
    # persistent endpoints so the WeakSets above do not drop them -- confirm
    # against the base-class subscriber()/publisher() implementations.
    self.__persistent_subscribers: list[SubscriberUsecase[MsgType]] = []
    self.__persistent_publishers: list[PublisherUsecase] = []

    self.routers: list[Registrator[MsgType, Any]] = []

    # Must stay last: including routers relies on the attributes set above.
    self.include_routers(*routers)

config instance-attribute #

config = ConfigComposition(config)

routers instance-attribute #

routers = []

subscribers property #

subscribers

publishers property #

publishers

subscriber #

subscriber(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> ChannelSubscriber
subscriber(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> ChannelConcurrentSubscriber
subscriber(
    channel: None = None,
    *,
    list: str = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> ListSubscriber
subscriber(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> Union[ListSubscriber, ListBatchSubscriber]
subscriber(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> ListConcurrentSubscriber
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: str = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> StreamSubscriber
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: None = None,
) -> Union[StreamSubscriber, StreamBatchSubscriber]
subscriber(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = ...,
) -> StreamConcurrentSubscriber
subscriber(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int | None = None,
) -> LogicSubscriber
subscriber(
    channel=None,
    *,
    list=None,
    stream=None,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    no_ack=EMPTY,
    ack_policy=EMPTY,
    no_reply=False,
    message_format=None,
    persistent=True,
    title=None,
    description=None,
    include_in_schema=True,
    max_workers=None,
)

Subscribe a handler to a Redis channel, list, or stream.

PARAMETER DESCRIPTION
channel

Redis PubSub object name to consume messages from.

TYPE: Union[PubSub, str, None] DEFAULT: None

list

Redis List object name to consume messages from.

TYPE: Union[ListSub, str, None] DEFAULT: None

stream

Redis Stream object name to consume messages from.

TYPE: Union[StreamSub, str, None] DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not.

TYPE: bool DEFAULT: EMPTY

ack_policy

Acknowledgement policy for message processing.

TYPE: AckPolicy DEFAULT: EMPTY

dependencies

Dependencies list ([Depends(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

message_format

Which format to use when parsing messages.

TYPE: type[MessageFormat] | None DEFAULT: None

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

max_workers

Number of workers to process messages concurrently.

TYPE: int | None DEFAULT: None

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: str | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
SubscriberType

The subscriber object.

TYPE: LogicSubscriber

Source code in faststream/redis/broker/registrator.py
@override
def subscriber(
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    # broker arguments
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            # The trailing space matters: adjacent literals are concatenated
            # verbatim, so without it the text reads "instead.Scheduled".
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    no_ack: Annotated[
        bool,
        deprecated(
            "This option was deprecated in 0.6.0 to prior to **ack_policy=AckPolicy.MANUAL**. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    message_format: type["MessageFormat"] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    max_workers: int | None = None,
) -> "LogicSubscriber":
    """Subscribe a handler to a Redis channel, list, or stream.

    The subscriber is built by ``create_subscriber``, registered through the
    parent registrator, and then given its call options via ``add_call``.

    Args:
        channel: Redis PubSub object name to consume messages from.
        list: Redis List object name to consume messages from.
        stream: Redis Stream object name to consume messages from.
        no_ack: Whether to disable **FastStream** auto acknowledgement logic or not.
            Deprecated since 0.6.0.
        ack_policy: Acknowledgement policy for message processing.
        dependencies: Dependencies list (`[Depends(),]`) to apply to the subscriber.
        parser: Parser to map original **IncomingMessage** Msg to FastStream one.
            Falls back to the registrator-level parser when not given.
        decoder: Function to decode FastStream msg bytes body to python objects.
            Falls back to the registrator-level decoder when not given.
        middlewares: Subscriber middlewares to wrap incoming message processing.
            Deprecated since 0.6.0.
        no_reply: Whether to disable **FastStream** RPC and Reply To auto responses or not.
        message_format: Which format to use when parsing messages.
        persistent: Whether to make the subscriber persistent or not.
        max_workers: Number of workers to process messages concurrently.
            A falsy value (``None`` or ``0``) is passed on as ``1``.
        title: AsyncAPI subscriber object title.
        description: AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.

    Returns:
        LogicSubscriber: The result of ``add_call`` on the created subscriber.
    """
    subscriber = create_subscriber(
        channel=channel,
        list=list,
        stream=stream,
        # subscriber args
        max_workers=max_workers or 1,
        no_ack=no_ack,
        no_reply=no_reply,
        ack_policy=ack_policy,
        message_format=message_format,
        config=cast("RedisBrokerConfig", self.config),
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    # Register with the parent registrator before attaching the call options.
    super().subscriber(subscriber, persistent=persistent)

    return subscriber.add_call(
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    channel: None = None,
    *,
    list: None = None,
    stream: Union[StreamSub, str] = ...,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> StreamPublisher
publisher(
    channel: None = None,
    *,
    list: str = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ListPublisher
publisher(
    channel: None = None,
    *,
    list: Union[ListSub, str] = ...,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> Union[ListPublisher, ListBatchPublisher]
publisher(
    channel: Union[PubSub, str] = ...,
    *,
    list: None = None,
    stream: None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> ChannelPublisher
publisher(
    channel: Union[PubSub, str, None] = None,
    *,
    list: Union[ListSub, str, None] = None,
    stream: Union[StreamSub, str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Sequence[PublisherMiddleware] = (),
    message_format: type[MessageFormat] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> LogicPublisher
publisher(
    channel=None,
    *,
    list=None,
    stream=None,
    headers=None,
    reply_to="",
    middlewares=(),
    message_format=None,
    persistent=True,
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it later - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
channel

Redis PubSub object name to send message.

TYPE: Union[PubSub, str, None] DEFAULT: None

list

Redis List object name to send message.

TYPE: Union[ListSub, str, None] DEFAULT: None

stream

Redis Stream object name to send message.

TYPE: Union[StreamSub, str, None] DEFAULT: None

headers

Message headers to store meta-information. Can be overridden by publish.headers if specified.

TYPE: dict[str, Any] | None DEFAULT: None

reply_to

Reply message destination PubSub object name.

TYPE: str DEFAULT: ''

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

message_format

Which format to use when parsing messages.

TYPE: type[MessageFormat] | None DEFAULT: None

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/redis/broker/registrator.py
@override
def publisher(
    self,
    channel: Union["PubSub", str, None] = None,
    *,
    list: Union["ListSub", str, None] = None,
    stream: Union["StreamSub", str, None] = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            # The trailing space matters: adjacent literals are concatenated
            # verbatim, so without it the text reads "instead.Scheduled".
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    message_format: type["MessageFormat"] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "LogicPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it later - `broker.publisher(...).publish(...)`.

    Args:
        channel: Redis PubSub object name to send message.
        list: Redis List object name to send message.
        stream: Redis Stream object name to send message.
        headers: Message headers to store meta-information. Can be overridden
            by `publish.headers` if specified.
        reply_to: Reply message destination PubSub object name.
        middlewares: Publisher middlewares to wrap outgoing messages.
            Deprecated since 0.6.0.
        message_format: Which format to use when parsing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type. Should be any python-native
            object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.
        persistent: Whether to make the publisher persistent or not.

    Returns:
        LogicPublisher: The publisher built by ``create_publisher`` after it
        has been registered through the parent registrator.
    """
    publisher = create_publisher(
        channel=channel,
        list=list,
        stream=stream,
        headers=headers,
        reply_to=reply_to,
        # Specific
        config=cast("RedisBrokerConfig", self.config),
        middlewares=middlewares,
        message_format=message_format,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )
    super().publisher(publisher, persistent=persistent)
    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)
Source code in faststream/redis/broker/registrator.py
@override
def include_router(
    self,
    router: "RedisRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    """Include a Redis router after checking its type.

    Args:
        router: Router to include; must be a ``RedisRegistrator`` instance.
        prefix: Forwarded unchanged to the parent ``include_router``.
        dependencies: Forwarded unchanged to the parent ``include_router``.
        middlewares: Forwarded unchanged to the parent ``include_router``.
        include_in_schema: Forwarded unchanged to the parent ``include_router``.

    Raises:
        SetupError: If ``router`` is not a ``RedisRegistrator`` instance.
    """
    if isinstance(router, RedisRegistrator):
        super().include_router(
            router,
            prefix=prefix,
            dependencies=dependencies,
            middlewares=middlewares,
            include_in_schema=include_in_schema,
        )
    else:
        # Reject routers from other brokers before delegating to the parent.
        received = type(router).__name__
        raise SetupError(
            f"Router must be an instance of RedisRegistrator, got {received} instead"
        )

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The added middleware becomes the innermost one in the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append a BrokerMiddleware to the end of the middlewares list.

    The added middleware becomes the innermost one in the stack.
    The call is delegated to ``self.config.add_middleware``.
    """
    broker_config = self.config
    broker_config.add_middleware(middleware)

insert_middleware #

insert_middleware(middleware)

Insert BrokerMiddleware to the start of middlewares list.

The inserted middleware becomes the outermost one in the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert a BrokerMiddleware at the start of the middlewares list.

    The inserted middleware becomes the outermost one in the stack.
    The call is delegated to ``self.config.insert_middleware``.
    """
    broker_config = self.config
    broker_config.insert_middleware(middleware)

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Include every given router, in the order received.

    Each router is handed to ``self.include_router`` with no extra options.
    """
    for router in routers:
        self.include_router(router)