Skip to content

BatchPublisher

faststream.kafka.publisher.BatchPublisher #

BatchPublisher(config, specification)

Bases: LogicPublisher

Source code in faststream/kafka/publisher/usecase.py
def __init__(
    self,
    config: "KafkaPublisherConfig",
    specification: "PublisherSpecification[Any, Any]",
) -> None:
    """Initialise the publisher and store its defaults taken from ``config``.

    Args:
        config: Publisher configuration; its ``topic``, ``partition``,
            ``reply_to`` and ``headers`` values are copied onto the instance.
        specification: Specification object, forwarded to the base class
            together with ``config``.
    """
    super().__init__(config, specification)

    self._topic, self.partition, self.reply_to = (
        config.topic,
        config.partition,
        config.reply_to,
    )

    # A missing/empty header mapping is normalised to a fresh empty dict.
    configured_headers = config.headers
    self.headers = configured_headers if configured_headers else {}

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

partition instance-attribute #

partition = partition

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers or {}

topic property #

topic

request async #

request(
    message,
    topic="",
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    timeout=0.5,
)

Send a request message to Kafka topic.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: SendableMessage

topic

Topic where the message will be published.

TYPE: str DEFAULT: ''

key

A key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer's partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer.

TYPE: bytes | Any | None DEFAULT: None

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None

timestamp_ms

Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

TYPE: int | None DEFAULT: None

headers

Message headers to store metainformation.

TYPE: dict[str, str] | None DEFAULT: None

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

timeout

Timeout to send RPC request.

TYPE: float DEFAULT: 0.5

RETURNS DESCRIPTION
KafkaMessage

The response message.

TYPE: KafkaMessage

Source code in faststream/kafka/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> "KafkaMessage":
    """Send a request message to Kafka topic.

    Args:
        message: Message body to send.
        topic: Topic where the message will be published. An empty value
            falls back to the publisher's own topic.
        key: A key to associate with the message. Can be used to
            determine which partition to send the message to. If partition
            is `None` (and producer's partitioner config is left as default),
            then messages with the same key will be delivered to the same
            partition (but if key is `None`, partition is chosen randomly).
            Must be type `bytes`, or be serializable to bytes via configured
            `key_serializer`.
        partition: Specify a partition. Any integer, including `0`, is
            honoured as given. If `None`, the publisher's default partition
            is used; if that is not set either, the partition will be
            selected using the configured `partitioner`.
        timestamp_ms: Epoch milliseconds (from Jan 1 1970 UTC) to use as
            the message timestamp. Defaults to current time.
        headers: Message headers to store metainformation. Merged on top of
            the publisher's default headers; per-call values win on
            duplicate keys.
        correlation_id: Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages.
            A new one is generated when omitted.
        timeout: Timeout to send RPC request.

    Returns:
        KafkaMessage: The response message.
    """
    # `partition or self.partition` would discard an explicit partition 0
    # (0 is falsy), so only fall back when the caller passed nothing.
    target_partition = self.partition if partition is None else partition

    cmd = KafkaPublishCommand(
        message,
        topic=topic or self.topic,
        key=key,
        partition=target_partition,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        timestamp_ms=timestamp_ms,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
    )

    msg: KafkaMessage = await self._basic_request(
        cmd,
        producer=self._outer_config.producer,
    )
    return msg

start async #

start()
Source code in faststream/_internal/endpoint/publisher/usecase.py
async def start(self) -> None:
    """Start the publisher; this implementation performs no work."""
    return None

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Switch the publisher into testing mode.

    Args:
        mock: Mock object stored on the publisher as ``self.mock``.
        with_fake: Flag stored as ``self._fake_handler``.
    """
    self.mock, self._fake_handler = mock, with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Leave testing mode: clear the fake-handler flag, then reset the mock."""
    self._fake_handler = False
    recorder = self.mock
    recorder.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    """Return the schema mapping produced by the publisher's specification."""
    spec = self.specification
    return spec.get_schema()

flush async #

flush()
Source code in faststream/kafka/publisher/usecase.py
async def flush(self) -> None:
    """Await ``flush()`` on the producer held by the outer configuration."""
    # The cast only narrows the type for checkers; it is a no-op at runtime.
    await cast("AioKafkaFastProducer", self._outer_config.producer).flush()

publish async #

publish(
    *messages: SendableMessage,
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: Literal[False] = False,
) -> RecordMetadata
publish(
    *messages: SendableMessage,
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: Literal[True] = ...,
) -> Future[RecordMetadata]
publish(
    *messages: SendableMessage,
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: bool = False,
) -> Union[Future[RecordMetadata], RecordMetadata]
publish(
    *messages,
    topic="",
    partition=None,
    timestamp_ms=None,
    headers=None,
    reply_to="",
    correlation_id=None,
    no_confirm=False,
)

Publish a message batch as a single request to broker.

PARAMETER DESCRIPTION
*messages

Message bodies to send.

TYPE: SendableMessage DEFAULT: ()

topic

Topic where the message will be published.

TYPE: str DEFAULT: ''

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None

timestamp_ms

Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

TYPE: int | None DEFAULT: None

headers

Message headers to store metainformation.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

Reply message topic name to send response.

TYPE: str DEFAULT: ''

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

no_confirm

Do not wait for Kafka publish confirmation.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[Future[RecordMetadata], RecordMetadata]

asyncio.Future[RecordMetadata] if no_confirm = True.

Union[Future[RecordMetadata], RecordMetadata]

RecordMetadata if no_confirm = False.

Source code in faststream/kafka/publisher/usecase.py
@override
async def publish(
    self,
    *messages: "SendableMessage",
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: bool = False,
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
    """Publish a message batch as a single request to broker.

    Args:
        *messages:
            Message bodies to send.
        topic:
            Topic where the message will be published. An empty value
            falls back to the publisher's own topic.
        partition:
            Specify a partition. Any integer, including `0`, is honoured
            as given. If `None`, the publisher's default partition is
            used; if that is not set either, the partition will be
            selected using the configured `partitioner`.
        timestamp_ms:
            Epoch milliseconds (from Jan 1 1970 UTC) to use as
            the message timestamp. Defaults to current time.
        headers:
            Message headers to store metainformation. Merged on top of
            the publisher's default headers; per-call values win on
            duplicate keys.
        reply_to:
            Reply message topic name to send response. An empty value
            falls back to the publisher's default.
        correlation_id:
            Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages.
            A new one is generated when omitted.
        no_confirm:
            Do not wait for Kafka publish confirmation.

    Returns:
        `asyncio.Future[RecordMetadata]` if no_confirm = True.
        `RecordMetadata` if no_confirm = False.
    """
    # `partition or self.partition` would discard an explicit partition 0
    # (0 is falsy), so only fall back when the caller passed nothing.
    target_partition = self.partition if partition is None else partition

    cmd = KafkaPublishCommand(
        *messages,
        key=None,
        topic=topic or self.topic,
        partition=target_partition,
        reply_to=reply_to or self.reply_to,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        timestamp_ms=timestamp_ms,
        no_confirm=no_confirm,
        _publish_type=PublishType.PUBLISH,
    )

    return await self._basic_publish_batch(
        cmd,
        producer=self._outer_config.producer,
        _extra_middlewares=(),
    )