Skip to content

MQTTRoute

faststream.mqtt.MQTTRoute #

MQTTRoute(
    call: Callable[..., SendableMessage]
    | Callable[..., Awaitable[SendableMessage]],
    topic: str,
    *,
    publishers: Iterable[MQTTPublisher] = (),
    qos: QoS = QoS.AT_MOST_ONCE,
    shared: str | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    max_workers: int = 1,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
)

Bases: SubscriberRoute

Stores the arguments of an MQTTBroker subscriber whose registration is delayed.

Source code in faststream/mqtt/broker/router.py
def __init__(
    self,
    call: Callable[..., "SendableMessage"]
    | Callable[..., Awaitable["SendableMessage"]],
    topic: str,
    *,
    publishers: Iterable["MQTTPublisher"] = (),
    qos: QoS = QoS.AT_MOST_ONCE,
    shared: str | None = None,
    # broker arguments
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    max_workers: int = 1,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> None:
    """Record a handler and its subscriber options for delayed registration.

    Nothing is interpreted or validated here: ``call`` and ``topic`` are
    passed positionally, and every other argument is passed by keyword,
    unchanged, to the parent initializer.
    """
    # Keys are exactly the keyword names the parent initializer receives.
    subscriber_options = {
        "persistent": persistent,
        "qos": qos,
        "shared": shared,
        "ack_policy": ack_policy,
        "no_reply": no_reply,
        "dependencies": dependencies,
        "parser": parser,
        "decoder": decoder,
        "max_workers": max_workers,
        "title": title,
        "description": description,
        "include_in_schema": include_in_schema,
    }
    super().__init__(call, topic, publishers=publishers, **subscriber_options)

args instance-attribute #

args: Iterable[Any] = args

kwargs instance-attribute #

kwargs: dict[str, Any] = kwargs

call instance-attribute #

call: Callable[..., Any] = call

publishers instance-attribute #

publishers: Iterable[Any] = publishers