Skip to content

RabbitRouter

faststream.rabbit.RabbitRouter #

RabbitRouter(
    prefix="",
    handlers=(),
    *,
    dependencies=(),
    middlewares=(),
    routers=(),
    parser=None,
    decoder=None,
    include_in_schema=None,
)

Bases: RabbitRegistrator, BrokerRouter[IncomingMessage]

Router that can be included in a RabbitBroker.

Initialize the RabbitRouter.

PARAMETER DESCRIPTION
prefix

String prefix to add to all subscribers queues.

TYPE: str DEFAULT: ''

handlers

Route object to include.

TYPE: Iterable[RabbitRoute] DEFAULT: ()

dependencies

Dependencies list ([Dependant(),]) to apply to all routers' publishers/subscribers. Defaults to ().

TYPE: Iterable[Dependant] DEFAULT: ()

middlewares

Router middlewares to apply to all routers' publishers/subscribers. Defaults to ().

TYPE: Sequence[BrokerMiddleware[Any, Any]] DEFAULT: ()

routers

Routers to apply to broker. Defaults to ().

TYPE: Iterable[RabbitRegistrator] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one. Defaults to None.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects. Defaults to None.

TYPE: Optional[CustomCallable] DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not. Defaults to None.

TYPE: bool | None DEFAULT: None

Source code in faststream/rabbit/broker/router.py
def __init__(
    self,
    prefix: str = "",
    handlers: Iterable[RabbitRoute] = (),
    *,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[RabbitRegistrator] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    include_in_schema: bool | None = None,
) -> None:
    """Initialize the RabbitRouter.

    All router-level options are packed into a single ``RabbitBrokerConfig``
    and passed, together with ``handlers`` and ``routers``, to the parent
    initializer.

    Args:
        prefix:
            String prefix to add to all subscribers queues.
        handlers:
            Route objects to include.
        dependencies:
            Dependencies list (`[Dependant(),]`) to apply to all routers' publishers/subscribers. Defaults to ().
        middlewares:
            Router middlewares to apply to all routers' publishers/subscribers. Defaults to ().
        routers:
            Routers to apply to broker. Defaults to ().
        parser:
            Parser to map original **IncomingMessage** Msg to FastStream one. Defaults to None.
        decoder:
            Function to decode FastStream msg bytes body to python objects. Defaults to None.
        include_in_schema:
            Whether to include operation in AsyncAPI schema or not. Defaults to None.
    """
    # Collect every router-wide setting in one config object first, so the
    # parent call below only has to wire three things together.
    router_config = RabbitBrokerConfig(
        prefix=prefix,
        include_in_schema=include_in_schema,
        broker_dependencies=dependencies,
        broker_middlewares=middlewares,
        broker_parser=parser,
        broker_decoder=decoder,
    )

    super().__init__(
        handlers=handlers,
        config=router_config,
        routers=routers,
    )

config instance-attribute #

config = ConfigComposition(config)

routers instance-attribute #

routers = []

subscribers property #

subscribers

publishers property #

publishers

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The added middleware will be used as the innermost one of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Add a BrokerMiddleware at the tail of the middlewares list.

    Being last in the list, the given middleware is used as the innermost
    layer of the stack.

    Args:
        middleware: The middleware to append; handed to the config object.
    """
    # The middleware list lives on the config object, so delegate to it.
    target_config = self.config
    target_config.add_middleware(middleware)

insert_middleware #

insert_middleware(middleware)

Insert BrokerMiddleware to the start of middlewares list.

The inserted middleware will be used as the outermost one of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Add a BrokerMiddleware at the head of the middlewares list.

    Being first in the list, the given middleware is used as the outermost
    layer of the stack.

    Args:
        middleware: The middleware to insert; handed to the config object.
    """
    # The middleware list lives on the config object, so delegate to it.
    target_config = self.config
    target_config.insert_middleware(middleware)

subscriber #

subscriber(
    queue,
    exchange=None,
    *,
    channel=None,
    consume_args=None,
    no_ack=EMPTY,
    ack_policy=EMPTY,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    no_reply=False,
    persistent=True,
    title=None,
    description=None,
    include_in_schema=True,
)

Subscribe a handler to a RabbitMQ queue.

PARAMETER DESCRIPTION
queue

RabbitMQ queue to listen. FastStream declares and binds queue object to exchange automatically by default.

TYPE: Union[str, RabbitQueue]

exchange

RabbitMQ exchange to bind queue to. Uses default exchange if not presented. FastStream declares exchange object automatically by default.

TYPE: Union[str, RabbitExchange, None] DEFAULT: None

channel

Channel to use for consuming messages.

