Skip to content

MQTTPublisher

faststream.mqtt.broker.router.MQTTPublisher #

MQTTPublisher(
    topic: str,
    *,
    qos: QoS = QoS.AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    persistent: bool = True,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
)

Bases: ArgsContainer

Delayed MQTTPublisher registration object.

A copy of MQTTRegistrator.publisher(...) arguments for use in MQTTRoute.

Source code in faststream/mqtt/broker/router.py
def __init__(
    self,
    topic: str,
    *,
    qos: QoS = QoS.AT_MOST_ONCE,
    retain: bool = False,
    headers: dict[str, str] | None = None,
    persistent: bool = True,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> None:
    """Store the publisher arguments by handing them to the parent initializer.

    ``topic`` is forwarded positionally; every other argument is forwarded
    by keyword, unchanged and in declaration order.
    """
    # Publishing-related options, in the same order as the signature.
    publish_options: dict[str, Any] = {
        "qos": qos,
        "retain": retain,
        "headers": headers,
        "persistent": persistent,
    }
    # AsyncAPI information.
    asyncapi_options: dict[str, Any] = {
        "title": title,
        "description": description,
        "schema": schema,
        "include_in_schema": include_in_schema,
    }
    # Unpacking both dicts keeps the original keyword order intact.
    super().__init__(topic, **publish_options, **asyncapi_options)

args instance-attribute #

args: Iterable[Any] = args

kwargs instance-attribute #

kwargs: dict[str, Any] = kwargs