Skip to content

RedisFastProducer

faststream.redis.publisher.producer.RedisFastProducer #

RedisFastProducer(
    connection, parser, decoder, message_format, serializer
)

Bases: ProducerProto[RedisPublishCommand]

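A Redis producer: it encodes outgoing messages with the command's message format and publishes them, supports batch publishing to a list via `RPUSH`, and implements request/reply over a temporary Pub/Sub subscription.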

Source code in faststream/redis/publisher/producer.py
def __init__(
    self,
    connection: "ConnectionState",
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
    message_format: type["MessageFormat"],
    serializer: Optional["SerializerProto"],
) -> None:
    """Store the connection and assemble the parser/decoder compositions.

    Args:
        connection: Connection state object, kept as ``self._connection``.
        parser: Optional custom parser, passed first to ``ParserComposition``.
        decoder: Optional custom decoder, passed first to ``ParserComposition``.
        message_format: Message format class used to configure the built-in
            Pub/Sub parser.
        serializer: Optional serializer, kept as ``self.serializer``.
    """
    self._connection = connection
    self.serializer = serializer

    # One built-in Pub/Sub parser instance backs both compositions: its
    # parse/decode methods are the second argument next to the user callables.
    builtin = RedisPubSubParser(SimpleParserConfig(message_format))
    self._parser = ParserComposition(parser, builtin.parse_message)
    self._decoder = ParserComposition(decoder, builtin.decode_message)

serializer instance-attribute #

serializer = serializer

publish async #

publish(cmd)
Source code in faststream/redis/publisher/producer.py
@override
async def publish(self, cmd: "RedisPublishCommand") -> int | bytes:
    """Encode the command's body and hand it to the private publish helper.

    Args:
        cmd: Publish command supplying the message format, body, reply-to,
            headers and correlation id.

    Returns:
        The value returned by ``__publish`` for the encoded message.
    """
    encode = cmd.message_format.encode
    payload = encode(
        serializer=self.serializer,
        # A missing correlation id is encoded as an empty string.
        correlation_id=cmd.correlation_id or "",
        headers=cmd.headers,
        reply_to=cmd.reply_to,
        message=cmd.body,
    )

    result = await self.__publish(payload, cmd)
    return result

request async #

request(cmd)
Source code in faststream/redis/publisher/producer.py
@override
async def request(self, cmd: "RedisPublishCommand") -> "Any":
    """Publish ``cmd`` and wait for a reply on a one-off Pub/Sub channel.

    A freshly generated NUID is used as the reply channel name. The pubsub
    object subscribes to it before the message is published, and the reply
    is then read from that subscription.

    Args:
        cmd: Publish command supplying the message format, body, headers and
            correlation id; ``cmd.timeout`` bounds the wait for the reply.

    Returns:
        The value returned by the second ``get_message`` call.

    Raises:
        TimeoutError: If the timeout scope was cancelled while waiting.
    """
    nuid = NUID()
    reply_to = str(nuid.next(), "utf-8")
    psub = self._connection.client.pubsub()

    try:
        await psub.subscribe(reply_to)

        msg = cmd.message_format.encode(
            message=cmd.body,
            reply_to=reply_to,
            headers=cmd.headers,
            correlation_id=cmd.correlation_id or "",
            serializer=self.serializer,
        )

        await self.__publish(msg, cmd)

        with anyio.fail_after(cmd.timeout) as scope:
            # skip subscribe message
            # (a falsy ``cmd.timeout`` is passed to ``get_message`` as 0.0)
            await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=cmd.timeout or 0.0,
            )

            # get real response
            response_msg = await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=cmd.timeout or 0.0,
            )

        # NOTE(review): ``fail_after`` is expected to raise on expiry itself;
        # this explicit check is kept as a safeguard for a scope that was
        # cancelled even though the block exited normally.
        if scope.cancel_called:
            raise TimeoutError

        return response_msg

    finally:
        # Cleanup is best-effort, but each step is suppressed on its own so
        # that a failing ``unsubscribe`` cannot prevent the pubsub object
        # from being closed.
        with suppress(Exception):
            await psub.unsubscribe()
        with suppress(Exception):
            await psub.aclose()  # type: ignore[attr-defined]

publish_batch async #

publish_batch(cmd)
Source code in faststream/redis/publisher/producer.py
@override
async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
    """Encode every body of the batch and append them with one ``RPUSH``.

    Args:
        cmd: Publish command supplying the batch bodies, destination key,
            message format, headers, reply-to and correlation id. When
            ``cmd.pipeline`` is truthy the push is issued on it instead of
            the connection's client.

    Returns:
        The result of awaiting ``rpush`` on the selected target.
    """

    def encode_body(body: "Any") -> "Any":
        # Every body of the batch shares the command's metadata.
        return cmd.message_format.encode(
            message=body,
            correlation_id=cmd.correlation_id or "",
            reply_to=cmd.reply_to,
            headers=cmd.headers,
            serializer=self.serializer,
        )

    encoded = list(map(encode_body, cmd.batch_bodies))

    target = cmd.pipeline
    if not target:
        target = self._connection.client

    return await target.rpush(cmd.destination, *encoded)

connect #

connect(serializer=None)
Source code in faststream/redis/publisher/producer.py
def connect(
    self,
    serializer: Optional["SerializerProto"] = None,
) -> None:
    """Replace the serializer stored on this producer.

    Args:
        serializer: New value for ``self.serializer``; when omitted the
            attribute is reset to ``None``.
    """
    self.serializer = serializer