Skip to content

FakeProducer

faststream.mqtt.testing.FakeProducer #

FakeProducer(broker: MQTTBroker)

Bases: ZmqttBaseProducer

In-memory producer that routes messages directly to matching subscribers.

Encodes messages in the wire format matching the broker's configured MQTT version: V311 envelope for 3.1.1, PublishProperties for 5.0.

Source code in faststream/mqtt/testing.py
def __init__(self, broker: MQTTBroker) -> None:
    self.broker = broker
    self.serializer: SerializerProto | None = None

    version = _broker_version(broker)
    default = _parser_for_version(version)
    self._parser = ParserComposition(broker._parser, default.parse_message)
    self._decoder = ParserComposition(broker._decoder, default.decode_message)

broker instance-attribute #

broker = broker

serializer instance-attribute #

serializer: SerializerProto | None = None

publish async #

publish(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/testing.py
@override
async def publish(self, cmd: MQTTPublishCommand) -> None:
    msg = build_message(
        message=cmd.body,
        topic=cmd.destination,
        version=self._version,
        qos=cmd.qos,
        retain=cmd.retain,
        reply_to=cmd.reply_to,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        serializer=self.broker.config.fd_config._serializer,
    )

    # For shared subscriptions, only deliver to one subscriber per group
    seen_shared_groups: set[str] = set()

    for handler in cast("list[MQTTBaseSubscriber]", self.broker.subscribers):
        handler_topic = handler.topic
        if not mqtt_topic_matches(handler_topic, cmd.destination):
            continue

        if handler_topic.startswith("$share/"):
            _, group, _ = handler_topic.split("/", 2)
            if group in seen_shared_groups:
                continue
            seen_shared_groups.add(group)

        await handler.process_message(msg)

request async #

request(cmd: MQTTPublishCommand) -> Message
Source code in faststream/mqtt/testing.py
@override
async def request(self, cmd: MQTTPublishCommand) -> "zmqtt.Message":
    msg = build_message(
        message=cmd.body,
        topic=cmd.destination,
        version=self._version,
        qos=cmd.qos,
        retain=cmd.retain,
        correlation_id=cmd.correlation_id,
        headers=cmd.headers,
        serializer=self.broker.config.fd_config._serializer,
    )

    for handler in cast("list[MQTTBaseSubscriber]", self.broker.subscribers):
        if not mqtt_topic_matches(handler.topic, cmd.destination):
            continue

        with anyio.fail_after(cmd.timeout or 30.0):
            result = await handler.process_message(msg)

        return build_message(
            message=result.body,
            topic=cmd.destination,
            version=self._version,
            correlation_id=result.correlation_id,
            headers=result.headers,
            serializer=self.broker.config.fd_config._serializer,
        )

    raise SubscriberNotFound

publish_batch async #

publish_batch(cmd: MQTTPublishCommand) -> None
Source code in faststream/mqtt/publisher/producer.py
@override
async def publish_batch(self, cmd: "MQTTPublishCommand") -> None:
    msg = "MQTT does not support batch publishing."
    raise FeatureNotSupportedException(msg)

connect #

connect(
    client: MQTTClient,
    serializer: Optional[SerializerProto],
) -> None
Source code in faststream/mqtt/publisher/producer.py
def connect(
    self,
    client: "zmqtt.MQTTClient",
    serializer: Optional["SerializerProto"],
) -> None:
    self._client = client
    self.serializer = serializer

disconnect #

disconnect() -> None
Source code in faststream/mqtt/publisher/producer.py
def disconnect(self) -> None:
    self._client = None
    self.serializer = None