Skip to content

MQTTBroker

faststream.mqtt.MQTTBroker #

MQTTBroker(
    host: str = "localhost:1883",
    port: int = EMPTY,
    *,
    client_id: str = "",
    keepalive: int = 60,
    clean_session: bool = True,
    version: Literal["3.1.1", "5.0"] = "5.0",
    reconnect: ReconnectConfig | None = None,
    session_expiry_interval: int = 0,
    graceful_timeout: float | None = 15.0,
    decoder: Optional[CustomCallable] = None,
    parser: Optional[CustomCallable] = None,
    dependencies: Iterable[Dependant] = (),
    middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
    routers: Iterable[MQTTRegistrator] = (),
    ack_policy: AckPolicy = EMPTY,
    specification_url: str | None = None,
    protocol_version: str | None = None,
    description: str | None = None,
    tags: Iterable[Tag | TagDict] = (),
    security: Optional[BaseSecurity] = None,
    logger: Optional[LoggerProto] = EMPTY,
    log_level: int = INFO,
    apply_types: bool = True,
    serializer: Optional[SerializerProto] = EMPTY,
    provider: Optional[Provider] = None,
    context: Optional[ContextRepo] = None,
)

Bases: MQTTRegistrator, BrokerUsecase[Message, MQTTClient]

MQTT broker for FastStream using the zmqtt client library.

