Skip to content

FakeProducer

faststream.nats.testing.FakeProducer #

FakeProducer(broker)

Bases: NatsFastProducer

Source code in faststream/nats/testing.py
def __init__(self, broker: NatsBroker) -> None:
    """Attach the fake producer to ``broker`` and set up its parser/decoder.

    Args:
        broker: The broker whose subscribers, ``_parser`` and ``_decoder``
            this producer works with.
    """
    self.broker = broker

    # Fallback parser: empty subject pattern, acknowledgements disabled.
    fallback = NatsParser(pattern="", is_ack_disabled=True)

    # Pair the broker-level hooks with the fallback parser's own methods.
    # NOTE(review): presumably the broker hook wraps the fallback one —
    # confirm against ParserComposition.
    self._parser = ParserComposition(
        broker._parser,
        fallback.parse_message,
    )
    self._decoder = ParserComposition(
        broker._decoder,
        fallback.decode_message,
    )

broker instance-attribute #

broker = broker

publish async #

publish(cmd)
Source code in faststream/nats/testing.py
@override
async def publish(self, cmd: "NatsPublishCommand") -> None:
    """Deliver ``cmd`` to every matching subscriber of the broker.

    A message is built once from the command (body, destination, headers,
    correlation id, reply-to) and handed to each handler that
    ``_find_handler`` yields for the command's destination and stream.

    Args:
        cmd: The publish command describing the message to deliver.
    """
    fake_message = build_message(
        message=cmd.body,
        subject=cmd.destination,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        reply_to=cmd.reply_to,
        serializer=self.broker.config.fd_config._serializer,
    )
    subscribers = cast("list[LogicSubscriber[Any]]", self.broker.subscribers)

    for subscriber in _find_handler(subscribers, cmd.destination, cmd.stream):
        pull_sub = getattr(subscriber, "pull_sub", None)

        # Handlers whose ``pull_sub`` has ``batch`` enabled receive the
        # message wrapped in a one-element list; all others get it as-is.
        payload: list[PatchedMessage] | PatchedMessage = (
            [fake_message] if pull_sub and pull_sub.batch else fake_message
        )

        await self._execute_handler(payload, cmd.destination, subscriber)

request async #

request(cmd)
Source code in faststream/nats/testing.py
@override
async def request(self, cmd: "NatsPublishCommand") -> "PatchedMessage":
    """Send ``cmd`` to the first matching subscriber and return its result.

    Args:
        cmd: The publish command describing the request; ``cmd.timeout``
            bounds how long the handler may run.

    Returns:
        Whatever ``_execute_handler`` returns for the first handler found.

    Raises:
        SubscriberNotFound: If no handler matches the destination/stream.
    """
    fake_message = build_message(
        message=cmd.body,
        subject=cmd.destination,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        serializer=self.broker.config.fd_config._serializer,
    )
    subscribers = cast("list[LogicSubscriber[Any]]", self.broker.subscribers)

    for subscriber in _find_handler(subscribers, cmd.destination, cmd.stream):
        pull_sub = getattr(subscriber, "pull_sub", None)

        # Handlers whose ``pull_sub`` has ``batch`` enabled receive the
        # message wrapped in a one-element list; all others get it as-is.
        payload: list[PatchedMessage] | PatchedMessage = (
            [fake_message] if pull_sub and pull_sub.batch else fake_message
        )

        # The ``return`` ends the loop, so only the first match is used.
        with anyio.fail_after(cmd.timeout):
            return await self._execute_handler(
                payload,
                cmd.destination,
                subscriber,
            )

    # The loop body never ran: nothing is subscribed to this destination.
    raise SubscriberNotFound

publish_batch async #

publish_batch(cmd)
Source code in faststream/nats/publisher/producer.py
async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
    """Reject batch publishing; this implementation never publishes.

    Args:
        cmd: The publish command; it is not inspected.

    Raises:
        FeatureNotSupportedException: Always.
    """
    error_text = "NATS doesn't support publishing in batches."
    raise FeatureNotSupportedException(error_text)

connect #

connect(connection, serializer)
Source code in faststream/nats/publisher/producer.py
def connect(
    self,
    connection: Any,
    serializer: Optional["SerializerProto"],
) -> None:
    """Accept a connection and serializer without doing anything.

    This implementation is a no-op: both arguments are ignored.

    Args:
        connection: The connection object; unused here.
        serializer: The serializer, or ``None``; unused here.
    """

disconnect #

disconnect()
Source code in faststream/nats/publisher/producer.py
def disconnect(self) -> None:
    """Do nothing; this implementation has no teardown to perform."""