Skip to content

MQTTRegistrator

faststream.mqtt.broker.registrator.MQTTRegistrator #

MQTTRegistrator(
    *,
    config: BrokerConfigType,
    routers: Iterable[Registrator[MsgType]],
)

Bases: Registrator['zmqtt.Message', MQTTBrokerConfig]

Router that can be included in an MQTTBroker.

Source code in faststream/_internal/broker/registrator.py
def __init__(
    self,
    *,
    config: BrokerConfigType,
    routers: Iterable["Registrator[MsgType]"],
) -> None:
    """Initialise registrator state from ``config`` and include ``routers``.

    Args:
        config: Broker configuration. Its ``broker_parser`` and
            ``broker_decoder`` are stored on the instance, and the config
            itself is wrapped in a ``ConfigComposition``.
        routers: Registrators handed to ``include_routers`` once the
            instance state below has been set up.
    """
    # Parser/decoder taken from the broker configuration.
    self._parser, self._decoder = config.broker_parser, config.broker_decoder

    self.config: ConfigComposition[BrokerConfigType] = ConfigComposition(config)

    # No parent registrator is attached at construction time.
    self.__parent: Registrator[MsgType, Any] | None = None
    self.routers: list[Registrator[MsgType, Any]] = []

    # Weakly referenced collections of every registered endpoint.
    self._publishers: WeakSet[PublisherUsecase] = WeakSet()
    self._subscribers: WeakSet[SubscriberUsecase[MsgType]] = WeakSet()

    # Strongly referenced lists of persistent endpoints.
    self.__persistent_publishers: list[PublisherUsecase] = []
    self.__persistent_subscribers: list[SubscriberUsecase[MsgType]] = []

    # Must run last: including routers relies on the state initialised above.
    self.include_routers(*routers)

config instance-attribute #

config: ConfigComposition[BrokerConfigType] = (
    ConfigComposition(config)
)

routers instance-attribute #

routers: list[Registrator[MsgType, Any]] = []

subscribers property #

subscribers: list[SubscriberUsecase[MsgType]]

publishers property #

publishers: list[PublisherUsecase]

parent property writable #

parent: Registrator[MsgType, Any] | None

subscriber #

