Skip to content

LogicPublisher

faststream.confluent.publisher.usecase.LogicPublisher #

LogicPublisher(config, specifcication)

Bases: PublisherUsecase

A class to publish messages to a Kafka topic.

Source code in faststream/confluent/publisher/usecase.py
def __init__(
    self,
    config: "KafkaPublisherConfig",
    specifcication: "PublisherSpecification[Any, Any]",
) -> None:
    """Set up the publisher from its config and specification.

    Both arguments are forwarded unchanged to the parent initializer, after
    which the topic, partition, reply-to and headers are copied from
    ``config`` onto the instance.
    """
    # NOTE: ``specifcication`` is misspelled, but it is part of the call
    # signature, so the name is kept as-is.
    super().__init__(config, specifcication)

    self._topic = config.topic
    self.partition = config.partition
    self.reply_to = config.reply_to

    # Fall back to a fresh empty dict when the config carries no headers.
    default_headers = config.headers
    if not default_headers:
        default_headers = {}
    self.headers = default_headers

partition instance-attribute #

partition = config.partition

reply_to instance-attribute #

reply_to = config.reply_to

headers instance-attribute #

headers = config.headers or {}

topic property #

topic

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

request async #

request(
    message,
    topic="",
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    timeout=0.5,
)
Source code in faststream/confluent/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> "KafkaMessage":
    """Send ``message`` as a request and return the awaited result.

    Args:
        message: Payload handed to the publish command.
        topic: Target topic; an empty string falls back to ``self.topic``.
        key: Passed through to the publish command unchanged.
        partition: Target partition. ``None`` falls back to
            ``self.partition``; an explicit ``0`` is honored.
        timestamp_ms: Passed through to the publish command unchanged.
        headers: Per-call headers merged over ``self.headers``; per-call
            values win on key conflicts.
        correlation_id: A falsy value is replaced by ``gen_cor_id()``.
        timeout: Passed through to the publish command unchanged.

    Returns:
        The value produced by ``self._basic_request`` for the command.
    """
    # Partition numbers start at 0, so compare against None instead of
    # relying on truthiness; otherwise an explicit ``partition=0`` would be
    # silently replaced by the publisher-level default.
    target_partition = self.partition if partition is None else partition

    cmd = KafkaPublishCommand(
        message,
        topic=topic or self.topic,
        key=key,
        partition=target_partition,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        timestamp_ms=timestamp_ms,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
    )

    msg: KafkaMessage = await self._basic_request(
        cmd,
        producer=self._outer_config.producer,
    )
    return msg

flush async #

flush()
Source code in faststream/confluent/publisher/usecase.py
async def flush(self) -> None:
    """Await ``flush()`` on the producer held by ``self._outer_config``."""
    await cast(
        "AsyncConfluentFastProducer",
        self._outer_config.producer,
    ).flush()

publish abstractmethod async #

publish(message, /, *, correlation_id=None)

Public method to publish a message.

Should be called by users only via broker.publisher(...).publish(...).

Source code in faststream/_internal/endpoint/publisher/proto.py
@abstractmethod
async def publish(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: str | None = None,
) -> Any | None:
    """Public method to publish a message.

    Intended to be called by users only through
    `broker.publisher(...).publish(...)`.

    This declaration is abstract and carries no implementation of its own.

    Args:
        message: The message to publish (positional-only).
        correlation_id: Optional keyword-only correlation id; defaults to
            ``None``. How it is used is up to the implementation.
    """
    ...

start async #

start()
Source code in faststream/_internal/endpoint/publisher/usecase.py
async def start(self) -> None:
    """Perform no work; this implementation of ``start`` is empty."""
    return None

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Turn publisher to testing mode.

    Stores ``mock`` on ``self.mock`` and ``with_fake`` on
    ``self._fake_handler``.
    """
    self.mock, self._fake_handler = mock, with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode.

    Clears the ``_fake_handler`` flag first, then calls ``reset_mock()`` on
    ``self.mock``.
    """
    self._fake_handler = False
    recorder = self.mock
    recorder.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    """Return whatever ``self.specification.get_schema()`` produces."""
    spec = self.specification
    return spec.get_schema()