Skip to content

RabbitBroker

faststream.rabbit.broker.RabbitBroker #

RabbitBroker(
    url="amqp://guest:guest@localhost:5672/",
    *,
    host=None,
    port=None,
    virtualhost=None,
    ssl_options=None,
    client_properties=None,
    timeout=None,
    fail_fast=True,
    reconnect_interval=5.0,
    default_channel=None,
    app_id=SERVICE_NAME,
    graceful_timeout=None,
    decoder=None,
    parser=None,
    dependencies=(),
    middlewares=(),
    routers=(),
    security=None,
    specification_url=None,
    protocol=None,
    protocol_version="0.9.1",
    description=None,
    tags=(),
    logger=EMPTY,
    log_level=INFO,
    apply_types=True,
    serializer=EMPTY,
    provider=None,
    context=None,
)

Bases: RabbitRegistrator, BrokerUsecase[IncomingMessage, RobustConnection]

A class to represent a RabbitMQ broker.

Initialize the RabbitBroker.

PARAMETER DESCRIPTION
url

RabbitMQ destination location to connect.

TYPE: Union[str, URL, None] DEFAULT: 'amqp://guest:guest@localhost:5672/'

host

Destination host. This option overrides url option host.

TYPE: str | None DEFAULT: None

port

Destination port. This option overrides url option port.

TYPE: int | None DEFAULT: None

virtualhost

RabbitMQ virtual host to use in the current broker connection.

TYPE: str | None DEFAULT: None

ssl_options

Extra ssl options to establish connection.

TYPE: Optional[SSLOptions] DEFAULT: None

client_properties

Add custom client capability.

TYPE: Optional[RabbitClientProperties] DEFAULT: None

timeout

Connection establishment timeout.

TYPE: TimeoutType DEFAULT: None

fail_fast

Broker startup raises AMQPConnectionError if RabbitMQ is unreachable.

TYPE: bool DEFAULT: True

reconnect_interval

Time to sleep between reconnection attempts.

TYPE: TimeoutType DEFAULT: 5.0

default_channel

Default channel settings to use.

TYPE: Optional[Channel] DEFAULT: None

app_id

Application name to mark outgoing messages by.

TYPE: str | None DEFAULT: SERVICE_NAME

graceful_timeout

Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.

TYPE: float | None DEFAULT: None

decoder

Custom decoder object.

TYPE: Optional[CustomCallable] DEFAULT: None

parser

Custom parser object.

TYPE: Optional[CustomCallable] DEFAULT: None

dependencies

Dependencies to apply to all broker subscribers.

TYPE: Iterable[Dependant] DEFAULT: ()

middlewares

Middlewares to apply to all broker publishers/subscribers.

TYPE: Sequence[BrokerMiddleware[Any, Any]] DEFAULT: ()

routers

RabbitRouters to build a broker with.

TYPE: Iterable[RabbitRegistrator] DEFAULT: ()

security

Security options to connect broker and generate AsyncAPI server security information.

TYPE: Optional[BaseSecurity] DEFAULT: None

specification_url

AsyncAPI hardcoded server addresses. Use servers if not specified.

TYPE: str | None DEFAULT: None

protocol

AsyncAPI server protocol.

TYPE: str | None DEFAULT: None

protocol_version

AsyncAPI server protocol version.

TYPE: str | None DEFAULT: '0.9.1'

description

AsyncAPI server description.

TYPE: str | None DEFAULT: None

tags

AsyncAPI server tags.

TYPE: Iterable[Union[Tag, TagDict]] DEFAULT: ()

logger

User-specified logger to pass into Context and log service messages.

TYPE: Optional[LoggerProto] DEFAULT: EMPTY

log_level

Service messages log level.

TYPE: int DEFAULT: INFO

apply_types

Whether to use FastDepends or not.

TYPE: bool DEFAULT: True

serializer

FastDepends-compatible serializer to validate incoming messages.

TYPE: Optional[SerializerProto] DEFAULT: EMPTY

provider

Provider for FastDepends.

TYPE: Optional[Provider] DEFAULT: None

context

Context for FastDepends.

TYPE: Optional[ContextRepo] DEFAULT: None

