RedisFastProducer(
connection: ConnectionState[Redis[bytes]],
parser: Optional[CustomCallable],
decoder: Optional[CustomCallable],
message_format: type[MessageFormat],
serializer: Optional[SerializerProto],
)
Bases: BaseRedisFastProducer
Producer for a single-node Redis.
Source code in faststream/redis/publisher/producer.py
| def __init__(
self,
connection: "ConnectionState[Redis[bytes]]",
parser: Optional["CustomCallable"],
decoder: Optional["CustomCallable"],
message_format: type["MessageFormat"],
serializer: Optional["SerializerProto"],
) -> None:
super().__init__(
connection=connection,
parser=parser,
decoder=decoder,
message_format=message_format,
serializer=serializer,
)
|
codec instance-attribute
codec = codec or DefaultCodec()
serializer instance-attribute
publish async
Source code in faststream/redis/publisher/producer.py
| @override
async def publish(self, cmd: "RedisPublishCommand") -> int | bytes:
msg = await cmd.message_format.encode(
message=cmd.body,
reply_to=cmd.reply_to,
headers=cmd.headers,
correlation_id=cmd.correlation_id or "",
serializer=self.serializer,
codec=self.codec,
)
return await self.__publish(msg, cmd)
|
request async
Source code in faststream/redis/publisher/producer.py
| @override
async def request(self, cmd: "RedisPublishCommand") -> "Any":
nuid = NUID()
reply_to = str(nuid.next(), "utf-8")
psub = self._connection.client.pubsub()
try:
await psub.subscribe(reply_to)
msg = await cmd.message_format.encode(
message=cmd.body,
reply_to=reply_to,
headers=cmd.headers,
correlation_id=cmd.correlation_id or "",
serializer=self.serializer,
codec=self.codec,
)
await self.__publish(msg, cmd)
with anyio.fail_after(cmd.timeout) as scope:
# skip subscribe message
await psub.get_message(
ignore_subscribe_messages=True,
timeout=cmd.timeout or 0.0,
)
# get real response
response_msg = await psub.get_message(
ignore_subscribe_messages=True,
timeout=cmd.timeout or 0.0,
)
if scope.cancel_called:
raise TimeoutError
return response_msg
finally:
with suppress(Exception):
await psub.unsubscribe()
await psub.aclose() # type: ignore[attr-defined]
|
publish_batch async
Source code in faststream/redis/publisher/producer.py
| @override
async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
batch = [
await cmd.message_format.encode(
message=msg,
correlation_id=cmd.correlation_id or "",
reply_to=cmd.reply_to,
headers=cmd.headers,
serializer=self.serializer,
codec=self.codec,
)
for msg in cmd.batch_bodies
]
connection = cmd.pipeline or self._connection.client
return cast("int", await connection.rpush(cmd.destination, *batch))
|
connect
connect(
serializer: Optional[SerializerProto] = None,
codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/redis/publisher/producer.py
| def connect(
self,
serializer: Optional["SerializerProto"] = None,
codec: Optional["CodecProto"] = None,
) -> None:
self.serializer = serializer
if codec is not None:
self.codec = codec
|