Skip to content

TestMQTTBroker

faststream.mqtt.testing.TestMQTTBroker #

TestMQTTBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: bool | None = None,
)

Bases: TestBroker[MQTTBroker]

In-memory test double for MQTTBroker.

Routes published messages to matching subscribers without a real MQTT connection, using MQTT wildcard rules for topic matching. Messages are encoded in the same wire format as the configured broker version (V311 envelope or V5 PublishProperties).

Usage::

async with TestMQTTBroker(broker) as br:
    await br.publish("hello", "sensors/temp")
    handler.mock.assert_called_once_with("hello")
Source code in faststream/_internal/testing/broker.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: bool | None = None,
) -> None:
    self.with_real = with_real
    self.broker = broker

    if connect_only is None:
        try:
            connect_only = is_contains_context_name(
                self.__class__.__name__,
                TestApp.__name__,
            )
        except Exception:  # pragma: no cover
            warnings.warn(
                (
                    "\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing."
                    "\n`connect_only` is set to `False` by default."
                ),
                category=RuntimeWarning,
                stacklevel=1,
            )

            connect_only = False

    self.connect_only = connect_only
    self._fake_subscribers: list[SubscriberUsecase[Any]] = []

with_real instance-attribute #

with_real = with_real

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(
    broker: MQTTBroker, publisher: MQTTPublisher
) -> tuple[MQTTBaseSubscriber, bool]
Source code in faststream/mqtt/testing.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: MQTTBroker,
    publisher: "MQTTPublisher",
) -> tuple["MQTTBaseSubscriber", bool]:
    sub: MQTTBaseSubscriber | None = None
    for handler in broker.subscribers:
        handler = cast("MQTTBaseSubscriber", handler)
        if mqtt_topic_matches(handler.topic, publisher.topic):
            sub = handler
            break

    if sub is None:
        is_real = False
        sub = broker.subscriber(publisher.topic, persistent=False)
        # Apply the correct version parser so fake subs match FakeProducer output.
        parser = _parser_for_version(_broker_version(broker))
        sub._parser = parser.parse_message
        sub._decoder = parser.decode_message
    else:
        is_real = True

    return sub, is_real