Skip to content

RedisClusterFastProducer

faststream.redis.publisher.producer.RedisClusterFastProducer #

RedisClusterFastProducer(
    connection: ConnectionState[RedisCluster[bytes]],
    cluster_state: RedisClusterConnectionState,
    **kwargs: Any,
)

Bases: BaseRedisFastProducer

Producer that routes channel operations through the sync cluster.

Source code in faststream/redis/publisher/producer.py
def __init__(
    self,
    connection: "ConnectionState[RedisCluster[bytes]]",
    cluster_state: RedisClusterConnectionState,
    **kwargs: Any,
) -> None:
    super().__init__(connection=connection, **kwargs)
    self._cluster_state = cluster_state

cluster_state property #

codec instance-attribute #

codec = codec or DefaultCodec()

serializer instance-attribute #

serializer = serializer

publish async #

publish(cmd: RedisPublishCommand) -> int | bytes
Source code in faststream/redis/publisher/producer.py
@override
async def publish(self, cmd: "RedisPublishCommand") -> int | bytes:
    msg = await cmd.message_format.encode(
        message=cmd.body,
        reply_to=cmd.reply_to,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id or "",
        serializer=self.serializer,
        codec=self.codec,
    )

    if cmd.destination_type is DestinationType.Channel:
        return await self._cluster_state.sync_publish(cmd.destination, msg)

    if cmd.destination_type is DestinationType.List:
        return cast("int", await self._connection.client.rpush(cmd.destination, msg))
    if cmd.destination_type is DestinationType.Stream:
        return cast(
            "bytes",
            await self._connection.client.xadd(
                name=cmd.destination,
                fields={DATA_KEY: msg},
                maxlen=cmd.maxlen,
            ),
        )
    raise UnreachablePathError

request async #

request(cmd: RedisPublishCommand) -> Any
Source code in faststream/redis/publisher/producer.py
@override
async def request(self, cmd: "RedisPublishCommand") -> "Any":
    nuid = NUID()
    reply_to = str(nuid.next(), "utf-8")
    psub = self._cluster_state.pubsub()

    try:
        await psub.subscribe(reply_to)

        msg = await cmd.message_format.encode(
            message=cmd.body,
            reply_to=reply_to,
            headers=cmd.headers,
            correlation_id=cmd.correlation_id or "",
            serializer=self.serializer,
            codec=self.codec,
        )

        if cmd.destination_type is DestinationType.Channel:
            await self._cluster_state.sync_publish(cmd.destination, msg)
        elif cmd.destination_type is DestinationType.List:
            await self._connection.client.rpush(cmd.destination, msg)
        elif cmd.destination_type is DestinationType.Stream:
            await self._connection.client.xadd(
                name=cmd.destination,
                fields={DATA_KEY: msg},
                maxlen=cmd.maxlen,
            )
        else:
            raise UnreachablePathError

        with anyio.fail_after(cmd.timeout) as scope:
            await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=cmd.timeout or 0.0,
            )

            response_msg = await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=cmd.timeout or 0.0,
            )

        if scope.cancel_called:
            raise TimeoutError

        return response_msg

    finally:
        with suppress(Exception):
            await psub.unsubscribe()
            await psub.aclose()

publish_batch async #

publish_batch(cmd: RedisPublishCommand) -> int
Source code in faststream/redis/publisher/producer.py
@override
async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
    batch = [
        await cmd.message_format.encode(
            message=msg,
            correlation_id=cmd.correlation_id or "",
            reply_to=cmd.reply_to,
            headers=cmd.headers,
            serializer=self.serializer,
            codec=self.codec,
        )
        for msg in cmd.batch_bodies
    ]

    connection = cmd.pipeline or self._connection.client
    return cast("int", await connection.rpush(cmd.destination, *batch))

connect #

connect(
    serializer: Optional[SerializerProto] = None,
    codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/redis/publisher/producer.py
def connect(
    self,
    serializer: Optional["SerializerProto"] = None,
    codec: Optional["CodecProto"] = None,
) -> None:
    self.serializer = serializer
    if codec is not None:
        self.codec = codec