Skip to content

BatchPublisher

faststream.confluent.publisher.usecase.BatchPublisher #

BatchPublisher(
    config: KafkaPublisherConfig,
    specifcication: PublisherSpecification[Any, Any],
)

Bases: LogicPublisher

Source code in faststream/confluent/publisher/usecase.py
def __init__(
    self,
    config: "KafkaPublisherConfig",
    specifcication: "PublisherSpecification[Any, Any]",
) -> None:
    """Initialise the publisher from its config and specification.

    Both arguments are forwarded to the parent initialiser; the topic,
    partition, reply-to destination and default headers are then copied
    from ``config`` onto the instance.
    """
    # NOTE(review): ``specifcication`` is misspelled, but it is part of the
    # caller-visible signature, so the spelling is kept as-is.
    super().__init__(config, specifcication)

    self._topic, self.partition, self.reply_to = (
        config.topic,
        config.partition,
        config.reply_to,
    )
    # A falsy ``config.headers`` is replaced by a fresh empty dict.
    self.headers = config.headers or {}

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

partition instance-attribute #

partition = partition

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers or {}

topic property #

topic: str

publish async #

publish(
    *messages: SendableMessage,
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: bool = False,
) -> None
Source code in faststream/confluent/publisher/usecase.py
@override
async def publish(
    self,
    *messages: "SendableMessage",
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: bool = False,
) -> None:
    """Build a publish command for ``messages`` and send it as a batch.

    Per-call arguments take precedence over the publisher's own settings:
    a falsy ``topic`` / ``reply_to`` or a ``None`` ``partition`` falls back
    to the value stored on the publisher, call-level ``headers`` are merged
    over the publisher's headers, and a correlation id is generated when a
    falsy one is passed.
    """
    # Resolve every fallback up front, in the same order the command
    # receives them.
    target_topic = topic or self.topic
    target_partition = self.partition if partition is None else partition
    reply_destination = reply_to or self.reply_to
    merged_headers = self.headers | (headers or {})
    message_cor_id = correlation_id or gen_cor_id()

    batch_command = KafkaPublishCommand(
        *messages,
        key=None,
        topic=target_topic,
        partition=target_partition,
        reply_to=reply_destination,
        headers=merged_headers,
        correlation_id=message_cor_id,
        timestamp_ms=timestamp_ms,
        no_confirm=no_confirm,
        _publish_type=PublishType.PUBLISH,
    )

    await self._basic_publish_batch(
        batch_command,
        producer=self._outer_config.producer,
        _extra_middlewares=(),
    )

request async #

request(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> KafkaMessage
Source code in faststream/confluent/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> "KafkaMessage":
    """Send ``message`` as a request command and return the result.

    A falsy ``topic`` or a ``None`` ``partition`` falls back to the value
    stored on the publisher, call-level ``headers`` are merged over the
    publisher's headers, and a correlation id is generated when a falsy
    one is passed. ``timeout`` is forwarded on the command unchanged.
    """
    # Resolve fallbacks in the same order the command receives them.
    target_topic = topic or self.topic
    target_partition = self.partition if partition is None else partition
    merged_headers = self.headers | (headers or {})
    message_cor_id = correlation_id or gen_cor_id()

    request_command = KafkaPublishCommand(
        message,
        topic=target_topic,
        key=key,
        partition=target_partition,
        headers=merged_headers,
        correlation_id=message_cor_id,
        timestamp_ms=timestamp_ms,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
    )

    response: "KafkaMessage" = await self._basic_request(
        request_command,
        producer=self._outer_config.producer,
    )
    return response

start async #

start() -> None
Source code in faststream/_internal/endpoint/publisher/usecase.py
async def start(self) -> None:
    """Start hook; this implementation performs no work."""
    return None

set_test #

set_test(*, mock: MagicMock, with_fake: bool) -> None

Switch the publisher into testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Switch the publisher into testing mode.

    Stores ``mock`` on the publisher and records ``with_fake`` as the
    ``_fake_handler`` flag.
    """
    self.mock, self._fake_handler = mock, with_fake

reset_test #

reset_test() -> None

Turn off the publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Leave testing mode: clear the fake-handler flag, then reset the mock."""
    self._fake_handler = False
    recorder = self.mock
    recorder.reset_mock()

schema #

schema() -> dict[str, PublisherSpec]
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    """Return whatever ``get_schema()`` on this publisher's specification yields."""
    spec = self.specification
    return spec.get_schema()

flush async #

flush() -> None
Source code in faststream/confluent/publisher/usecase.py
async def flush(self) -> None:
    """Await ``flush()`` on the producer held by the outer config."""
    outer = self._outer_config
    # The cast only narrows the static type; it does nothing at runtime.
    await cast("AsyncConfluentFastProducer", outer.producer).flush()