Skip to content

FakeNatsFastProducer

faststream.nats.publisher.producer.FakeNatsFastProducer #

Bases: NatsFastProducer

codec instance-attribute #

codec: CodecProto

connect #

connect(
    connection: Any,
    serializer: Optional[SerializerProto],
    codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/nats/publisher/producer.py
def connect(
    self,
    connection: Any,
    serializer: Optional["SerializerProto"],
    codec: Optional["CodecProto"] = None,
) -> None:
    """Reject the call: this method has no implementation here.

    None of the arguments is inspected or stored.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

disconnect #

disconnect() -> None
Source code in faststream/nats/publisher/producer.py
def disconnect(self) -> None:
    """Reject the call: this method has no implementation here.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

publish async #

publish(cmd: NatsPublishCommand) -> None
Source code in faststream/nats/publisher/producer.py
@override
async def publish(self, cmd: "NatsPublishCommand") -> None:
    raise NotImplementedError

request async #

request(cmd: NatsPublishCommand) -> Msg
Source code in faststream/nats/publisher/producer.py
@override
async def request(self, cmd: "NatsPublishCommand") -> "Msg":
    raise NotImplementedError

publish_batch async #

publish_batch(cmd: NatsPublishCommand) -> None
Source code in faststream/nats/publisher/producer.py
@override
async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
    """Reject batch publishing with an explanatory error.

    ``cmd`` is accepted to match the overridden signature but is never read.

    Raises:
        FeatureNotSupportedException: Always, once the coroutine is awaited.
    """
    # Keep the text in a local so the raise statement stays free of literals.
    reason = "NATS doesn't support publishing in batches."
    raise FeatureNotSupportedException(reason)