BaseRedisFastProducer(
connection: ConnectionState[Any],
parser: Optional[CustomCallable],
decoder: Optional[CustomCallable],
message_format: type[MessageFormat],
serializer: Optional[SerializerProto],
codec: Optional[CodecProto] = None,
)
Bases: ProducerProto[RedisPublishCommand]
Shared logic for Redis producers.
Source code in faststream/redis/publisher/producer.py
def __init__(
    self,
    connection: "ConnectionState[Any]",
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
    message_format: type["MessageFormat"],
    serializer: Optional["SerializerProto"],
    codec: Optional["CodecProto"] = None,
) -> None:
    """Store the connection and build the parser/decoder compositions.

    Args:
        connection: Connection state object, kept as ``self._connection``.
        parser: Optional custom parser; composed with the default
            ``RedisPubSubParser.parse_message``.
        decoder: Optional custom decoder; composed with the default
            ``RedisPubSubParser.decode_message``.
        message_format: Message format class used to configure the
            default ``RedisPubSubParser``.
        serializer: Optional serializer, stored unchanged (may be ``None``).
        codec: Optional codec. Any falsy value is replaced by a fresh
            ``DefaultCodec()``.
    """
    self._connection = connection

    # One default parser instance backs both the parse and decode fallbacks.
    builtin = RedisPubSubParser(SimpleParserConfig(message_format))
    self._parser = ParserComposition(parser, builtin.parse_message)
    self._decoder = ParserComposition(decoder, builtin.decode_message)

    self.serializer = serializer
    # ``or`` rather than an ``is None`` test: every falsy codec falls back.
    self.codec = codec or DefaultCodec()
serializer instance-attribute
codec instance-attribute
codec = codec or DefaultCodec()
publish_batch async
Source code in faststream/redis/publisher/producer.py
@override
async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
    """Encode each body in ``cmd.batch_bodies`` and ``rpush`` them together.

    Every body is encoded, one after another, with
    ``cmd.message_format.encode`` using the command's correlation id
    (an empty string when it is falsy), reply-to and headers, plus this
    producer's serializer and codec. The encoded items are then sent in a
    single ``rpush`` call to ``cmd.destination``.

    Args:
        cmd: Publish command carrying the bodies, destination, metadata
            and an optional pipeline.

    Returns:
        The awaited result of ``rpush``. ``cast`` only informs the type
        checker; no runtime conversion takes place.
    """
    encoded = []
    for body in cmd.batch_bodies:
        payload = await cmd.message_format.encode(
            message=body,
            correlation_id=cmd.correlation_id or "",
            reply_to=cmd.reply_to,
            headers=cmd.headers,
            serializer=self.serializer,
            codec=self.codec,
        )
        encoded.append(payload)

    # A pipeline supplied on the command takes precedence over the client.
    target = cmd.pipeline or self._connection.client
    pushed = await target.rpush(cmd.destination, *encoded)
    return cast("int", pushed)
connect
connect(
serializer: Optional[SerializerProto] = None,
codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/redis/publisher/producer.py
def connect(
    self,
    serializer: Optional["SerializerProto"] = None,
    codec: Optional["CodecProto"] = None,
) -> None:
    """Update the serializer and, optionally, the codec of this producer.

    Args:
        serializer: New serializer. It is always assigned, so calling
            ``connect()`` without it resets ``self.serializer`` to ``None``.
        codec: New codec. When ``None`` the current ``self.codec`` is kept.
    """
    # Unconditional on purpose: ``None`` clears a previously set serializer.
    self.serializer = serializer

    if codec is None:
        return
    self.codec = codec
publish abstractmethod async
publish(cmd: PublishCommandType_contra) -> Any
Publishes a message asynchronously.
Source code in faststream/_internal/producer.py
@abstractmethod
async def publish(self, cmd: "PublishCommandType_contra") -> Any:
    """Publish the message described by ``cmd``.

    Abstract coroutine: this base declaration has no behaviour of its own
    and must be overridden by concrete producers.

    Args:
        cmd: Publish command to send.

    Returns:
        Whatever the concrete implementation returns.
    """
    ...
request abstractmethod async
request(cmd: PublishCommandType_contra) -> Any
Publishes a message synchronously (request-style); note that the method itself is declared `async` and must be awaited.
Source code in faststream/_internal/producer.py
@abstractmethod
async def request(self, cmd: "PublishCommandType_contra") -> Any:
    """Publish the message described by ``cmd`` in request style.

    Abstract coroutine: this base declaration has no behaviour of its own
    and must be overridden by concrete producers. Although the original
    summary calls this "synchronous", the method is ``async`` and must be
    awaited.

    NOTE(review): presumably implementations wait for and return a reply
    to the published message - confirm against the concrete producers.

    Args:
        cmd: Publish command to send.

    Returns:
        Whatever the concrete implementation returns.
    """
    ...