Skip to content

RedisPublishCommand

faststream.redis.response.RedisPublishCommand #

RedisPublishCommand(
    message,
    /,
    *messages,
    _publish_type,
    correlation_id=None,
    channel=None,
    list=None,
    stream=None,
    maxlen=None,
    headers=None,
    reply_to="",
    timeout=30.0,
    pipeline=None,
    message_format=BinaryMessageFormatV1,
)

Bases: BatchPublishCommand

Source code in faststream/redis/response.py
def __init__(
    self,
    message: "SendableMessage",
    /,
    *messages: "SendableMessage",
    _publish_type: "PublishType",
    correlation_id: str | None = None,
    channel: str | None = None,
    list: str | None = None,
    stream: str | None = None,
    maxlen: int | None = None,
    headers: dict[str, Any] | None = None,
    reply_to: str = "",
    timeout: float | None = 30.0,
    pipeline: Optional["Pipeline[bytes]"] = None,
    message_format: type["MessageFormat"] = BinaryMessageFormatV1,
) -> None:
    """Create a Redis publish command.

    Args:
        message: First message body (positional-only); forwarded to the
            base-class initializer.
        *messages: Additional message bodies, forwarded to the base-class
            initializer after ``message``.
        _publish_type: Publish type forwarded to the base-class initializer.
        correlation_id: Correlation id forwarded to the base-class
            initializer.
        channel: Channel name; takes precedence over ``list`` and ``stream``
            when it is not ``None``.
        list: List name; used when ``channel`` is ``None``.
        stream: Stream name; used when both ``channel`` and ``list`` are
            ``None``.
        maxlen: Stream option, stored as ``self.maxlen``.
        headers: Headers forwarded to the base-class initializer.
        reply_to: Reply-to value forwarded to the base-class initializer.
        timeout: Request option, stored as ``self.timeout``.
        pipeline: Stored as ``self.pipeline``.
        message_format: Stored as ``self.message_format``.

    Raises:
        SetupError: Raised by ``set_destination`` when ``channel``, ``list``
            and ``stream`` are all ``None``.
    """
    # The base class receives an empty destination; the real one is resolved
    # by ``set_destination`` below from channel / list / stream.
    super().__init__(
        message,
        *messages,
        destination="",
        headers=headers,
        reply_to=reply_to,
        correlation_id=correlation_id,
        _publish_type=_publish_type,
    )

    self.pipeline = pipeline
    self.message_format = message_format

    # Sets both ``destination_type`` and ``destination``.
    self.set_destination(channel=channel, list=list, stream=stream)

    # Stream option
    self.maxlen = maxlen
    # Request option
    self.timeout = timeout

destination_type instance-attribute #

destination_type

pipeline instance-attribute #

pipeline = pipeline

message_format instance-attribute #

message_format = message_format

maxlen instance-attribute #

maxlen = maxlen

timeout instance-attribute #

timeout = timeout

body instance-attribute #

body = body

headers instance-attribute #

headers = headers or {}

correlation_id instance-attribute #

correlation_id = correlation_id

destination instance-attribute #

destination = destination

reply_to instance-attribute #

reply_to = reply_to

publish_type instance-attribute #

publish_type = _publish_type

batch_bodies property writable #

batch_bodies

extra_bodies instance-attribute #

extra_bodies = bodies

set_destination #

set_destination(*, channel=None, list=None, stream=None)
Source code in faststream/redis/response.py
def set_destination(
    self,
    *,
    channel: str | None = None,
    list: str | None = None,
    stream: str | None = None,
) -> None:
    """Select the destination and its type from the given names.

    The first argument that is not ``None`` wins, checked in the order
    ``channel``, ``list``, ``stream``; it sets both ``destination_type``
    and ``destination``.

    Raises:
        SetupError: If ``channel``, ``list`` and ``stream`` are all ``None``.
    """
    # Ordered by priority: an earlier entry shadows any later one.
    candidates = (
        (DestinationType.Channel, channel),
        (DestinationType.List, list),
        (DestinationType.Stream, stream),
    )

    for kind, name in candidates:
        if name is None:
            continue
        self.destination_type = kind
        self.destination = name
        return

    raise SetupError(INCORRECT_SETUP_MSG)

from_cmd classmethod #

from_cmd(
    cmd,
    *,
    batch=False,
    message_format=BinaryMessageFormatV1,
)
Source code in faststream/redis/response.py
@classmethod
def from_cmd(
    cls,
    cmd: Union["PublishCommand", "RedisPublishCommand"],
    *,
    batch: bool = False,
    message_format: type["MessageFormat"] = BinaryMessageFormatV1,
) -> "RedisPublishCommand":
    """Build a ``RedisPublishCommand`` from another publish command.

    Args:
        cmd: Command to convert. An existing ``RedisPublishCommand`` is
            returned unchanged (the same object), so ``batch`` and
            ``message_format`` are not applied to it.
        batch: Forwarded to ``cls._parse_bodies`` when splitting ``cmd.body``.
        message_format: Message format passed to the new command.

    Returns:
        ``cmd`` itself if it is already a ``RedisPublishCommand``; otherwise
        a new instance whose channel is ``cmd.destination``.
    """
    if not isinstance(cmd, RedisPublishCommand):
        first_body, remaining_bodies = cls._parse_bodies(cmd.body, batch=batch)

        # The generic command's destination is passed as the channel.
        return cls(
            first_body,
            *remaining_bodies,
            channel=cmd.destination,
            correlation_id=cmd.correlation_id,
            headers=cmd.headers,
            reply_to=cmd.reply_to,
            message_format=message_format,
            _publish_type=cmd.publish_type,
        )

    # NOTE: Should return a copy probably.
    return cmd

as_publish_command #

as_publish_command()

Transforms a handler's Response result into a DTO for publishers.

Source code in faststream/response/response.py
def as_publish_command(self) -> "PublishCommand":
    """Transform a handler's Response result into a DTO for publishers.

    Returns:
        A ``PublishCommand`` built from this object's ``body``, ``headers``
        and ``correlation_id``, with publish type ``PublishType.PUBLISH``.
    """
    command = PublishCommand(
        body=self.body,
        _publish_type=PublishType.PUBLISH,
        correlation_id=self.correlation_id,
        headers=self.headers,
    )
    return command

add_headers #

add_headers(headers, *, override=True)
Source code in faststream/response/response.py
def add_headers(
    self,
    headers: dict[str, Any],
    *,
    override: bool = True,
) -> None:
    """Merge ``headers`` into ``self.headers``.

    Args:
        headers: Header entries to add.
        override: When true (the default), incoming entries replace existing
            ones with the same key and the existing mapping is updated in
            place. When false, existing entries win on key clashes and
            ``self.headers`` is rebound to a newly built mapping.
    """
    if not override:
        # Right-hand operand wins on duplicate keys, so existing values are kept.
        self.headers = headers | self.headers
        return

    # In-place union: incoming values overwrite existing ones.
    self.headers |= headers