Skip to content

NatsFastProducerImpl

faststream.nats.publisher.producer.NatsFastProducerImpl #

NatsFastProducerImpl(parser, decoder)

Bases: NatsFastProducer

A class to represent a NATS producer.

Source code in faststream/nats/publisher/producer.py
def __init__(
    self,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    self.serializer: SerializerProto | None = None

    default = NatsParser(pattern="", is_ack_disabled=True)
    self._parser = ParserComposition(parser, default.parse_message)
    self._decoder = ParserComposition(decoder, default.decode_message)

    self.__state: ConnectionState[Client] = EmptyConnectionState()

serializer instance-attribute #

serializer = None

connect #

connect(connection, serializer)
Source code in faststream/nats/publisher/producer.py
def connect(
    self,
    connection: "Client",
    serializer: Optional["SerializerProto"],
) -> None:
    self.serializer = serializer
    self.__state = ConnectedState(connection)

disconnect #

disconnect()
Source code in faststream/nats/publisher/producer.py
def disconnect(self) -> None:
    self.__state = EmptyConnectionState()

publish async #

publish(cmd)
Source code in faststream/nats/publisher/producer.py
@override
async def publish(self, cmd: "NatsPublishCommand") -> None:
    payload, content_type = encode_message(cmd.body, self.serializer)

    headers_to_send = {
        "content-type": content_type or "",
        **cmd.headers_to_publish(),
    }

    return await self.__state.connection.publish(
        subject=cmd.destination,
        payload=payload,
        reply=cmd.reply_to,
        headers=headers_to_send,
    )

request async #

request(cmd)
Source code in faststream/nats/publisher/producer.py
@override
async def request(self, cmd: "NatsPublishCommand") -> "Msg":
    payload, content_type = encode_message(cmd.body, self.serializer)

    headers_to_send = {
        "content-type": content_type or "",
        **cmd.headers_to_publish(),
    }

    return await self.__state.connection.request(
        subject=cmd.destination,
        payload=payload,
        headers=headers_to_send,
        timeout=cmd.timeout,
    )

publish_batch async #

publish_batch(cmd)
Source code in faststream/nats/publisher/producer.py
async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
    msg = "NATS doesn't support publishing in batches."
    raise FeatureNotSupportedException(msg)