Skip to content

RedisResponse

faststream.redis.RedisResponse #

RedisResponse(
    body=None,
    *,
    headers=None,
    correlation_id=None,
    maxlen=None,
    message_format=BinaryMessageFormatV1,
)

Bases: Response

Source code in faststream/redis/response.py
def __init__(
    self,
    body: Optional["SendableMessage"] = None,
    *,
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    maxlen: int | None = None,
    message_format: type["MessageFormat"] = BinaryMessageFormatV1,
) -> None:
    """Initialise the response and remember the Redis-specific options.

    Args:
        body: Message payload; passed through to the base initializer.
        headers: Message headers; passed through to the base initializer.
        correlation_id: Correlation id; passed through to the base
            initializer.
        maxlen: Stored on the instance as ``self.maxlen``.
            NOTE(review): presumably a stream length cap -- confirm.
        message_format: Message format class, stored on the instance as
            ``self.message_format``.
    """
    # The generic response fields are owned by the parent class.
    super().__init__(body=body, headers=headers, correlation_id=correlation_id)

    # Redis-only settings live directly on this instance.
    self.maxlen, self.message_format = maxlen, message_format

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

maxlen instance-attribute #

maxlen = maxlen

message_format instance-attribute #

message_format = message_format

as_publish_command #

as_publish_command()
Source code in faststream/redis/response.py
@override
def as_publish_command(self) -> "RedisPublishCommand":
    """Build a ``RedisPublishCommand`` from the values held by this response.

    Returns:
        A command with publish type ``PublishType.PUBLISH`` carrying this
        response's body, headers, correlation id, ``maxlen`` and
        ``message_format``.
    """
    # Placeholder only: it will be replaced by reply-sender.
    placeholder_channel = "fake-channel"

    command = RedisPublishCommand(
        self.body,
        headers=self.headers,
        correlation_id=self.correlation_id,
        _publish_type=PublishType.PUBLISH,
        maxlen=self.maxlen,
        message_format=self.message_format,
        channel=placeholder_channel,
    )
    return command