BatchPublisher
faststream.kafka.publisher.BatchPublisher
BatchPublisher(
    config: KafkaPublisherConfig,
    specification: PublisherSpecification[Any, Any],
)
Bases: LogicPublisher
Source code in faststream/kafka/publisher/usecase.py
request async
request(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> KafkaMessage
Send a request message to a Kafka topic.
| PARAMETER | DESCRIPTION |
|---|---|
| message | Message body to send. TYPE: `SendableMessage` |
| topic | Topic where the message will be published. TYPE: `str` DEFAULT: `""` |
| key | A key to associate with the message. Can be used to determine which partition to send the message to. If partition is … TYPE: `bytes \| Any \| None` DEFAULT: `None` |
| partition | Specify a partition. If not set, the partition will be selected using the configured … TYPE: `int \| None` DEFAULT: `None` |
| timestamp_ms | Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. TYPE: `int \| None` DEFAULT: `None` |
| headers | Message headers to store meta-information. TYPE: `dict[str, str] \| None` DEFAULT: `None` |
| correlation_id | Manually set the message correlation_id, which is useful for tracing messages. TYPE: `str \| None` DEFAULT: `None` |
| timeout | Timeout for the RPC request. TYPE: `float` DEFAULT: `0.5` |

| RETURNS | DESCRIPTION |
|---|---|
| KafkaMessage | The response message. |
Source code in faststream/kafka/publisher/usecase.py
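The `timestamp_ms` and `headers` parameters above expect epoch milliseconds and a `dict[str, str]`. The following is a minimal sketch of preparing both values with the standard library only. The helper names `to_timestamp_ms` and `to_str_headers` are hypothetical and are not part of FastStream.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds (hypothetical helper)."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division by a timedelta avoids float rounding errors.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def to_str_headers(values: dict) -> dict:
    """Coerce header values to str so they fit dict[str, str] (hypothetical helper)."""
    return {str(name): str(value) for name, value in values.items()}


event_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
timestamp_ms = to_timestamp_ms(event_time)
headers = to_str_headers({"attempt": 1, "source": "billing"})
```

The resulting `timestamp_ms` and `headers` values could then be passed as the keyword arguments of the same name.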
start async
set_test
reset_test
schema
schema() -> dict[str, PublisherSpec]
flush async
publish async
publish(
    *messages: SendableMessage,
    topic: str = "",
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: Literal[False] = False,
) -> RecordMetadata

publish(
    *messages: SendableMessage,
    topic: str = "",
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: Literal[True] = ...,
) -> Future[RecordMetadata]

publish(
    *messages: SendableMessage,
    topic: str = "",
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: bool = False,
) -> Union[Future[RecordMetadata], RecordMetadata]
Publish a batch of messages as a single request to the broker.
| PARAMETER | DESCRIPTION |
|---|---|
| *messages | Message bodies to send. TYPE: `SendableMessage` |
| topic | Topic where the messages will be published. TYPE: `str` DEFAULT: `""` |
| key | A single key to associate with every message in this batch. If a partition is not specified and the producer uses the default partitioner, messages with the same key will be routed to the same partition. Must be bytes or serializable to bytes via the configured key serializer. If omitted, falls back to the publisher's default key (if configured). TYPE: `bytes \| Any \| None` DEFAULT: `None` |
| partition | Specify a partition. If not set, the partition will be selected using the configured … TYPE: `int \| None` DEFAULT: `None` |
| timestamp_ms | Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. TYPE: `int \| None` DEFAULT: `None` |
| headers | Message headers to store meta-information. TYPE: `dict[str, str] \| None` DEFAULT: `None` |
| reply_to | Name of the topic to which the response should be sent. TYPE: `str` DEFAULT: `""` |
| correlation_id | Manually set the message correlation_id, which is useful for tracing messages. TYPE: `str \| None` DEFAULT: `None` |
| no_confirm | Do not wait for Kafka publish confirmation. TYPE: `bool` DEFAULT: `False` |
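The `key` description states that, with a key-based partitioner, messages sharing a key are routed to the same partition. The sketch below illustrates that idea only. `pick_partition` is a hypothetical function, and `zlib.crc32` is a stand-in chosen because it is deterministic and in the standard library; it is not the hash a Kafka client actually uses.

```python
import zlib


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition index (hypothetical; crc32 is a stand-in hash)."""
    return zlib.crc32(key) % num_partitions


batch = ["created", "paid", "shipped"]
key = b"order-42"  # one key shared by every message in the batch

# Every message hashes the same key, so every message gets the same partition.
assignments = [pick_partition(key, num_partitions=6) for _ in batch]
```

Because the partition depends only on the key and the partition count, a single batch-wide key keeps the whole batch together as long as no explicit `partition` is given.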

| RETURNS | DESCRIPTION |
|---|---|
| Union[Future[RecordMetadata], RecordMetadata] | `RecordMetadata` when `no_confirm` is `False` (the default); `Future[RecordMetadata]` when `no_confirm` is `True`. |
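The overloads above tie the return type to `no_confirm` through `Literal[False]` and `Literal[True]`. The following sketch reproduces that typing pattern with stand-in classes so that it runs without a broker. `FakeMetadata` and `FakePublisher` are hypothetical and do not reflect how FastStream implements `publish` internally.

```python
import asyncio
from dataclasses import dataclass
from typing import Literal, overload


@dataclass
class FakeMetadata:
    """Hypothetical stand-in for RecordMetadata."""
    topic: str
    count: int


class FakePublisher:
    """Hypothetical stand-in that mirrors only the overload pattern."""

    @overload
    async def publish(
        self, *messages: str, topic: str = "", no_confirm: Literal[False] = False
    ) -> FakeMetadata: ...

    @overload
    async def publish(
        self, *messages: str, topic: str = "", no_confirm: Literal[True] = ...
    ) -> "asyncio.Future[FakeMetadata]": ...

    async def publish(self, *messages, topic="", no_confirm=False):
        # Simulate a delivery future that is already resolved.
        future = asyncio.get_running_loop().create_future()
        future.set_result(FakeMetadata(topic=topic, count=len(messages)))
        if no_confirm:
            return future  # caller decides when to await confirmation
        return await future  # wait for confirmation before returning


async def main():
    publisher = FakePublisher()
    confirmed = await publisher.publish("a", "b", topic="demo")
    pending = await publisher.publish("a", "b", "c", topic="demo", no_confirm=True)
    resolved = await pending
    return confirmed, pending, resolved


confirmed, pending, resolved = asyncio.run(main())
```

With this pattern a type checker infers the metadata type for the default call and a future for `no_confirm=True`, so callers do not need to narrow the `Union` by hand.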