Skip to content

MQTTFakePublisher

faststream.mqtt.publisher.fake.MQTTFakePublisher #

MQTTFakePublisher(
    producer: ProducerProto[MQTTPublishCommand], topic: str
)

Bases: FakePublisher

Publisher used for RPC / reply-to responses in MQTT.

Source code in faststream/mqtt/publisher/fake.py
def __init__(
    self,
    producer: "ProducerProto[MQTTPublishCommand]",
    topic: str,
) -> None:
    """Create the fake publisher for a given topic.

    Args:
        producer: Producer object forwarded unchanged to the parent
            initializer.
        topic: Topic name kept on the instance as ``self.topic``.
    """
    super().__init__(producer=producer)
    self.topic = topic

topic instance-attribute #

topic = topic

patch_command #

patch_command(
    cmd: Union[PublishCommand, MQTTPublishCommand],
) -> MQTTPublishCommand
Source code in faststream/mqtt/publisher/fake.py
def patch_command(
    self,
    cmd: Union["PublishCommand", "MQTTPublishCommand"],
) -> "MQTTPublishCommand":
    """Return an MQTT publish command addressed to this publisher's topic.

    The incoming command is first passed through the parent class's
    ``patch_command``, the result is converted with
    ``MQTTPublishCommand.from_cmd``, and the converted command's
    ``destination`` is overwritten with ``self.topic``.

    Args:
        cmd: The command to patch.

    Returns:
        The converted command with ``destination`` set to ``self.topic``.
    """
    mqtt_cmd = MQTTPublishCommand.from_cmd(super().patch_command(cmd))
    # Whatever destination the command carried is replaced by our topic.
    mqtt_cmd.destination = self.topic
    return mqtt_cmd

publish async #

publish(
    message: SendableMessage,
    /,
    *,
    correlation_id: str | None = None,
) -> Any | None
Source code in faststream/_internal/endpoint/publisher/fake.py
async def publish(
    self,
    message: SendableMessage,
    /,
    *,
    correlation_id: str | None = None,
) -> Any | None:
    """Reject direct publishing through this publisher.

    Args:
        message: Ignored; accepted positionally only.
        correlation_id: Ignored.

    Raises:
        NotImplementedError: Always; the message names the concrete class.
    """
    publisher_name = self.__class__.__name__
    error_text = (
        f"`{publisher_name}` can be used only to publish "
        "a response for `reply-to` or `RPC` messages."
    )
    raise NotImplementedError(error_text)

request async #

request(
    message: SendableMessage,
    /,
    *,
    correlation_id: str | None = None,
) -> Any
Source code in faststream/_internal/endpoint/publisher/fake.py
async def request(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: str | None = None,
) -> Any:
    """Reject request-style calls through this publisher.

    Args:
        message: Ignored; accepted positionally only.
        correlation_id: Ignored.

    Raises:
        NotImplementedError: Always; the message names the concrete class.
    """
    publisher_name = self.__class__.__name__
    error_text = (
        f"`{publisher_name}` can be used only to publish "
        "a response for `reply-to` or `RPC` messages."
    )
    raise NotImplementedError(error_text)