TYPE: Optional[Channel] DEFAULT: None

consume_args

Extra consumer arguments to use in queue.consume(...) method.

TYPE: dict[str, Any] | None DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not.

TYPE: bool DEFAULT: EMPTY

ack_policy

Acknowledgement policy for message processing.

TYPE: AckPolicy DEFAULT: EMPTY

dependencies

Dependencies list ([Dependant(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

title

AsyncAPI subscriber object title.

TYPE: Optional[str] DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
RabbitSubscriber

The subscriber specification object.

TYPE: RabbitSubscriber

Source code in faststream/rabbit/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Union[str, "RabbitQueue"],
    exchange: Union[str, "RabbitExchange", None] = None,
    *,
    channel: Optional["Channel"] = None,
    consume_args: dict[str, Any] | None = None,
    no_ack: Annotated[
        bool,
        deprecated(
            # Trailing space keeps the two implicitly concatenated sentences apart.
            "Deprecated in 0.6.0, use `ack_policy=AckPolicy.MANUAL` instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    # broker arguments
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            # Trailing space keeps the two implicitly concatenated sentences apart.
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    no_reply: bool = False,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> "RabbitSubscriber":
    """Subscribe a handler to a RabbitMQ queue.

    Args:
        queue (Union[str, RabbitQueue]): RabbitMQ queue to listen. **FastStream** declares and binds queue object to `exchange` automatically by default.
        exchange (Union[str, RabbitExchange, None], optional): RabbitMQ exchange to bind queue to. Uses default exchange if not presented. **FastStream** declares exchange object automatically by default.
        channel (Optional[Channel], optional): Channel to use for consuming messages.
        consume_args (dict[str, Any] | None, optional): Extra consumer arguments to use in `queue.consume(...)` method.
        no_ack (bool, optional): Whether to disable **FastStream** auto acknowledgement logic or not. Deprecated in 0.6.0, use `ack_policy=AckPolicy.MANUAL` instead.
        ack_policy (AckPolicy, optional): Acknowledgement policy for message processing.
        dependencies (Iterable[Dependant], optional): Dependencies list (`[Dependant(),]`) to apply to the subscriber.
        parser (Optional[CustomCallable], optional): Parser to map original **IncomingMessage** Msg to FastStream one.
        decoder (Optional[CustomCallable], optional): Function to decode FastStream msg bytes body to python objects.
        middlewares (Sequence[SubscriberMiddleware[Any]], optional): Subscriber middlewares to wrap incoming message processing. Deprecated in 0.6.0, use router-level middlewares instead.
        no_reply (bool, optional): Whether to disable **FastStream** RPC and Reply To auto responses or not.
        persistent (bool, optional): Whether to make the subscriber persistent or not.
        title (Optional[str], optional): AsyncAPI subscriber object title.
        description (Optional[str], optional): AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema (bool, optional): Whether to include operation in AsyncAPI schema or not.

    Returns:
        RabbitSubscriber: The subscriber specification object.
    """
    # `queue` and `exchange` may be plain strings; `validate` is applied to
    # both before they are handed to the subscriber factory.
    subscriber = create_subscriber(
        queue=RabbitQueue.validate(queue),
        exchange=RabbitExchange.validate(exchange),
        consume_args=consume_args,
        channel=channel,
        # subscriber args
        ack_policy=ack_policy,
        no_ack=no_ack,
        no_reply=no_reply,
        # broker args
        config=cast("RabbitBrokerConfig", self.config),
        # specification args
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    # Hand the new subscriber to the parent implementation together with
    # the persistence flag.
    super().subscriber(subscriber, persistent=persistent)

    # Per-handler options (parser, decoder, dependencies, middlewares) are
    # attached through `add_call`, whose result is what the caller receives.
    return subscriber.add_call(
        parser_=parser,
        decoder_=decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    reply_to=None,
    priority=None,
    persistent=True,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_type=None,
    user_id=None,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object and call it later - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
queue

Default message routing key to publish with.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

Target exchange to publish message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

Default message routing key to publish with.

TYPE: str DEFAULT: ''

mandatory

Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue.

TYPE: bool DEFAULT: True

immediate

Client expects that there is a consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer.

TYPE: bool DEFAULT: False

timeout

Send confirmation time from RabbitMQ.

TYPE: TimeoutType DEFAULT: None

persist

Restore the message on RabbitMQ reboot.

TYPE: bool DEFAULT: False

reply_to

Reply message routing key to send with (always sending to default exchange).

TYPE: str | None DEFAULT: None

priority

The message priority (0 by default).

TYPE: int | None DEFAULT: None

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

headers

Message headers to store meta-information. Can be overridden by publish.headers if specified.

TYPE: Optional[HeadersType] DEFAULT: None

content_type

Message content-type header. Used by application, not core RabbitMQ. Will be set automatically if not specified.

TYPE: str | None DEFAULT: None

content_encoding

Message body content encoding, e.g. gzip.

TYPE: str | None DEFAULT: None

expiration

Message expiration (lifetime) in seconds (or datetime or timedelta).

TYPE: Optional[DateType] DEFAULT: None

message_type

Application-specific message type, e.g. orders.created.

TYPE: str | None DEFAULT: None

user_id

Publisher connection User ID, validated if set.

TYPE: str | None DEFAULT: None

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/rabbit/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: bool = False,
    reply_to: str | None = None,
    priority: int | None = None,
    persistent: bool = True,
    # specific
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            # Trailing space keeps the two implicitly concatenated sentences apart.
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
    # message args
    headers: Optional["HeadersType"] = None,
    content_type: str | None = None,
    content_encoding: str | None = None,
    expiration: Optional["DateType"] = None,
    message_type: str | None = None,
    user_id: str | None = None,
) -> "RabbitPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object and call it later - `broker.publisher(...).publish(...)`.

    Args:
        queue: Default message routing key to publish with.
        exchange: Target exchange to publish message to.
        routing_key: Default message routing key to publish with.
            Overrides `queue` option if presented.
        mandatory: Client waits for confirmation that the message is placed
            to some queue. RabbitMQ returns message to client if there is no suitable queue.
        immediate: Client expects that there is a consumer ready to take the message to work.
            RabbitMQ returns message to client if there is no suitable consumer.
        timeout: Send confirmation time from RabbitMQ.
        persist: Restore the message on RabbitMQ reboot.
        reply_to: Reply message routing key to send with (always sending to default exchange).
        priority: The message priority (0 by default).
        persistent: Whether to make the publisher persistent or not.
        middlewares: Publisher middlewares to wrap outgoing messages.
            Deprecated in 0.6.0, use router-level middlewares instead.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type. Should be any python-native
            object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.
        headers: Message headers to store meta-information. Can be overridden
            by `publish.headers` if specified.
        content_type: Message **content-type** header. Used by application, not core RabbitMQ.
            Will be set automatically if not specified.
        content_encoding: Message body content encoding, e.g. **gzip**.
        expiration: Message expiration (lifetime) in seconds (or datetime or timedelta).
        message_type: Application-specific message type, e.g. **orders.created**.
        user_id: Publisher connection User ID, validated if set.

    Returns:
        The created publisher object.
    """
    # Per-message defaults are bundled once and carried by the publisher.
    message_kwargs = PublishKwargs(
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        priority=priority,
        content_type=content_type,
        content_encoding=content_encoding,
        message_type=message_type,
        user_id=user_id,
        expiration=expiration,
    )

    # `queue` and `exchange` may be plain strings; `validate` is applied to
    # both before they are handed to the publisher factory.
    publisher = create_publisher(
        routing_key=routing_key,
        queue=RabbitQueue.validate(queue),
        exchange=RabbitExchange.validate(exchange),
        message_kwargs=message_kwargs,
        # publisher args
        middlewares=middlewares,
        # broker args
        config=cast("RabbitBrokerConfig", self.config),
        # specification args
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )

    # Hand the new publisher to the parent implementation together with
    # the persistence flag.
    super().publisher(publisher, persistent=persistent)

    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)
Source code in faststream/rabbit/broker/registrator.py
@override
def include_router(
    self,
    router: "RabbitRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    """Include another RabbitMQ registrator after checking its type.

    Args:
        router: The registrator to include; must be a ``RabbitRegistrator``.
        prefix: Passed through unchanged to the parent ``include_router``.
        dependencies: Passed through unchanged to the parent ``include_router``.
        middlewares: Passed through unchanged to the parent ``include_router``.
        include_in_schema: Passed through unchanged to the parent ``include_router``.

    Raises:
        SetupError: If ``router`` is not an instance of ``RabbitRegistrator``.
    """
    # Reject routers of any other type up front with an explicit setup
    # error, before anything is passed on to the parent implementation.
    if not isinstance(router, RabbitRegistrator):
        received_type = type(router).__name__
        error_message = (
            "Router must be an instance of RabbitRegistrator, "
            f"got {received_type} instead"
        )
        raise SetupError(error_message)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Include every given router, one by one, in the order supplied.

    Each router is passed to ``include_router`` with no extra options.

    Args:
        *routers: Routers to include; may be empty, in which case nothing happens.
    """
    for router in routers:
        self.include_router(router)