Source code in faststream/mqtt/broker/broker.py
def __init__(
    self,
    host: str = "localhost:1883",
    port: int = EMPTY,
    *,
    client_id: str = "",
    keepalive: int = 60,
    clean_session: bool = True,
    version: Literal["3.1.1", "5.0"] = "5.0",
    reconnect: zmqtt.ReconnectConfig | None = None,
    session_expiry_interval: int = 0,
    graceful_timeout: float | None = 15.0,
    decoder: Optional["CustomCallable"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[MQTTRegistrator] = (),
    ack_policy: AckPolicy = EMPTY,
    # AsyncAPI args
    specification_url: str | None = None,
    protocol_version: str | None = None,
    description: str | None = None,
    tags: Iterable["Tag | TagDict"] = (),
    security: Optional["BaseSecurity"] = None,
    # logging args
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    # FastDepends args
    apply_types: bool = True,
    serializer: Optional["SerializerProto"] = EMPTY,
    provider: Optional["Provider"] = None,
    context: Optional["ContextRepo"] = None,
) -> None:
    """Configure the MQTT broker and its AsyncAPI specification.

    Args:
        host: Broker address, optionally carrying a ``:port`` suffix. The
            suffix is always stripped from the host name.
        port: Broker port. When left as ``EMPTY`` it is taken from the
            ``host`` suffix, or ``1883`` if ``host`` has no suffix.
        client_id: Forwarded unchanged to the parent constructor.
        keepalive: Forwarded unchanged to the parent constructor.
        clean_session: Forwarded unchanged to the parent constructor.
        version: MQTT protocol version. ``"5.0"`` selects the v5 producer,
            anything else the v3.1.1 producer; it is also the default
            AsyncAPI protocol version.
        reconnect: Forwarded unchanged to the parent constructor.
        session_expiry_interval: Forwarded unchanged to the parent
            constructor.
        graceful_timeout: Stored in the broker config.
        decoder: Custom decoder given to the producer and the broker config.
        parser: Custom parser given to the producer and the broker config.
        dependencies: Broker-level dependencies stored in the broker config.
        middlewares: Broker-level middlewares stored in the broker config.
        routers: Routers forwarded to the parent constructor.
        ack_policy: Acknowledgement policy stored in the broker config.
        specification_url: AsyncAPI server URL. Defaults to
            ``mqtt://{host}:{port}`` built from the resolved host and port.
        protocol_version: AsyncAPI protocol version. Falls back to
            ``version`` when not given (or empty).
        description: AsyncAPI server description.
        tags: AsyncAPI server tags.
        security: Security object; parsed into extra keyword arguments for
            the parent constructor and also recorded in the specification.
        logger: Logger handed to ``make_mqtt_logger_state``.
        log_level: Log level handed to ``make_mqtt_logger_state``.
        apply_types: Passed to ``FastDependsConfig`` as ``use_fastdepends``.
        serializer: Serializer passed to ``FastDependsConfig``.
        provider: Dependency provider; ``dependency_provider`` is used when
            omitted.
        context: Context repository; a fresh ``ContextRepo`` is created when
            omitted.

    Raises:
        ValueError: If ``port`` is ``EMPTY`` and the port suffix of ``host``
            is not a valid integer.
    """
    secure_kwargs = parse_security(security)

    producer: ZmqttBaseProducer
    if version == "5.0":
        producer = ZmqttProducerV5(parser=parser, decoder=decoder)
    else:
        producer = ZmqttProducerV311(parser=parser, decoder=decoder)

    if ":" in host:
        # Split on the LAST colon only, so exactly two parts come back.
        # ``split(":", 2)`` could return three parts and break the unpacking
        # for any host containing more than one colon (e.g. "[::1]:1883").
        # NOTE(review): unbracketed IPv6 literals are still not interpreted.
        host, host_port = host.rsplit(":", 1)
    else:
        host_port = "1883"
    if port is EMPTY:
        port = int(host_port)

    if specification_url is None:
        specification_url = f"mqtt://{host}:{port}"

    super().__init__(
        host=host,
        port=port,
        client_id=client_id,
        keepalive=keepalive,
        clean_session=clean_session,
        version=version,
        reconnect=reconnect,
        session_expiry_interval=session_expiry_interval,
        **secure_kwargs,
        # broker config
        routers=routers,
        config=MQTTBrokerConfig(
            version=version,
            producer=producer,
            broker_middlewares=middlewares,
            broker_parser=parser,
            broker_decoder=decoder,
            logger=make_mqtt_logger_state(
                logger=logger,
                log_level=log_level,
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=apply_types,
                serializer=serializer,
                provider=provider or dependency_provider,
                context=context or ContextRepo(),
            ),
            broker_dependencies=dependencies,
            graceful_timeout=graceful_timeout,
            ack_policy=ack_policy,
            extra_context={
                "broker": self,
            },
        ),
        specification=BrokerSpec(
            description=description,
            url=[specification_url],
            protocol="mqtt",
            protocol_version=protocol_version or version,
            tags=tags,
            security=security,
        ),
    )

middlewares property #

middlewares: Sequence[BrokerMiddleware[MsgType]]

context property #

context: ContextRepo

config instance-attribute #

config: ConfigComposition[BrokerConfigType] = (
    ConfigComposition(config)
)

routers instance-attribute #

routers: list[Registrator[MsgType, Any]] = []

subscribers property #

subscribers: list[SubscriberUsecase[MsgType]]

publishers property #

publishers: list[PublisherUsecase]

parent property writable #

parent: Registrator[MsgType, Any] | None

specification instance-attribute #

specification = specification

running instance-attribute #

running = False

provider property #

provider: Provider

publish_batch async #

publish_batch(
    *messages: SendableMessage, queue: str
) -> Any
Source code in faststream/_internal/broker/pub_base.py
async def publish_batch(
    self,
    *messages: "SendableMessage",
    queue: str,
) -> Any:
    """Reject batch publishing, which this broker does not implement.

    Args:
        messages: Messages that would be sent; never used.
        queue: Destination name; never used.

    Raises:
        FeatureNotSupportedException: Always. The message names the concrete
            class of the broker the call was made on.
    """
    broker_name = self.__class__.__name__
    raise FeatureNotSupportedException(
        f"{broker_name} doesn't support publishing in batches."
    )

add_middleware #

add_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Append BrokerMiddleware to the end of middlewares list.

The added middleware becomes the innermost layer of the middleware stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Register a broker middleware at the end of the middlewares list.

    A middleware added this way becomes the innermost layer of the stack.

    Args:
        middleware: The middleware to append; handed to the config object.
    """
    config = self.config
    config.add_middleware(middleware)

insert_middleware #

insert_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Insert BrokerMiddleware to the start of middlewares list.

The inserted middleware becomes the outermost layer of the middleware stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Register a broker middleware at the start of the middlewares list.

    A middleware inserted this way becomes the outermost layer of the stack.

    Args:
        middleware: The middleware to insert; handed to the config object.
    """
    config = self.config
    config.insert_middleware(middleware)

subscriber #

subscriber(
    topic: str,
    *,
    qos: QoS = AT_MOST_ONCE,
    shared: str | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    max_workers: int = 1,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> MQTTDefaultSubscriber | MQTTConcurrentSubscriber

Subscribe a handler to an MQTT topic.

PARAMETER DESCRIPTION
topic

MQTT topic filter. Wildcards + (single level) and # (multi-level) are supported.

TYPE: str

qos

QoS level for the subscription (0, 1, or 2).

TYPE: QoS DEFAULT: AT_MOST_ONCE

shared

Optional shared subscription group name. When set, subscribes as $share/<group>/<topic>.

TYPE: str | None DEFAULT: None

ack_policy

Acknowledgement policy for message processing.

TYPE: AckPolicy DEFAULT: EMPTY

no_reply

Whether to disable FastStream RPC / reply-to responses.

TYPE: bool DEFAULT: False

dependencies

Dependencies list to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Custom parser to map raw messages to FastStream ones.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream message bytes to Python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

max_workers

Number of workers to process messages concurrently.

TYPE: int DEFAULT: 1

persistent

Whether to retain the subscriber across broker restarts.

TYPE: bool DEFAULT: True

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description.

TYPE: str | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema.

TYPE: bool DEFAULT: True

Source code in faststream/mqtt/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    topic: str,
    *,
    qos: QoS = QoS.AT_MOST_ONCE,
    shared: str | None = None,
    # broker arguments
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    max_workers: int = 1,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> "MQTTDefaultSubscriber | MQTTConcurrentSubscriber":
    """Create an MQTT subscriber for a topic and register it on this object.

    Args:
        topic: MQTT topic filter; the ``+`` (single level) and ``#``
            (multi-level) wildcards are supported.
        qos: Subscription QoS level (0, 1, or 2).
        shared: Optional shared-subscription group name. When given, the
            subscription is made as ``$share/<group>/<topic>``.
        ack_policy: Acknowledgement policy used while processing messages.
        no_reply: Disable FastStream RPC / reply-to responses.
        dependencies: Dependencies applied to the subscriber.
        parser: Custom parser mapping raw messages to FastStream ones;
            falls back to ``self._parser`` when not given.
        decoder: Custom function decoding message bytes into Python
            objects; falls back to ``self._decoder`` when not given.
        max_workers: Number of workers processing messages concurrently.
        persistent: Whether to retain the subscriber across broker restarts.
        title: AsyncAPI subscriber object title.
        description: AsyncAPI subscriber object description.
        include_in_schema: Whether the operation appears in the AsyncAPI
            schema.

    Returns:
        The result of ``add_call`` on the newly created subscriber.
    """
    broker_config = cast("MQTTBrokerConfig", self.config)
    handler = create_subscriber(
        topic=topic,
        qos=qos,
        shared=shared,
        ack_policy=ack_policy,
        no_reply=no_reply,
        config=broker_config,
        max_workers=max_workers,
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    # Let the parent registrator record the subscriber before wiring the call.
    super().subscriber(handler, persistent=persistent)

    # Per-subscriber parser/decoder win over the registrator-level defaults.
    effective_parser = parser or self._parser
    effective_decoder = decoder or self._decoder
    return handler.add_call(
        parser_=effective_parser,
        decoder_=effective_decoder,
        dependencies_=dependencies,
    )

publisher #

publisher(
    topic: str,
    *,
    qos: QoS = AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> MQTTPublisher

Create a persistent publisher object for the given MQTT topic.

PARAMETER DESCRIPTION
topic

MQTT topic to publish to. Must not contain wildcards.

TYPE: str

qos

QoS level for published messages (0, 1, or 2).

TYPE: QoS DEFAULT: AT_MOST_ONCE

retain

Whether the broker should retain the last message.

TYPE: bool DEFAULT: False

headers

Default headers to include in every published message.

TYPE: dict[str, str] | None DEFAULT: None

persistent

Whether to retain the publisher across broker restarts.

TYPE: bool DEFAULT: True

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema.

TYPE: bool DEFAULT: True

Source code in faststream/mqtt/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    topic: str,
    *,
    qos: QoS = QoS.AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "MQTTPublisher":
    """Build a publisher object for an MQTT topic and register it here.

    Args:
        topic: MQTT topic to publish to. Must not contain wildcards.
        qos: QoS level for published messages (0, 1, or 2).
        retain: Whether the broker should retain the last message.
        headers: Default headers included in every published message.
        persistent: Whether to retain the publisher across broker restarts.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type.
        include_in_schema: Whether the operation appears in the AsyncAPI
            schema.

    Returns:
        The newly created publisher.
    """
    broker_config = cast("MQTTBrokerConfig", self.config)
    new_publisher = create_publisher(
        topic=topic,
        qos=qos,
        retain=retain,
        headers=headers,
        broker_config=broker_config,
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )

    # Let the parent registrator record the publisher before handing it back.
    super().publisher(new_publisher, persistent=persistent)
    return new_publisher

include_router #

include_router(
    router: MQTTRegistrator,
    *,
    prefix: str = "",
    dependencies: Iterable[Dependant] = (),
    middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
    include_in_schema: bool | None = None,
) -> None
Source code in faststream/mqtt/broker/registrator.py
@override
def include_router(
    self,
    router: "MQTTRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    """Include an MQTT router and validate its broker middlewares.

    Args:
        router: Router to include; must be an ``MQTTRegistrator``.
        prefix: Forwarded unchanged to the parent ``include_router``.
        dependencies: Forwarded unchanged to the parent ``include_router``.
        middlewares: Forwarded unchanged to the parent ``include_router``.
        include_in_schema: Forwarded unchanged to the parent
            ``include_router``.

    Raises:
        TypeError: If ``router`` is not an ``MQTTRegistrator`` instance.
    """
    if not isinstance(router, MQTTRegistrator):
        raise TypeError(
            "Router must be an instance of MQTTRegistrator, "
            f"got {type(router).__name__} instead."
        )

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

    # Validation runs only after the router has been included.
    for router_middleware in router.config.broker_middlewares:
        router.config._validate_middleware(router_middleware)

include_routers #

include_routers(
    *routers: Registrator[MsgType, Any],
) -> None

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Include every given router, one by one, in the order supplied.

    Args:
        routers: Routers to include; each one is passed to
            ``include_router`` with no extra options.
    """
    for router in routers:
        self.include_router(router)

connect async #

connect() -> ConnectionType

Connect to a remote server.

Source code in faststream/_internal/broker/broker.py
async def connect(self) -> ConnectionType:
    """Return the broker connection, establishing it on first use.

    When no connection is stored yet, one is created via ``_connect`` and
    the logger is set up; otherwise the stored connection is returned as-is.
    """
    if self._connection is not None:
        return self._connection

    self._connection = await self._connect()
    self._setup_logger()
    return self._connection

start async #

start() -> None
Source code in faststream/mqtt/broker/broker.py
@override
async def start(self) -> None:
    """Connect, log the established connection, then run the parent start."""
    await self.connect()

    log_context = MQTTBaseSubscriber.build_log_context(None, "")
    self.config.logger.log("Connection established", logging.INFO, log_context)

    await super().start()

stop async #

stop(
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional[TracebackType] = None,
) -> None
Source code in faststream/mqtt/broker/broker.py
@override
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Stop the broker and drop its connection.

    The parent ``stop`` runs first with the given exception details; then
    the stored connection, if any, is disconnected and cleared; finally the
    config's ``disconnect`` is called.

    Args:
        exc_type: Exception type forwarded to the parent ``stop``.
        exc_val: Exception instance forwarded to the parent ``stop``.
        exc_tb: Traceback forwarded to the parent ``stop``.
    """
    await super().stop(exc_type, exc_val, exc_tb)

    connection = self._connection
    if connection is not None:
        await connection.disconnect()
        self._connection = None

    self.config.disconnect()

ping async #

ping(timeout: float | None = None) -> bool
Source code in faststream/mqtt/broker/broker.py
@override
async def ping(self, timeout: float | None = None) -> bool:
    """Report whether the current connection answers a ping.

    Args:
        timeout: Value passed to the connection's ``ping``; ``5.0`` is used
            when it is ``None`` (or any other falsy value).

    Returns:
        ``False`` when there is no connection or the ping raises any
        ``Exception``; ``True`` otherwise.
    """
    connection = self._connection
    if connection is None:
        return False

    try:
        await connection.ping(timeout=timeout or 5.0)
    except Exception:
        # Any failure is reported as "not reachable" rather than raised.
        return False
    return True

publish async #

publish(
    message: SendableMessage = None,
    topic: str = "",
    *,
    qos: QoS = AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
) -> None

Publish a message to an MQTT topic.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: SendableMessage DEFAULT: None

topic

MQTT topic to publish to.

TYPE: str DEFAULT: ''

qos

QoS level (0, 1, or 2).

TYPE: QoS DEFAULT: AT_MOST_ONCE

retain

Whether the broker should retain the message.

TYPE: bool DEFAULT: False

headers

Message headers (MQTT 5.0 user properties).

TYPE: dict[str, str] | None DEFAULT: None

correlation_id

Correlation ID for message tracing.

TYPE: str | None DEFAULT: None

reply_to

Response topic (MQTT 5.0 response_topic property).

TYPE: str DEFAULT: ''

Source code in faststream/mqtt/broker/broker.py
@override
async def publish(
    self,
    message: "SendableMessage" = None,
    topic: str = "",
    *,
    qos: zmqtt.QoS = zmqtt.QoS.AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
) -> None:
    """Send a single message to an MQTT topic.

    Args:
        message: Message body to send.
        topic: MQTT topic to publish to.
        qos: QoS level (0, 1, or 2).
        retain: Whether the broker should retain the message.
        headers: Message headers (MQTT 5.0 user properties).
        correlation_id: Correlation ID for message tracing; a new one is
            generated when not given.
        reply_to: Response topic (MQTT 5.0 response_topic property).
    """
    # Only generate an ID when the caller did not supply one.
    message_id = correlation_id or gen_cor_id()
    command = MQTTPublishCommand(
        message,
        topic=topic,
        qos=qos,
        retain=retain,
        headers=headers,
        correlation_id=message_id,
        reply_to=reply_to,
        _publish_type=PublishType.PUBLISH,
    )

    await self._basic_publish(command, producer=self.config.producer)

request async #

request(
    message: SendableMessage = None,
    topic: str = "",
    /,
    timeout: float = 0.5,
    correlation_id: str | None = None,
    headers: dict[str, str] | None = None,
    qos: QoS = AT_MOST_ONCE,
    reply_to: str = "",
) -> MQTTMessage
Source code in faststream/mqtt/broker/broker.py
@override
async def request(
    self,
    message: "SendableMessage" = None,
    topic: str = "",
    /,
    timeout: float = 0.5,
    correlation_id: str | None = None,
    headers: dict[str, str] | None = None,
    qos: zmqtt.QoS = zmqtt.QoS.AT_MOST_ONCE,
    reply_to: str = "",
) -> "MQTTMessage":
    """Send a request-type message and return the response message.

    Args:
        message: Message body to send (positional-only).
        topic: MQTT topic to publish to (positional-only).
        timeout: Timeout value carried on the publish command.
        correlation_id: Correlation ID for the request; a new one is
            generated when not given.
        headers: Message headers carried on the publish command.
        qos: QoS level carried on the publish command.
        reply_to: Reply topic carried on the publish command.

    Returns:
        The message produced by ``_basic_request``.
    """
    # Only generate an ID when the caller did not supply one.
    message_id = correlation_id or gen_cor_id()
    command = MQTTPublishCommand(
        message,
        topic=topic,
        correlation_id=message_id,
        headers=headers,
        qos=qos,
        reply_to=reply_to,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
    )
    response: "MQTTMessage" = await self._basic_request(
        command,
        producer=self.config.producer,
    )
    return response