Skip to content

ZmqttBaseProducer

faststream.mqtt.publisher.producer.ZmqttBaseProducer #

ZmqttBaseProducer(
    default_parser: Any,
    parser: Optional[CustomCallable],
    decoder: Optional[CustomCallable],
)

Bases: ProducerProto[MQTTPublishCommand]

Source code in faststream/mqtt/publisher/producer.py
def __init__(
    self,
    default_parser: Any,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Set up an unconnected producer and its parser/decoder pipelines.

    Args:
        default_parser: Object exposing ``parse_message`` and
            ``decode_message``; both are handed to ``ParserComposition``
            together with the matching custom callable.
        parser: Optional custom parser passed to ``ParserComposition``.
        decoder: Optional custom decoder passed to ``ParserComposition``.
    """
    # Nothing is attached yet; `connect` fills these in.
    self.serializer: Optional["SerializerProto"] = None
    self._client: Optional["zmqtt.MQTTClient"] = None

    parse_fallback = default_parser.parse_message
    self._parser = ParserComposition(parser, parse_fallback)

    decode_fallback = default_parser.decode_message
    self._decoder = ParserComposition(decoder, decode_fallback)

serializer instance-attribute #

serializer: SerializerProto | None = None

connect #

connect(
    client: MQTTClient,
    serializer: Optional[SerializerProto],
) -> None
Source code in faststream/mqtt/publisher/producer.py
def connect(
    self,
    client: "zmqtt.MQTTClient",
    serializer: Optional["SerializerProto"],
) -> None:
    """Store the given client and serializer on this producer.

    Args:
        client: MQTT client kept as ``self._client``.
        serializer: Serializer kept as ``self.serializer``; may be ``None``.
    """
    self._client, self.serializer = client, serializer

disconnect #

disconnect() -> None
Source code in faststream/mqtt/publisher/producer.py
def disconnect(self) -> None:
    """Forget the stored client and serializer by resetting both to ``None``.

    Only the references held by this producer are cleared; nothing is
    called on the client itself here.
    """
    self._client = self.serializer = None

publish async #

publish(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish(self, cmd: "MQTTPublishCommand") -> None:
    raise NotImplementedError

request async #

request(cmd: MQTTPublishCommand) -> Any
Source code in faststream/mqtt/publisher/producer.py
@override
async def request(self, cmd: "MQTTPublishCommand") -> Any:
    raise NotImplementedError

publish_batch async #

publish_batch(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
    """Reject batch publishing, which this producer does not offer.

    Raises:
        FeatureNotSupportedException: Always, regardless of ``cmd``.
    """
    reason = "MQTT does not support batch publishing."
    raise FeatureNotSupportedException(reason)