Source code in faststream/rabbit/broker/broker.py
def __init__(
    self,
    url: Union[str, "URL", None] = "amqp://guest:guest@localhost:5672/",
    *,
    host: str | None = None,
    port: int | None = None,
    virtualhost: str | None = None,
    ssl_options: Optional["SSLOptions"] = None,
    client_properties: Optional["RabbitClientProperties"] = None,
    timeout: "TimeoutType" = None,
    fail_fast: bool = True,
    reconnect_interval: "TimeoutType" = 5.0,
    default_channel: Optional["Channel"] = None,
    app_id: str | None = SERVICE_NAME,
    # broker base args
    graceful_timeout: float | None = None,
    decoder: Optional["CustomCallable"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[RabbitRegistrator] = (),
    # AsyncAPI args
    security: Optional["BaseSecurity"] = None,
    specification_url: str | None = None,
    protocol: str | None = None,
    protocol_version: str | None = "0.9.1",
    description: str | None = None,
    tags: Iterable[Union["Tag", "TagDict"]] = (),
    # logging args
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    # FastDepends args
    apply_types: bool = True,
    serializer: Optional["SerializerProto"] = EMPTY,
    provider: Optional["Provider"] = None,
    context: Optional["ContextRepo"] = None,
) -> None:
    """Initialize the RabbitBroker.

    Args:
        url: RabbitMQ destination location to connect.
        host: Destination host. This option overrides `url` option host.
        port: Destination port. This option overrides `url` option port.
        virtualhost: RabbitMQ virtual host to use in the current broker connection.
        ssl_options: Extra ssl options to establish connection.
        client_properties: Add custom client capability.
        timeout: Connection establishment timeout.
        fail_fast: Broker startup raises `AMQPConnectionError` if RabbitMQ is unreachable.
        reconnect_interval: Time to sleep between reconnection attempts.
        default_channel: Default channel settings to use.
        app_id: Application name to mark outgoing messages by.
        graceful_timeout: Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.
        decoder: Custom decoder object.
        parser: Custom parser object.
        dependencies: Dependencies to apply to all broker subscribers.
        middlewares: Middlewares to apply to all broker publishers/subscribers.
        routers: RabbitRouters to build a broker with.
        security: Security options to connect broker and generate AsyncAPI server security information.
        specification_url: AsyncAPI hardcoded server addresses. Use `servers` if not specified.
        protocol: AsyncAPI server protocol.
        protocol_version: AsyncAPI server protocol version.
        description: AsyncAPI server description.
        tags: AsyncAPI server tags.
        logger: User-specified logger to pass into Context and log service messages.
        log_level: Service messages log level.
        apply_types: Whether to use FastDepends or not.
        serializer: FastDepends-compatible serializer to validate incoming messages.
        provider: Provider for FastDepends.
        context: Context for FastDepends.
    """
    security_args = parse_security(security)

    amqp_url = build_url(
        url,
        host=host,
        port=port,
        virtualhost=virtualhost,
        ssl_options=ssl_options,
        client_properties=client_properties,
        login=security_args.get("login"),
        password=security_args.get("password"),
        ssl=security_args.get("ssl"),
    )

    if specification_url is None:
        specification_url = str(amqp_url)

    # respect ascynapi_url argument scheme
    built_asyncapi_url = urlparse(specification_url)
    if protocol is None:
        protocol = built_asyncapi_url.scheme

    cm = ChannelManagerImpl(default_channel)
    declarer = RabbitDeclarerImpl(cm)

    producer = AioPikaFastProducerImpl(
        declarer=declarer,
        decoder=decoder,
        parser=parser,
    )

    super().__init__(
        # connection args
        url=str(amqp_url),
        ssl_context=security_args.get("ssl_context"),
        timeout=timeout,
        fail_fast=fail_fast,
        reconnect_interval=reconnect_interval,
        # Basic args
        routers=routers,
        config=RabbitBrokerConfig(
            channel_manager=cm,
            producer=producer,
            declarer=declarer,
            app_id=app_id,
            virtual_host=built_asyncapi_url.path,
            # both args
            broker_middlewares=middlewares,
            broker_parser=parser,
            broker_decoder=decoder,
            logger=make_rabbit_logger_state(
                logger=logger,
                log_level=log_level,
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=apply_types,
                serializer=serializer,
                provider=provider or dependency_provider,
                context=context or ContextRepo(),
            ),
            # subscriber args
            broker_dependencies=dependencies,
            graceful_timeout=graceful_timeout,
            extra_context={
                "broker": self,
            },
        ),
        specification=BrokerSpec(
            description=description,
            url=[specification_url],
            protocol=protocol or built_asyncapi_url.scheme,
            protocol_version=protocol_version,
            security=security,
            tags=tags,
        ),
    )

    self._channel: RobustChannel | None = None

middlewares property #

middlewares

context property #

context

config instance-attribute #

config = ConfigComposition(config)

routers instance-attribute #

routers = []

subscribers property #

subscribers

publishers property #

publishers

specification instance-attribute #

specification = specification

running instance-attribute #

running = False

provider property #

provider

publish_batch async #

publish_batch(*messages, queue)
Source code in faststream/_internal/broker/pub_base.py
async def publish_batch(
    self,
    *messages: "SendableMessage",
    queue: str,
) -> Any:
    msg = f"{self.__class__.__name__} doesn't support publishing in batches."
    raise FeatureNotSupportedException(msg)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of the stack.
    """
    self.config.add_middleware(middleware)

insert_middleware #

insert_middleware(middleware)

Insert BrokerMiddleware to the start of middlewares list.

Current middleware will be used as a most outer of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert BrokerMiddleware to the start of middlewares list.

    Current middleware will be used as a most outer of the stack.
    """
    self.config.insert_middleware(middleware)

subscriber #

subscriber(
    queue,
    exchange=None,
    *,
    channel=None,
    consume_args=None,
    no_ack=EMPTY,
    ack_policy=EMPTY,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    no_reply=False,
    persistent=True,
    title=None,
    description=None,
    include_in_schema=True,
)

Subscribe a handler to a RabbitMQ queue.

PARAMETER DESCRIPTION
queue

RabbitMQ queue to listen. FastStream declares and binds queue object to exchange automatically by default.

TYPE: Union[str, RabbitQueue]

exchange

RabbitMQ exchange to bind queue to. Uses default exchange if not presented. FastStream declares exchange object automatically by default.

TYPE: Union[str, RabbitExchange, None] DEFAULT: None

channel

Channel to use for consuming messages.

TYPE: Optional[Channel] DEFAULT: None

consume_args

Extra consumer arguments to use in queue.consume(...) method.

TYPE: dict[str, Any] | None DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not.

TYPE: bool DEFAULT: EMPTY

ack_policy

Acknowledgement policy for message processing.

TYPE: AckPolicy DEFAULT: EMPTY

dependencies

Dependencies list ([Dependant(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

title

AsyncAPI subscriber object title.

TYPE: Optional[str] DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
RabbitSubscriber

The subscriber specification object.

TYPE: RabbitSubscriber

Source code in faststream/rabbit/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    queue: Union[str, "RabbitQueue"],
    exchange: Union[str, "RabbitExchange", None] = None,
    *,
    channel: Optional["Channel"] = None,
    consume_args: dict[str, Any] | None = None,
    no_ack: Annotated[
        bool,
        deprecated(
            "Deprecated in 0.6.0, use `ack_policy=AckPolicy.MANUAL` instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    # broker arguments
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    no_reply: bool = False,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> "RabbitSubscriber":
    """Subscribe a handler to a RabbitMQ queue.

    Args:
        queue (Union[str, RabbitQueue]): RabbitMQ queue to listen. **FastStream** declares and binds queue object to `exchange` automatically by default.
        exchange (Union[str, RabbitExchange, None], optional): RabbitMQ exchange to bind queue to. Uses default exchange if not presented. **FastStream** declares exchange object automatically by default.
        channel (Optional[Channel], optional): Channel to use for consuming messages.
        consume_args (dict[str, Any] | None, optional): Extra consumer arguments to use in `queue.consume(...)` method.
        no_ack (bool, optional): Whether to disable **FastStream** auto acknowledgement logic or not.
        ack_policy (AckPolicy, optional): Acknowledgement policy for message processing.
        dependencies (Iterable[Dependant], optional): Dependencies list (`[Dependant(),]`) to apply to the subscriber.
        parser (Optional[CustomCallable], optional): Parser to map original **IncomingMessage** Msg to FastStream one.
        decoder (Optional[CustomCallable], optional): Function to decode FastStream msg bytes body to python objects.
        middlewares (Sequence[SubscriberMiddleware[Any]], optional): Subscriber middlewares to wrap incoming message processing.
        no_reply (bool, optional): Whether to disable **FastStream** RPC and Reply To auto responses or not.
        title (Optional[str], optional): AsyncAPI subscriber object title.
        description (Optional[str], optional): AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema (bool, optional): Whether to include operation in AsyncAPI schema or not.
        persistent (bool): Whether to make the subscriber persistent or not.

    Returns:
        RabbitSubscriber: The subscriber specification object.
    """
    subscriber = create_subscriber(
        queue=RabbitQueue.validate(queue),
        exchange=RabbitExchange.validate(exchange),
        consume_args=consume_args,
        channel=channel,
        # subscriber args
        ack_policy=ack_policy,
        no_ack=no_ack,
        no_reply=no_reply,
        # broker args
        config=cast("RabbitBrokerConfig", self.config),
        # specification args
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    super().subscriber(subscriber, persistent=persistent)

    return subscriber.add_call(
        parser_=parser,
        decoder_=decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    reply_to=None,
    priority=None,
    persistent=True,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_type=None,
    user_id=None,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
queue

Default message routing key to publish with.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

Target exchange to publish message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

Default message routing key to publish with.

TYPE: str DEFAULT: ''

mandatory

Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue.

TYPE: bool DEFAULT: True

immediate

Client expects that there is a consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer.

TYPE: bool DEFAULT: False

timeout

Send confirmation time from RabbitMQ.

TYPE: TimeoutType DEFAULT: None

persist

Restore the message on RabbitMQ reboot.

TYPE: bool DEFAULT: False

reply_to

Reply message routing key to send with (always sending to default exchange).

TYPE: str | None DEFAULT: None

priority

The message priority (0 by default).

TYPE: int | None DEFAULT: None

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

headers

Message headers to store meta-information. Can be overridden by publish.headers if specified.

TYPE: Optional[HeadersType] DEFAULT: None

content_type

Message content-type header. Used by application, not core RabbitMQ. Will be set automatically if not specified.

TYPE: str | None DEFAULT: None

content_encoding

Message body content encoding, e.g. gzip.

TYPE: str | None DEFAULT: None

expiration

Message expiration (lifetime) in seconds (or datetime or timedelta).

TYPE: Optional[DateType] DEFAULT: None

message_type

Application-specific message type, e.g. orders.created.

TYPE: str | None DEFAULT: None

user_id

Publisher connection User ID, validated if set.

TYPE: str | None DEFAULT: None

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/rabbit/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: bool = False,
    reply_to: str | None = None,
    priority: int | None = None,
    persistent: bool = True,
    # specific
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
    # message args
    headers: Optional["HeadersType"] = None,
    content_type: str | None = None,
    content_encoding: str | None = None,
    expiration: Optional["DateType"] = None,
    message_type: str | None = None,
    user_id: str | None = None,
) -> "RabbitPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.

    Args:
        queue: Default message routing key to publish with.
        exchange: Target exchange to publish message to.
        routing_key: Default message routing key to publish with.
        Overrides `queue` option if presented.
        mandatory: Client waits for confirmation that the message is placed
            to some queue. RabbitMQ returns message to client if there is no suitable queue.
        immediate: Client expects that there is a consumer ready to take the message to work.
            RabbitMQ returns message to client if there is no suitable consumer.
        timeout: Send confirmation time from RabbitMQ.
        persist: Restore the message on RabbitMQ reboot.
        reply_to: Reply message routing key to send with (always sending to default exchange).
        priority: The message priority (0 by default).
        middlewares: Publisher middlewares to wrap outgoing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type. Should be any python-native
            object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.
        headers: Message headers to store meta-information. Can be overridden
            by `publish.headers` if specified.
        content_type: Message **content-type** header. Used by application, not core RabbitMQ.
            Will be set automatically if not specified.
        content_encoding: Message body content encoding, e.g. **gzip**.
        expiration: Message expiration (lifetime) in seconds (or datetime or timedelta).
        message_type: Application-specific message type, e.g. **orders.created**.
        user_id: Publisher connection User ID, validated if set.
        persistent: Whether to make the publisher persistent or not.
    """
    message_kwargs = PublishKwargs(
        mandatory=mandatory,
        immediate=immediate,
        timeout=timeout,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        priority=priority,
        content_type=content_type,
        content_encoding=content_encoding,
        message_type=message_type,
        user_id=user_id,
        expiration=expiration,
    )

    publisher = create_publisher(
        routing_key=routing_key,
        queue=RabbitQueue.validate(queue),
        exchange=RabbitExchange.validate(exchange),
        message_kwargs=message_kwargs,
        # publisher args
        middlewares=middlewares,
        # broker args
        config=cast("RabbitBrokerConfig", self.config),
        # specification args
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )

    super().publisher(publisher, persistent=persistent)

    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)
Source code in faststream/rabbit/broker/registrator.py
@override
def include_router(
    self,
    router: "RabbitRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    if not isinstance(router, RabbitRegistrator):
        msg = (
            f"Router must be an instance of RabbitRegistrator, "
            f"got {type(router).__name__} instead"
        )
        raise SetupError(msg)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

connect async #

connect()

Connect to a remote server.

Source code in faststream/_internal/broker/broker.py
async def connect(self) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        self._connection = await self._connect()
        self._setup_logger()

    return self._connection

stop async #

stop(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/rabbit/broker/broker.py
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await super().stop(exc_type, exc_val, exc_tb)

    if self._channel is not None:
        if not self._channel.is_closed:
            await self._channel.close()

        self._channel = None

    if self._connection is not None:
        await self._connection.close()
        self._connection = None

    self.config.disconnect()

close async #

close(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/rabbit/broker/broker.py
@deprecated(
    "Deprecated in **FastStream 0.5.44**. "
    "Please, use `stop` method instead. "
    "Method `close` will be removed in **FastStream 0.7.0**.",
    category=DeprecationWarning,
    stacklevel=1,
)
async def close(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await self.stop(exc_type, exc_val, exc_tb)

start async #

start()

Connect broker to RabbitMQ and startup all subscribers.

Source code in faststream/rabbit/broker/broker.py
async def start(self) -> None:
    """Connect broker to RabbitMQ and startup all subscribers."""
    await self.connect()
    await self.declare_queue(RABBIT_REPLY)
    await super().start()

publish async #

publish(
    message=None,
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    reply_to=None,
    correlation_id=None,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_id=None,
    timestamp=None,
    message_type=None,
    user_id=None,
    priority=None,
)

Publish message directly.

This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks applications or to publish messages from time to time.

Please, use @broker.publisher(...) or broker.publisher(...).publish(...) instead in a regular way.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: AioPikaSendableMessage DEFAULT: None

queue

Message routing key to publish with.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

Target exchange to publish message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

Message routing key to publish with. Overrides queue option if presented.

TYPE: str DEFAULT: ''

mandatory

Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue.

TYPE: bool DEFAULT: True

immediate

Client expects that there is consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer.

TYPE: bool DEFAULT: False

timeout

Send confirmation time from RabbitMQ.

TYPE: TimeoutType DEFAULT: None

persist

Restore the message on RabbitMQ reboot.

TYPE: bool DEFAULT: False

reply_to

Reply message routing key to send with (always sending to default exchange).

TYPE: str | None DEFAULT: None

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

headers

Message headers to store metainformation.

TYPE: Optional[HeadersType] DEFAULT: None

content_type

Message content-type header. Used by application, not core RabbitMQ. Will be set automatically if not specified.

TYPE: str | None DEFAULT: None

content_encoding

Message body content encoding, e.g. gzip.

TYPE: str | None DEFAULT: None

expiration

Message expiration (lifetime) in seconds (or datetime or timedelta).

TYPE: Optional[DateType] DEFAULT: None

message_id

Arbitrary message id. Generated automatically if not present.

TYPE: str | None DEFAULT: None

timestamp

Message publish timestamp. Generated automatically if not presented.

TYPE: Optional[DateType] DEFAULT: None

message_type

Application-specific message type, e.g. orders.created.

TYPE: str | None DEFAULT: None

user_id

Publisher connection User ID, validated if set.

TYPE: str | None DEFAULT: None

priority

The message priority (0 by default).

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
Optional[ConfirmationFrameType]

An optional aiormq.abc.ConfirmationFrameType representing the confirmation frame if RabbitMQ is configured to send confirmations.

Source code in faststream/rabbit/broker/broker.py
@override
async def publish(
    self,
    message: "AioPikaSendableMessage" = None,
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    # publish options
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: bool = False,
    reply_to: str | None = None,
    correlation_id: str | None = None,
    # message options
    headers: Optional["HeadersType"] = None,
    content_type: str | None = None,
    content_encoding: str | None = None,
    expiration: Optional["DateType"] = None,
    message_id: str | None = None,
    timestamp: Optional["DateType"] = None,
    message_type: str | None = None,
    user_id: str | None = None,
    priority: int | None = None,
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
    """Publish message directly.

    This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
    applications or to publish messages from time to time.

    Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way.

    Args:
        message:
            Message body to send.
        queue:
            Message routing key to publish with.
        exchange:
            Target exchange to publish message to.
        routing_key:
            Message routing key to publish with. Overrides `queue` option if presented.
        mandatory:
            Client waits for confirmation that the message is placed to some queue. RabbitMQ returns message to client if there is no suitable queue.
        immediate:
            Client expects that there is consumer ready to take the message to work. RabbitMQ returns message to client if there is no suitable consumer.
        timeout:
            Send confirmation time from RabbitMQ.
        persist:
            Restore the message on RabbitMQ reboot.
        reply_to:
            Reply message routing key to send with (always sending to default exchange).
        correlation_id:
            Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
        headers:
            Message headers to store metainformation.
        content_type:
            Message **content-type** header. Used by application, not core RabbitMQ. Will be set automatically if not specified.
        content_encoding:
            Message body content encoding, e.g. **gzip**.
        expiration:
            Message expiration (lifetime) in seconds (or datetime or timedelta).
        message_id:
            Arbitrary message id. Generated automatically if not present.
        timestamp:
            Message publish timestamp. Generated automatically if not presented.
        message_type:
            Application-specific message type, e.g. **orders.created**.
        user_id:
            Publisher connection User ID, validated if set.
        priority:
            The message priority (0 by default).

    Returns:
        An optional `aiormq.abc.ConfirmationFrameType` representing the confirmation frame if RabbitMQ is configured to send confirmations.
    """
    cmd = RabbitPublishCommand(
        message,
        routing_key=routing_key or RabbitQueue.validate(queue).routing(),
        exchange=RabbitExchange.validate(exchange),
        correlation_id=correlation_id or gen_cor_id(),
        app_id=self.config.app_id,
        mandatory=mandatory,
        immediate=immediate,
        persist=persist,
        reply_to=reply_to,
        headers=headers,
        content_type=content_type,
        content_encoding=content_encoding,
        expiration=expiration,
        message_id=message_id,
        message_type=message_type,
        timestamp=timestamp,
        user_id=user_id,
        timeout=timeout,
        priority=priority,
        _publish_type=PublishType.PUBLISH,
    )

    result: aiormq.abc.ConfirmationFrameType | None = await super()._basic_publish(
        cmd,
        producer=self._producer,
    )
    return result

request async #

request(
    message=None,
    queue="",
    exchange=None,
    *,
    routing_key="",
    mandatory=True,
    immediate=False,
    timeout=None,
    persist=False,
    correlation_id=None,
    headers=None,
    content_type=None,
    content_encoding=None,
    expiration=None,
    message_id=None,
    timestamp=None,
    message_type=None,
    user_id=None,
    priority=None,
)

Make a synchronous request to RabbitMQ.

This method uses Direct Reply-To pattern to send a message and wait for a reply. It is a blocking call and will wait for a reply until timeout.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: AioPikaSendableMessage DEFAULT: None

queue

Message routing key to publish with.

TYPE: Union[RabbitQueue, str] DEFAULT: ''

exchange

Target exchange to publish message to.

TYPE: Union[RabbitExchange, str, None] DEFAULT: None

routing_key

Message routing key to publish with. Overrides queue option if presented.

TYPE: str DEFAULT: ''

mandatory

Client waits for confirmation that the message is placed to some queue.

TYPE: bool DEFAULT: True

immediate

Client expects that there is a consumer ready to take the message to work.

TYPE: bool DEFAULT: False

timeout

Send confirmation time from RabbitMQ.

TYPE: TimeoutType DEFAULT: None

persist

Restore the message on RabbitMQ reboot.

TYPE: bool DEFAULT: False

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

headers

Message headers to store metainformation.

TYPE: Optional[HeadersType] DEFAULT: None

content_type

Message content-type header. Used by application, not core RabbitMQ.

TYPE: str | None DEFAULT: None

content_encoding

Message body content encoding, e.g. gzip.

TYPE: str | None DEFAULT: None

expiration

Message expiration (lifetime) in seconds (or datetime or timedelta).

TYPE: Optional[DateType] DEFAULT: None

message_id

Arbitrary message id. Generated automatically if not present.

TYPE: str | None DEFAULT: None

timestamp

Message publish timestamp. Generated automatically if not present.

TYPE: Optional[DateType] DEFAULT: None

message_type

Application-specific message type, e.g. orders.created.

TYPE: str | None DEFAULT: None

user_id

Publisher connection User ID, validated if set.

TYPE: str | None DEFAULT: None

priority

The message priority (0 by default).

TYPE: int | None DEFAULT: None

Source code in faststream/rabbit/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "AioPikaSendableMessage" = None,
    queue: Union["RabbitQueue", str] = "",
    exchange: Union["RabbitExchange", str, None] = None,
    *,
    routing_key: str = "",
    mandatory: bool = True,
    immediate: bool = False,
    timeout: "TimeoutType" = None,
    persist: bool = False,
    # message args
    correlation_id: str | None = None,
    headers: Optional["HeadersType"] = None,
    content_type: str | None = None,
    content_encoding: str | None = None,
    expiration: Optional["DateType"] = None,
    message_id: str | None = None,
    timestamp: Optional["DateType"] = None,
    message_type: str | None = None,
    user_id: str | None = None,
    priority: int | None = None,
) -> "RabbitMessage":
    """Make a synchronous request to RabbitMQ.

    This method uses Direct Reply-To pattern to send a message and wait for a reply.
    It is a blocking call and will wait for a reply until timeout.

    Args:
        message: Message body to send.
        queue: Message routing key to publish with.
        exchange: Target exchange to publish message to.
        routing_key: Message routing key to publish with. Overrides `queue` option if presented.
        mandatory: Client waits for confirmation that the message is placed to some queue.
        RabbitMQ returns message to client if there is no suitable queue.
        immediate: Client expects that there is a consumer ready to take the message to work.
        RabbitMQ returns message to client if there is no suitable consumer.
        timeout: Send confirmation time from RabbitMQ.
        persist: Restore the message on RabbitMQ reboot.
        correlation_id: Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages.
        headers: Message headers to store metainformation.
        content_type: Message **content-type** header. Used by application, not core RabbitMQ.
        Will be set automatically if not specified.
        content_encoding: Message body content encoding, e.g. **gzip**.
        expiration: Message expiration (lifetime) in seconds (or datetime or timedelta).
        message_id: Arbitrary message id. Generated automatically if not present.
        timestamp: Message publish timestamp. Generated automatically if not present.
        message_type: Application-specific message type, e.g. **orders.created**.
        user_id: Publisher connection User ID, validated if set.
        priority: The message priority (0 by default).
    """
    cmd = RabbitPublishCommand(
        message,
        routing_key=routing_key or RabbitQueue.validate(queue).routing(),
        exchange=RabbitExchange.validate(exchange),
        correlation_id=correlation_id or gen_cor_id(),
        app_id=self.config.app_id,
        mandatory=mandatory,
        immediate=immediate,
        persist=persist,
        headers=headers,
        content_type=content_type,
        content_encoding=content_encoding,
        expiration=expiration,
        message_id=message_id,
        message_type=message_type,
        timestamp=timestamp,
        user_id=user_id,
        timeout=timeout,
        priority=priority,
        _publish_type=PublishType.REQUEST,
    )

    msg: RabbitMessage = await super()._basic_request(cmd, producer=self._producer)
    return msg

declare_queue async #

declare_queue(queue)

Declares queue object in RabbitMQ.

Source code in faststream/rabbit/broker/broker.py
async def declare_queue(self, queue: "RabbitQueue") -> "RobustQueue":
    """Declares queue object in **RabbitMQ**."""
    declarer: RabbitDeclarer = self.config.declarer
    return await declarer.declare_queue(queue)

declare_exchange async #

declare_exchange(exchange)

Declares exchange object in RabbitMQ.

Source code in faststream/rabbit/broker/broker.py
async def declare_exchange(self, exchange: "RabbitExchange") -> "RobustExchange":
    """Declares exchange object in **RabbitMQ**."""
    declarer: RabbitDeclarer = self.config.declarer
    return await declarer.declare_exchange(exchange)

ping async #

ping(timeout)
Source code in faststream/rabbit/broker/broker.py
@override
async def ping(self, timeout: float | None) -> bool:
    sleep_time = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as cancel_scope:
        if self._connection is None:
            return False

        while True:
            if cancel_scope.cancel_called:
                return False

            if self._connection.connected.is_set():
                return True

            await anyio.sleep(sleep_time)

    return False