Bases: BaseRedisFastProducer
Producer that routes channel operations through the sync cluster.
Source code in faststream/redis/publisher/producer.py
def __init__(
    self,
    connection: "ConnectionState[RedisCluster[bytes]]",
    cluster_state: RedisClusterConnectionState,
    **kwargs: Any,
) -> None:
    """Initialize the producer on top of the base Redis producer.

    Args:
        connection: Connection state handed to the base class initializer.
        cluster_state: Cluster connection state stored on the instance as
            ``_cluster_state``.
        **kwargs: Additional keyword arguments forwarded unchanged to the
            base class initializer.
    """
    # Everything except the cluster state is owned by the base class.
    base_options = {"connection": connection, **kwargs}
    super().__init__(**base_options)
    self._cluster_state = cluster_state
|
codec instance-attribute
codec = codec or DefaultCodec()
serializer instance-attribute
publish async
Source code in faststream/redis/publisher/producer.py
@override
async def publish(self, cmd: "RedisPublishCommand") -> int | bytes:
    """Encode the command body and send it to the command's destination.

    Channel messages are sent via the cluster state's ``sync_publish``;
    list and stream messages are written with the regular connection client.

    Args:
        cmd: Publish command providing the body, headers, reply-to,
            correlation id, destination and destination type.

    Returns:
        The result of the underlying call: ``sync_publish`` for channels,
        ``rpush`` (cast to ``int``) for lists, ``xadd`` (cast to ``bytes``)
        for streams.

    Raises:
        UnreachablePathError: If the destination type is not a channel,
            list or stream.
    """
    payload = await cmd.message_format.encode(
        message=cmd.body,
        reply_to=cmd.reply_to,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id or "",
        serializer=self.serializer,
        codec=self.codec,
    )

    target = cmd.destination_type

    if target is DestinationType.Channel:
        return await self._cluster_state.sync_publish(cmd.destination, payload)

    if target is DestinationType.List:
        pushed = await self._connection.client.rpush(cmd.destination, payload)
        return cast("int", pushed)

    if target is DestinationType.Stream:
        added = await self._connection.client.xadd(
            name=cmd.destination,
            fields={DATA_KEY: payload},
            maxlen=cmd.maxlen,
        )
        return cast("bytes", added)

    raise UnreachablePathError
|
request async
Source code in faststream/redis/publisher/producer.py
@override
async def request(self, cmd: "RedisPublishCommand") -> "Any":
    """Publish a message and wait for a reply on a temporary channel.

    A fresh reply channel name is generated with ``NUID``, a pub/sub object
    obtained from the cluster state is subscribed to it, and the encoded
    message (carrying that name as ``reply_to``) is sent to the command's
    destination. The method then reads from the pub/sub object under an
    ``anyio.fail_after(cmd.timeout)`` deadline.

    Args:
        cmd: Publish command providing the body, headers, correlation id,
            destination, destination type and timeout.

    Returns:
        The value returned by the second ``get_message`` call.

    Raises:
        TimeoutError: If the deadline set by ``cmd.timeout`` expires.
        UnreachablePathError: If the destination type is not a channel,
            list or stream.
    """
    nuid = NUID()
    reply_to = str(nuid.next(), "utf-8")
    psub = self._cluster_state.pubsub()
    try:
        # Subscribe before publishing so the reply cannot be missed.
        await psub.subscribe(reply_to)
        msg = await cmd.message_format.encode(
            message=cmd.body,
            reply_to=reply_to,
            headers=cmd.headers,
            correlation_id=cmd.correlation_id or "",
            serializer=self.serializer,
            codec=self.codec,
        )
        if cmd.destination_type is DestinationType.Channel:
            await self._cluster_state.sync_publish(cmd.destination, msg)
        elif cmd.destination_type is DestinationType.List:
            await self._connection.client.rpush(cmd.destination, msg)
        elif cmd.destination_type is DestinationType.Stream:
            await self._connection.client.xadd(
                name=cmd.destination,
                fields={DATA_KEY: msg},
                maxlen=cmd.maxlen,
            )
        else:
            raise UnreachablePathError
        with anyio.fail_after(cmd.timeout) as scope:
            # The first read's result is discarded; presumably it consumes
            # the subscribe confirmation -- TODO confirm.
            await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=cmd.timeout or 0.0,
            )
            response_msg = await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=cmd.timeout or 0.0,
            )
        # NOTE(review): fail_after raises TimeoutError by itself when the
        # deadline expires, so this check looks redundant -- confirm.
        if scope.cancel_called:
            raise TimeoutError
        # NOTE(review): response_msg may be None if get_message returned
        # without a message; callers appear to receive None in that case.
        return response_msg
    finally:
        # Each cleanup step is suppressed on its own so that a failing
        # unsubscribe() can no longer skip aclose() and leak the pub/sub
        # object.
        with suppress(Exception):
            await psub.unsubscribe()
        with suppress(Exception):
            await psub.aclose()
|
publish_batch async
Source code in faststream/redis/publisher/producer.py
@override
async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
    """Encode every batch body and push them all to one list with ``rpush``.

    Args:
        cmd: Publish command providing the batch bodies, headers, reply-to,
            correlation id, destination and an optional pipeline.

    Returns:
        The ``rpush`` result, cast to ``int``.
    """
    encoded = []
    for body in cmd.batch_bodies:
        item = await cmd.message_format.encode(
            message=body,
            correlation_id=cmd.correlation_id or "",
            reply_to=cmd.reply_to,
            headers=cmd.headers,
            serializer=self.serializer,
            codec=self.codec,
        )
        encoded.append(item)

    # Use the command's pipeline when it is set, else the connection client.
    target = cmd.pipeline or self._connection.client
    pushed = await target.rpush(cmd.destination, *encoded)
    return cast("int", pushed)
|
connect
connect(
serializer: Optional[SerializerProto] = None,
codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/redis/publisher/producer.py
def connect(
    self,
    serializer: Optional["SerializerProto"] = None,
    codec: Optional["CodecProto"] = None,
) -> None:
    """Store the serializer and, when one is given, the codec.

    Args:
        serializer: Value assigned to ``self.serializer``; it is always
            overwritten, including with ``None``.
        codec: Value assigned to ``self.codec``; when ``None`` the current
            codec is left untouched.
    """
    self.serializer = serializer
    if codec is None:
        # Keep whichever codec is already set on the instance.
        return
    self.codec = codec
|