Skip to content

LogicPublisher

faststream.redis.publisher.usecase.LogicPublisher #

LogicPublisher(config, specification)

Bases: PublisherUsecase

A class to represent a Redis publisher.

Source code in faststream/redis/publisher/usecase.py
def __init__(
    self,
    config: "RedisPublisherConfig",
    specification: "PublisherSpecification[Any, Any]",
) -> None:
    """Set up the publisher from its config and specification.

    Both arguments are forwarded to the parent initializer. The config is
    then kept on the instance, and ``reply_to``, ``headers`` and the
    producer are copied from it for direct access.
    """
    super().__init__(config, specification)

    self.config = config

    self.reply_to = config.reply_to

    # A falsy headers value (e.g. ``None``) is replaced by a fresh empty dict.
    configured_headers = config.headers
    self.headers = configured_headers if configured_headers else {}

    # The producer is taken from the outer config that the publisher config
    # points at.
    outer_config = self.config._outer_config
    self.producer = outer_config.producer

config instance-attribute #

config = config

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers or {}

producer instance-attribute #

producer = producer

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

start async #

start()
Source code in faststream/redis/publisher/usecase.py
async def start(self) -> None:
    """Start the publisher and give it its own producer.

    Once the parent ``start()`` has completed, ``self.producer`` is replaced
    by a new ``RedisFastProducer``. It is built from the outer config's
    connection and serializer, the custom parser/decoder functions of the
    outer config's producer, and this publisher's ``message_format``.
    """
    await super().start()

    outer = self.config._outer_config
    shared_producer = outer.producer

    self.producer = RedisFastProducer(
        connection=outer.connection,
        parser=shared_producer._parser.custom_func,
        decoder=shared_producer._decoder.custom_func,
        message_format=self.config.message_format,
        serializer=outer.fd_config._serializer,
    )

subscriber_property abstractmethod #

subscriber_property(*, name_only)
Source code in faststream/redis/publisher/usecase.py
@abstractmethod
def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
    """Abstract hook that concrete publishers must override.

    Args:
        name_only: Keyword-only flag; its effect is defined by the
            overriding implementation (not visible here).

    Returns:
        A ``dict[str, Any]``, per the annotation.

    Raises:
        NotImplementedError: Always, in this base definition.
    """
    raise NotImplementedError

publish abstractmethod async #

publish(message, /, *, correlation_id=None)

Public method to publish a message.

Should be called by the user only as broker.publisher(...).publish(...).

Source code in faststream/_internal/endpoint/publisher/proto.py
@abstractmethod
async def publish(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: str | None = None,
) -> Any | None:
    """Public method to publish a message.

    Should be called by the user only as `broker.publisher(...).publish(...)`.

    Args:
        message: The message to publish (positional-only).
        correlation_id: Optional correlation identifier (keyword-only);
            defaults to ``None``.

    Returns:
        Declared as ``Any | None``; the concrete value is defined by the
        implementing subclass.
    """
    ...

request abstractmethod async #

request(message, /, *, correlation_id=None)

Publishes a message synchronously.

Source code in faststream/_internal/endpoint/publisher/proto.py
@abstractmethod
async def request(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: str | None = None,
) -> Any | None:
    """Publishes a message synchronously.

    Args:
        message: The message to send (positional-only).
        correlation_id: Optional correlation identifier (keyword-only);
            defaults to ``None``.

    Returns:
        Declared as ``Any | None``; the concrete value is defined by the
        implementing subclass.
    """
    # NOTE(review): "synchronously" on a coroutine presumably means the call
    # waits for a reply before returning — confirm against an implementation.
    ...

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Switch the publisher into testing mode.

    Args:
        mock: Mock object stored on the publisher as ``self.mock``.
        with_fake: Value stored as ``self._fake_handler``.
    """
    # Targets are assigned left to right, so ``mock`` is still set first.
    self.mock, self._fake_handler = mock, with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Leave testing mode.

    Clears the ``_fake_handler`` flag and then resets the stored mock via
    its ``reset_mock()`` method.
    """
    self._fake_handler = False

    recorder = self.mock
    recorder.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    """Return the schema produced by this publisher's specification."""
    spec = self.specification
    return spec.get_schema()