Skip to content

NatsJSFastProducer

faststream.nats.publisher.producer.NatsJSFastProducer #

NatsJSFastProducer(*, parser, decoder)

Bases: NatsFastProducer

A class to represent a NATS JetStream producer.

Source code in faststream/nats/publisher/producer.py
def __init__(
    self,
    *,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Set up a disconnected JetStream producer.

    Args:
        parser: Optional custom parser; combined with the default
            ``NatsParser.parse_message`` through ``ParserComposition``.
        decoder: Optional custom decoder; combined with the default
            ``NatsParser.decode_message`` through ``ParserComposition``.
    """
    # No serializer and no connection until ``connect`` is called.
    self.serializer: SerializerProto | None = None
    self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState()

    # Built-in parser supplying the default parse/decode callables.
    fallback = NatsParser(pattern="", is_ack_disabled=True)
    self._parser = ParserComposition(parser, fallback.parse_message)
    self._decoder = ParserComposition(decoder, fallback.decode_message)

serializer instance-attribute #

serializer = None

connect #

connect(connection, serializer)
Source code in faststream/nats/publisher/producer.py
def connect(
    self,
    connection: "JetStreamContext",
    serializer: Optional["SerializerProto"],
) -> None:
    """Attach a JetStream context and the serializer used for outgoing bodies.

    Args:
        connection: JetStream context wrapped into a ``ConnectedState``.
        serializer: Serializer stored on the producer; may be ``None``.
    """
    self.serializer = serializer
    connected = ConnectedState(connection)
    self.__state = connected

disconnect #

disconnect()
Source code in faststream/nats/publisher/producer.py
def disconnect(self) -> None:
    """Drop the current connection by resetting to an empty connection state."""
    detached = EmptyConnectionState()
    self.__state = detached

publish async #

publish(cmd)
Source code in faststream/nats/publisher/producer.py
@override
async def publish(self, cmd: "NatsPublishCommand") -> "PubAck":
    """Encode ``cmd.body`` and publish it through the JetStream context.

    Args:
        cmd: Publish command providing the body, destination subject,
            headers, stream and timeout.

    Returns:
        The value returned by the JetStream context's ``publish`` call.
    """
    body, detected_type = encode_message(cmd.body, self.serializer)

    # Start from the detected content type; headers supplied by the
    # command are applied afterwards and therefore win on key clashes.
    headers_to_send = {"content-type": detected_type or ""}
    headers_to_send.update(cmd.headers_to_publish(js=True))

    jetstream = self.__state.connection
    return await jetstream.publish(
        subject=cmd.destination,
        payload=body,
        headers=headers_to_send,
        stream=cmd.stream,
        timeout=cmd.timeout,
    )

request async #

request(cmd)
Source code in faststream/nats/publisher/producer.py
@override
async def request(self, cmd: "NatsPublishCommand") -> "Msg":
    """Publish ``cmd`` through JetStream and wait for a single reply.

    A fresh inbox subject is created on the underlying NATS client and a
    one-message subscription backed by a future is opened on it. The inbox
    subject is sent to the responder in the ``reply_to`` header, and the
    first message delivered to the inbox is returned.

    Args:
        cmd: Publish command providing the body, destination subject,
            headers, stream and timeout.

    Returns:
        The reply message received on the inbox subscription.

    Raises:
        TimeoutError: If publishing plus waiting for the reply exceeds
            ``cmd.timeout`` (raised by ``anyio.fail_after``).
        nats.errors.NoRespondersError: If the reply carries the
            "no responders" status header.
    """
    payload, content_type = encode_message(cmd.body, self.serializer)

    reply_to = self.__state.connection._nc.new_inbox()
    future: asyncio.Future[Msg] = asyncio.Future()
    sub = await self.__state.connection._nc.subscribe(
        reply_to,
        future=future,
        max_msgs=1,
    )

    try:
        # Auto-unsubscribe once the first (and only) reply has arrived.
        await sub.unsubscribe(limit=1)

        headers_to_send = {
            "content-type": content_type or "",
            "reply_to": reply_to,
            **cmd.headers_to_publish(js=False),
        }

        with anyio.fail_after(cmd.timeout):
            await self.__state.connection.publish(
                subject=cmd.destination,
                payload=payload,
                headers=headers_to_send,
                stream=cmd.stream,
                timeout=cmd.timeout,
            )

            msg = await future
    except BaseException:
        # No reply was consumed (timeout, publish failure, cancellation):
        # the auto-unsubscribe limit never triggers, so without explicit
        # cleanup the inbox subscription and its future would stay
        # registered on the client for every failed request.
        future.cancel()
        try:
            await sub.unsubscribe()
        except nats.errors.Error:
            # Best-effort cleanup: the subscription may already be gone
            # or the connection closed; keep the original error visible.
            pass
        raise

    if (  # pragma: no cover
        msg.headers
        and (
            msg.headers.get(nats.js.api.Header.STATUS)
            == nats.aio.client.NO_RESPONDERS_STATUS
        )
    ):
        raise nats.errors.NoRespondersError

    return msg

publish_batch async #

publish_batch(cmd)
Source code in faststream/nats/publisher/producer.py
async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
    """Reject batch publishing.

    Raises:
        FeatureNotSupportedException: Always.
    """
    reason = "NATS doesn't support publishing in batches."
    raise FeatureNotSupportedException(reason)