subscriber(
    topic: str,
    *,
    qos: QoS = AT_MOST_ONCE,
    shared: str | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    max_workers: int = 1,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> MQTTDefaultSubscriber | MQTTConcurrentSubscriber

Subscribe a handler to an MQTT topic.

PARAMETER DESCRIPTION
topic

MQTT topic filter. Wildcards + (single level) and # (multi-level) are supported.

TYPE: str

qos

QoS level for the subscription (0, 1, or 2).

TYPE: QoS DEFAULT: AT_MOST_ONCE

shared

Optional shared subscription group name. When set, subscribes as $share/<group>/<topic>.

TYPE: str | None DEFAULT: None

ack_policy

Acknowledgement policy for message processing.

TYPE: AckPolicy DEFAULT: EMPTY

no_reply

Whether to disable FastStream RPC / reply-to responses.

TYPE: bool DEFAULT: False

dependencies

Dependencies list to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Custom parser to map raw messages to FastStream ones.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream message bytes to Python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

max_workers

Number of workers to process messages concurrently.

TYPE: int DEFAULT: 1

persistent

Whether to retain the subscriber across broker restarts.

TYPE: bool DEFAULT: True

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description.

TYPE: str | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema.

TYPE: bool DEFAULT: True

Source code in faststream/mqtt/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    topic: str,
    *,
    qos: QoS = QoS.AT_MOST_ONCE,
    shared: str | None = None,
    # broker arguments
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    max_workers: int = 1,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> "MQTTDefaultSubscriber | MQTTConcurrentSubscriber":
    """Register a handler for messages arriving on an MQTT topic filter.

    Args:
        topic: Topic filter to subscribe to. The ``+`` (single level) and
            ``#`` (multi-level) wildcards are supported.
        qos: Subscription QoS level (0, 1, or 2).
        shared: Name of a shared subscription group, if any. When given,
            the subscription is made as ``$share/<group>/<topic>``.
        ack_policy: Acknowledgement policy used while processing messages.
        no_reply: Set to disable FastStream RPC / reply-to responses.
        dependencies: Dependencies applied to the subscriber.
        parser: Custom parser mapping raw messages to FastStream ones.
            Falls back to ``self._parser`` when not given.
        decoder: Function decoding FastStream message bytes into Python
            objects. Falls back to ``self._decoder`` when not given.
        max_workers: How many workers process messages concurrently.
        persistent: Whether the subscriber is retained across broker
            restarts; forwarded to the base ``subscriber`` call.
        title: Title of the AsyncAPI subscriber object.
        description: Description of the AsyncAPI subscriber object.
        include_in_schema: Whether the operation appears in the AsyncAPI
            schema.

    Returns:
        The value returned by ``add_call`` on the newly created subscriber.
    """
    broker_config = cast("MQTTBrokerConfig", self.config)
    new_subscriber = create_subscriber(
        topic=topic,
        qos=qos,
        shared=shared,
        ack_policy=ack_policy,
        no_reply=no_reply,
        config=broker_config,
        max_workers=max_workers,
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    # Register with the base registrator before attaching call options.
    super().subscriber(new_subscriber, persistent=persistent)

    # Explicit parser/decoder win over the defaults stored on the instance.
    effective_parser = parser or self._parser
    effective_decoder = decoder or self._decoder
    return new_subscriber.add_call(
        parser_=effective_parser,
        decoder_=effective_decoder,
        dependencies_=dependencies,
    )

publisher #

publisher(
    topic: str,
    *,
    qos: QoS = AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> MQTTPublisher

Create a persistent publisher object for the given MQTT topic.

PARAMETER DESCRIPTION
topic

MQTT topic to publish to. Must not contain wildcards.

TYPE: str

qos

QoS level for published messages (0, 1, or 2).

TYPE: QoS DEFAULT: AT_MOST_ONCE

retain

Whether the broker should retain the last message.

TYPE: bool DEFAULT: False

headers

Default headers to include in every published message.

TYPE: dict[str, str] | None DEFAULT: None

persistent

Whether to retain the publisher across broker restarts.

TYPE: bool DEFAULT: True

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema.

TYPE: bool DEFAULT: True

Source code in faststream/mqtt/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    topic: str,
    *,
    qos: QoS = QoS.AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "MQTTPublisher":
    """Build a publisher for an MQTT topic and register it.

    Args:
        topic: Topic to publish to. Wildcards are not allowed.
        qos: QoS level used for published messages (0, 1, or 2).
        retain: Whether the broker should retain the last message.
        headers: Default headers added to every published message.
        persistent: Whether the publisher is retained across broker
            restarts; forwarded to the base ``publisher`` call.
        title: Title of the AsyncAPI publisher object.
        description: Description of the AsyncAPI publisher object.
        schema: AsyncAPI type of the published message.
        include_in_schema: Whether the operation appears in the AsyncAPI
            schema.

    Returns:
        The publisher object produced by ``create_publisher``.
    """
    broker_config = cast("MQTTBrokerConfig", self.config)
    new_publisher = create_publisher(
        topic=topic,
        qos=qos,
        retain=retain,
        headers=headers,
        broker_config=broker_config,
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )

    # Register with the base registrator, then hand the object to the caller.
    super().publisher(new_publisher, persistent=persistent)
    return new_publisher

include_router #

include_router(
    router: MQTTRegistrator,
    *,
    prefix: str = "",
    dependencies: Iterable[Dependant] = (),
    middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
    include_in_schema: bool | None = None,
) -> None
Source code in faststream/mqtt/broker/registrator.py
@override
def include_router(
    self,
    router: "MQTTRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    """Include another MQTT registrator and validate its broker middlewares.

    Args:
        router: Registrator to include; must be an ``MQTTRegistrator``.
        prefix: Forwarded unchanged to the base ``include_router``.
        dependencies: Forwarded unchanged to the base ``include_router``.
        middlewares: Forwarded unchanged to the base ``include_router``.
        include_in_schema: Forwarded unchanged to the base
            ``include_router``.

    Raises:
        TypeError: If ``router`` is not an ``MQTTRegistrator`` instance.
    """
    if not isinstance(router, MQTTRegistrator):
        received = type(router).__name__
        msg = (
            "Router must be an instance of MQTTRegistrator, "
            f"got {received} instead."
        )
        raise TypeError(msg)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

    # Validation runs after inclusion, against the router's own config.
    router_config = router.config
    for broker_middleware in router_config.broker_middlewares:
        router_config._validate_middleware(broker_middleware)

add_middleware #

add_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Append BrokerMiddleware to the end of middlewares list.

The added middleware becomes the innermost layer of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append a BrokerMiddleware to the end of the middlewares list.

    The added middleware becomes the innermost layer of the stack.

    Args:
        middleware: Middleware passed on to ``self.config.add_middleware``.
    """
    registrator_config = self.config
    registrator_config.add_middleware(middleware)

insert_middleware #

insert_middleware(
    middleware: BrokerMiddleware[Any, Any],
) -> None

Insert BrokerMiddleware to the start of middlewares list.

The inserted middleware becomes the outermost layer of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert a BrokerMiddleware at the start of the middlewares list.

    The inserted middleware becomes the outermost layer of the stack.

    Args:
        middleware: Middleware passed on to ``self.config.insert_middleware``.
    """
    registrator_config = self.config
    registrator_config.insert_middleware(middleware)

include_routers #

include_routers(
    *routers: Registrator[MsgType, Any],
) -> None

Include each of the given routers in this registrator.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Include each of the given routers in this registrator.

    Args:
        *routers: Registrators passed one at a time, in order, to
            ``include_router`` with no extra arguments.
    """
    for child_router in routers:
        self.include_router(child_router)