Skip to content

BaseRedisFastProducer

faststream.redis.publisher.producer.BaseRedisFastProducer #

BaseRedisFastProducer(
    connection: ConnectionState[Any],
    parser: Optional[CustomCallable],
    decoder: Optional[CustomCallable],
    message_format: type[MessageFormat],
    serializer: Optional[SerializerProto],
    codec: Optional[CodecProto] = None,
)

Bases: ProducerProto[RedisPublishCommand]

Shared logic for Redis producers.

Source code in faststream/redis/publisher/producer.py
def __init__(
    self,
    connection: "ConnectionState[Any]",
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
    message_format: type["MessageFormat"],
    serializer: Optional["SerializerProto"],
    codec: Optional["CodecProto"] = None,
) -> None:
    self._connection = connection

    default = RedisPubSubParser(SimpleParserConfig(message_format))
    self._parser = ParserComposition(
        parser,
        default.parse_message,
    )
    self._decoder = ParserComposition(
        decoder,
        default.decode_message,
    )
    self.serializer = serializer
    self.codec = codec or DefaultCodec()

serializer instance-attribute #

serializer = serializer

codec instance-attribute #

codec = codec or DefaultCodec()

publish_batch async #

publish_batch(cmd: RedisPublishCommand) -> int
Source code in faststream/redis/publisher/producer.py
@override
async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
    batch = [
        await cmd.message_format.encode(
            message=msg,
            correlation_id=cmd.correlation_id or "",
            reply_to=cmd.reply_to,
            headers=cmd.headers,
            serializer=self.serializer,
            codec=self.codec,
        )
        for msg in cmd.batch_bodies
    ]

    connection = cmd.pipeline or self._connection.client
    return cast("int", await connection.rpush(cmd.destination, *batch))

connect #

connect(
    serializer: Optional[SerializerProto] = None,
    codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/redis/publisher/producer.py
def connect(
    self,
    serializer: Optional["SerializerProto"] = None,
    codec: Optional["CodecProto"] = None,
) -> None:
    self.serializer = serializer
    if codec is not None:
        self.codec = codec

publish abstractmethod async #

publish(cmd: PublishCommandType_contra) -> Any

Publishes a message asynchronously.

Source code in faststream/_internal/producer.py
@abstractmethod
async def publish(self, cmd: "PublishCommandType_contra") -> Any:
    """Publishes a message asynchronously."""
    ...

request abstractmethod async #

request(cmd: PublishCommandType_contra) -> Any

Publishes a message synchronously.

Source code in faststream/_internal/producer.py
@abstractmethod
async def request(self, cmd: "PublishCommandType_contra") -> Any:
    """Publishes a message synchronously."""
    ...