Skip to content

ListPublisher

faststream.redis.publisher.usecase.ListPublisher #

ListPublisher(config, specification, *, list)

Bases: LogicPublisher

Source code in faststream/redis/publisher/usecase.py
def __init__(
    self,
    config: "RedisPublisherConfig",
    specification: "PublisherSpecification[Any, Any]",
    *,
    list: "ListSub",
) -> None:
    super().__init__(config, specification)

    self._list = list

list property #

list

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

config instance-attribute #

config = config

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers or {}

producer instance-attribute #

producer = producer

subscriber_property #

subscriber_property(*, name_only)
Source code in faststream/redis/publisher/usecase.py
@override
def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
    return {
        "channel": None,
        "list": self.list.name if name_only else self.list,
        "stream": None,
    }

publish async #

publish(
    message=None,
    list=None,
    reply_to="",
    headers=None,
    correlation_id=None,
    *,
    pipeline=None,
)
Source code in faststream/redis/publisher/usecase.py
@override
async def publish(
    self,
    message: "SendableMessage" = None,
    list: str | None = None,
    reply_to: str = "",
    headers: dict[str, Any] | None = None,
    correlation_id: str | None = None,
    *,
    pipeline: Optional["Pipeline[bytes]"] = None,
) -> int:
    cmd = RedisPublishCommand(
        message,
        list=list or self.list.name,
        reply_to=reply_to or self.reply_to,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        pipeline=pipeline,
        _publish_type=PublishType.PUBLISH,
        message_format=self.config.message_format,
    )

    result: int = await self._basic_publish(
        cmd,
        producer=self.producer,
        _extra_middlewares=(),
    )
    return result

request async #

request(
    message=None,
    list=None,
    *,
    correlation_id=None,
    headers=None,
    timeout=30.0,
)
Source code in faststream/redis/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage" = None,
    list: str | None = None,
    *,
    correlation_id: str | None = None,
    headers: dict[str, Any] | None = None,
    timeout: float | None = 30.0,
) -> "RedisChannelMessage":
    cmd = RedisPublishCommand(
        message,
        list=list or self.list.name,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
        message_format=self.config.message_format,
    )

    msg: RedisChannelMessage = await self._basic_request(
        cmd,
        producer=self.producer,
    )
    return msg

start async #

start()
Source code in faststream/redis/publisher/usecase.py
async def start(self) -> None:
    await super().start()

    broker_producer = self.config._outer_config.producer

    self.producer = RedisFastProducer(
        connection=self.config._outer_config.connection,
        parser=broker_producer._parser.custom_func,
        decoder=broker_producer._decoder.custom_func,
        message_format=self.config.message_format,
        serializer=self.config._outer_config.fd_config._serializer,
    )

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Turn publisher to testing mode."""
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode."""
    self._fake_handler = False
    self.mock.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    return self.specification.get_schema()