Bases: NatsFastProducer
A class to represent a NATS JetStream producer.
Source code in faststream/nats/publisher/producer.py
| def __init__(
self,
*,
parser: Optional["CustomCallable"],
decoder: Optional["CustomCallable"],
) -> None:
self.serializer: SerializerProto | None = None
default = NatsParser(pattern="", is_ack_disabled=True)
self._parser = ParserComposition(parser, default.parse_message)
self._decoder = ParserComposition(decoder, default.decode_message)
self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState()
|
serializer
instance-attribute
connect
connect(connection, serializer)
Source code in faststream/nats/publisher/producer.py
| def connect(
self,
connection: "JetStreamContext",
serializer: Optional["SerializerProto"],
) -> None:
self.serializer = serializer
self.__state = ConnectedState(connection)
|
disconnect
Source code in faststream/nats/publisher/producer.py
| def disconnect(self) -> None:
self.__state = EmptyConnectionState()
|
publish
async
Source code in faststream/nats/publisher/producer.py
| @override
async def publish(self, cmd: "NatsPublishCommand") -> "PubAck":
payload, content_type = encode_message(cmd.body, self.serializer)
headers_to_send = {
"content-type": content_type or "",
**cmd.headers_to_publish(js=True),
}
return await self.__state.connection.publish(
subject=cmd.destination,
payload=payload,
headers=headers_to_send,
stream=cmd.stream,
timeout=cmd.timeout,
)
|
request
async
Source code in faststream/nats/publisher/producer.py
| @override
async def request(self, cmd: "NatsPublishCommand") -> "Msg":
payload, content_type = encode_message(cmd.body, self.serializer)
reply_to = self.__state.connection._nc.new_inbox()
future: asyncio.Future[Msg] = asyncio.Future()
sub = await self.__state.connection._nc.subscribe(
reply_to,
future=future,
max_msgs=1,
)
await sub.unsubscribe(limit=1)
headers_to_send = {
"content-type": content_type or "",
"reply_to": reply_to,
**cmd.headers_to_publish(js=False),
}
with anyio.fail_after(cmd.timeout):
await self.__state.connection.publish(
subject=cmd.destination,
payload=payload,
headers=headers_to_send,
stream=cmd.stream,
timeout=cmd.timeout,
)
msg = await future
if ( # pragma: no cover
msg.headers
and (
msg.headers.get(nats.js.api.Header.STATUS)
== nats.aio.client.NO_RESPONDERS_STATUS
)
):
raise nats.errors.NoRespondersError
return msg
|
publish_batch
async
Source code in faststream/nats/publisher/producer.py
| async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
msg = "NATS doesn't support publishing in batches."
raise FeatureNotSupportedException(msg)
|