Bases: ZmqttBaseProducer
In-memory producer that routes messages directly to matching subscribers.
Encodes messages in the wire format matching the broker's configured MQTT version: V311 envelope for 3.1.1, PublishProperties for 5.0.
Source code in faststream/mqtt/testing.py
def __init__(self, broker: MQTTBroker) -> None:
    """Bind the fake producer to ``broker`` and prepare parsing helpers.

    Args:
        broker: Broker whose subscribers are iterated by ``publish`` and
            ``request`` and whose custom parser/decoder are reused here.
    """
    self.broker = broker
    self.serializer: SerializerProto | None = None
    # ``publish`` and ``request`` pass ``self._version`` to ``build_message``,
    # so the resolved version has to live on the instance; a local variable
    # would be gone by the time those methods run.
    # NOTE(review): if the base class already exposes ``_version`` some other
    # way, confirm this assignment agrees with it.
    self._version = _broker_version(broker)
    default = _parser_for_version(self._version)
    # Combine the broker-level parser/decoder with the version default.
    self._parser = ParserComposition(broker._parser, default.parse_message)
    self._decoder = ParserComposition(broker._decoder, default.decode_message)
broker instance-attribute
serializer instance-attribute
serializer: SerializerProto | None = None
publish async
Source code in faststream/mqtt/testing.py
@override
async def publish(self, cmd: MQTTPublishCommand) -> None:
    """Build one message from ``cmd`` and pass it to each matching subscriber.

    Subscribers whose topic starts with ``$share/`` are grouped by the second
    path segment of that topic; within one call only the first matching
    subscriber of each group receives the message.

    Args:
        cmd: Publish command supplying body, destination topic, QoS, retain
            flag, reply-to, correlation id and headers.
    """
    msg = build_message(
        message=cmd.body,
        topic=cmd.destination,
        version=self._version,
        qos=cmd.qos,
        retain=cmd.retain,
        reply_to=cmd.reply_to,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        serializer=self.broker.config.fd_config._serializer,
    )

    # Names of shared-subscription groups that already got this message.
    delivered_groups: set[str] = set()
    subscribers = cast("list[MQTTBaseSubscriber]", self.broker.subscribers)
    for subscriber in subscribers:
        subscription = subscriber.topic
        if mqtt_topic_matches(subscription, cmd.destination):
            if subscription.startswith("$share/"):
                # "$share/<group>/<filter>": only the group name is needed.
                _prefix, share_group, _filter = subscription.split("/", 2)
                if share_group in delivered_groups:
                    continue
                delivered_groups.add(share_group)
            await subscriber.process_message(msg)
request async
Source code in faststream/mqtt/testing.py
@override
async def request(self, cmd: MQTTPublishCommand) -> "zmqtt.Message":
    """Send ``cmd`` to the first matching subscriber and return its reply.

    The subscriber call runs inside ``anyio.fail_after`` using
    ``cmd.timeout``, falling back to 30.0 seconds when that value is falsy.

    Args:
        cmd: Publish command describing the request message.

    Returns:
        A message built from the subscriber's result (body, correlation id
        and headers), addressed to ``cmd.destination``.

    Raises:
        SubscriberNotFound: No subscriber topic matches ``cmd.destination``.
    """
    msg = build_message(
        message=cmd.body,
        topic=cmd.destination,
        version=self._version,
        qos=cmd.qos,
        retain=cmd.retain,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        serializer=self.broker.config.fd_config._serializer,
    )

    candidates = cast("list[MQTTBaseSubscriber]", self.broker.subscribers)
    # Only the first subscriber whose topic matches answers the request.
    target = next(
        (
            candidate
            for candidate in candidates
            if mqtt_topic_matches(candidate.topic, cmd.destination)
        ),
        None,
    )
    if target is None:
        raise SubscriberNotFound

    timeout = cmd.timeout or 30.0
    with anyio.fail_after(timeout):
        result = await target.process_message(msg)

    return build_message(
        message=result.body,
        topic=cmd.destination,
        version=self._version,
        correlation_id=result.correlation_id,
        headers=result.headers,
        serializer=self.broker.config.fd_config._serializer,
    )
publish_batch async
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
    """Reject the call: this producer offers no batch publishing.

    Args:
        cmd: Publish command; it is not inspected.

    Raises:
        FeatureNotSupportedException: On every call.
    """
    reason = "MQTT does not support batch publishing."
    raise FeatureNotSupportedException(reason)
connect
connect(
client: MQTTClient,
serializer: Optional[SerializerProto],
) -> None
Source code in faststream/mqtt/publisher/producer.py
def connect(
    self,
    client: "zmqtt.MQTTClient",
    serializer: Optional["SerializerProto"],
) -> None:
    """Remember the client and serializer on this producer.

    Args:
        client: Client object stored as ``self._client``.
        serializer: Serializer (or ``None``) stored as ``self.serializer``.
    """
    self._client, self.serializer = client, serializer
disconnect
Source code in faststream/mqtt/publisher/producer.py
def disconnect(self) -> None:
    """Drop the stored client and serializer by resetting both to ``None``."""
    self._client = self.serializer = None