ZmqttBaseProducer(
default_parser: Any,
parser: Optional[CustomCallable],
decoder: Optional[CustomCallable],
)
Bases: ProducerProto[MQTTPublishCommand]
Source code in faststream/mqtt/publisher/producer.py
def __init__(
    self,
    default_parser: Any,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Set up the producer in a detached state.

    Args:
        default_parser: Object exposing ``parse_message`` and
            ``decode_message``; both are handed to ``ParserComposition``
            alongside the optional custom callables.
        parser: Optional custom parser, passed first to ``ParserComposition``.
        decoder: Optional custom decoder, passed first to
            ``ParserComposition``.
    """
    # No serializer or client is attached at construction time;
    # connect() assigns both.
    self.serializer: SerializerProto | None = None
    self._client: zmqtt.MQTTClient | None = None

    # NOTE(review): presumably the composition prefers the custom callable
    # and falls back to the default one - confirm in ParserComposition.
    fallback_parse = default_parser.parse_message
    self._parser = ParserComposition(parser, fallback_parse)

    fallback_decode = default_parser.decode_message
    self._decoder = ParserComposition(decoder, fallback_decode)
|
serializer instance-attribute
serializer: SerializerProto | None = None
connect
connect(
client: MQTTClient,
serializer: Optional[SerializerProto],
) -> None
Source code in faststream/mqtt/publisher/producer.py
def connect(
    self,
    client: "zmqtt.MQTTClient",
    serializer: Optional["SerializerProto"],
) -> None:
    """Attach a client and a serializer to this producer.

    Args:
        client: Client object stored on ``self._client``.
        serializer: Serializer stored on ``self.serializer``; may be ``None``.
    """
    # Targets are assigned left to right, so the client is stored first.
    self._client, self.serializer = client, serializer
|
disconnect
Source code in faststream/mqtt/publisher/producer.py
def disconnect(self) -> None:
    """Detach the producer by clearing its client and serializer references."""
    # Chained assignment resets ``_client`` first, then ``serializer``.
    self._client = self.serializer = None
|
publish async
Source code in faststream/mqtt/publisher/producer.py
| @override
async def publish(self, cmd: "MQTTPublishCommand") -> None:
raise NotImplementedError
|
request async
Source code in faststream/mqtt/publisher/producer.py
| @override
async def request(self, cmd: "MQTTPublishCommand") -> Any:
raise NotImplementedError
|
publish_batch async
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
    """Reject batch publishing.

    Raises:
        FeatureNotSupportedException: Always, with a message stating that
            MQTT does not support batch publishing.
    """
    raise FeatureNotSupportedException("MQTT does not support batch publishing.")
|