Skip to content

NatsFastProducer

faststream.nats.publisher.producer.NatsFastProducer #

Bases: ProducerProto[NatsPublishCommand]

connect #

connect(connection, serializer)
Source code in faststream/nats/publisher/producer.py
def connect(self, connection: Any, serializer: Optional["SerializerProto"]) -> None:
    """Accept a connection object and an optional serializer.

    This implementation has an empty body: nothing is stored and ``None``
    is returned.

    Args:
        connection: The connection object handed to the producer.
        serializer: Serializer to use, or ``None``.

    NOTE(review): presumably concrete producers override this to keep the
    connection -- confirm against the subclasses.
    """

disconnect #

disconnect()
Source code in faststream/nats/publisher/producer.py
def disconnect(self) -> None:
    """Counterpart of ``connect``; this implementation does nothing.

    The body is empty, so the call simply returns ``None``.
    """

publish abstractmethod async #

publish(cmd)
Source code in faststream/nats/publisher/producer.py
@abstractmethod
async def publish(
    self,
    cmd: "NatsPublishCommand",
) -> Optional["PubAck"]:
    """Publish the message described by ``cmd``.

    Declared abstract: no behaviour is provided here beyond the signature.

    Args:
        cmd: The publish command to send.

    Returns:
        A ``PubAck`` or ``None``, per the annotation.
        NOTE(review): which case applies depends on the implementation --
        confirm against the concrete producers.
    """

request abstractmethod async #

request(cmd)
Source code in faststream/nats/publisher/producer.py
@abstractmethod
async def request(
    self,
    cmd: "NatsPublishCommand",
) -> "Msg":
    """Send the message described by ``cmd`` and return the reply.

    Declared abstract: no behaviour is provided here beyond the signature.

    Args:
        cmd: The publish command to send as a request.

    Returns:
        The reply as a ``Msg``, per the annotation.
    """

publish_batch async #

publish_batch(cmd)
Source code in faststream/nats/publisher/producer.py
async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
    """Reject batch publishing, which this producer does not offer.

    Args:
        cmd: The publish command; it is never inspected.

    Raises:
        FeatureNotSupportedException: Always.
    """
    raise FeatureNotSupportedException("NATS doesn't support publishing in batches.")