Skip to content

ZmqttFakeProducer

faststream.mqtt.publisher.producer.ZmqttFakeProducer #

ZmqttFakeProducer()

Bases: ZmqttBaseProducer

Source code in faststream/mqtt/publisher/producer.py
def __init__(self) -> None: ...

serializer instance-attribute #

serializer: SerializerProto | None = None

connect #

connect(
    client: MQTTClient,
    serializer: Optional[SerializerProto],
) -> None
Source code in faststream/mqtt/publisher/producer.py
def connect(
    self,
    client: "zmqtt.MQTTClient",
    serializer: Optional["SerializerProto"],
) -> None:
    raise NotImplementedError

disconnect #

disconnect() -> None
Source code in faststream/mqtt/publisher/producer.py
def disconnect(self) -> None:
    raise NotImplementedError

publish async #

publish(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish(self, cmd: "MQTTPublishCommand") -> None:
    raise NotImplementedError

request async #

request(cmd: MQTTPublishCommand) -> Any
Source code in faststream/mqtt/publisher/producer.py
@override
async def request(self, cmd: "MQTTPublishCommand") -> Any:
    raise NotImplementedError

publish_batch async #

publish_batch(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
    msg = "MQTT does not support batch publishing."
    raise FeatureNotSupportedException(msg)