Skip to content

MQTTPublisher

faststream.mqtt.publisher.usecase.MQTTPublisher #

MQTTPublisher(
    config: MQTTPublisherConfig,
    specification: PublisherSpecification[Any, Any],
)

Bases: PublisherUsecase

Publisher for MQTT topics.

Source code in faststream/mqtt/publisher/usecase.py
def __init__(
    self,
    config: "MQTTPublisherConfig",
    specification: "PublisherSpecification[Any, Any]",
) -> None:
    super().__init__(config, specification)

    self._topic = config.topic
    self.qos = config.qos
    self.retain = config.retain
    self.headers = config.headers or {}

qos instance-attribute #

qos = qos

retain instance-attribute #

retain = retain

headers instance-attribute #

headers = headers or {}

topic property #

topic: str

specification instance-attribute #

specification = specification

mock instance-attribute #

mock = MagicMock()

publish async #

publish(
    message: SendableMessage,
    topic: str = "",
    *,
    qos: QoS | None = None,
    retain: bool | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
) -> None
Source code in faststream/mqtt/publisher/usecase.py
@override
async def publish(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    qos: QoS | None = None,
    retain: bool | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
) -> None:
    cmd = MQTTPublishCommand(
        message,
        topic=topic or self.topic,
        qos=qos if qos is not None else self.qos,
        retain=retain if retain is not None else self.retain,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        _publish_type=PublishType.PUBLISH,
    )

    await self._basic_publish(
        cmd,
        producer=self._outer_config.producer,
        _extra_middlewares=(),
    )

request async #

request(
    message: SendableMessage,
    topic: str = "",
    *,
    correlation_id: str | None = None,
    reply_to: str = "",
    timeout: float | None = 30.0,
) -> Any
Source code in faststream/mqtt/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    correlation_id: str | None = None,
    reply_to: str = "",
    timeout: float | None = 30.0,
) -> Any:
    cmd = MQTTPublishCommand(
        message,
        topic=topic or self.topic,
        qos=self.qos,
        retain=self.retain,
        headers=self.headers,
        correlation_id=correlation_id or gen_cor_id(),
        reply_to=reply_to,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
    )
    return await self._basic_request(
        cmd,
        producer=self._outer_config.producer,
    )

start async #

start() -> None
Source code in faststream/_internal/endpoint/publisher/usecase.py
async def start(self) -> None:
    pass

set_test #

set_test(*, mock: MagicMock, with_fake: bool) -> None

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Turn publisher to testing mode."""
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test() -> None

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode."""
    self._fake_handler = False
    self.mock.reset_mock()

schema #

schema() -> dict[str, PublisherSpec]
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    return self.specification.get_schema()