Skip to content

KafkaPublisher

faststream.confluent.broker.router.KafkaPublisher #

KafkaPublisher(
    topic,
    *,
    key=None,
    partition=None,
    headers=None,
    reply_to="",
    batch=False,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
)

Bases: ArgsContainer

Delayed KafkaPublisher registration object.

Just a copy of KafkaRegistrator.publisher(...) arguments.

Initialize KafkaPublisher.

PARAMETER DESCRIPTION
topic

Topic where the message will be published.

TYPE: str

key

A key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer's partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer.

TYPE: bytes | str | None DEFAULT: None

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway. Can be overridden by publish.headers if specified.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

Topic name to send response.

TYPE: str DEFAULT: ''

batch

Whether to send messages in batches or not.

TYPE: bool DEFAULT: False

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whetever to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

Source code in faststream/confluent/broker/router.py
def __init__(
    self,
    topic: str,
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    batch: bool = False,
    # basic args
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    # AsyncAPI args
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> None:
    """Initialize KafkaPublisher.

    Args:
        topic: Topic where the message will be published.
        key: A key to associate with the message. Can be used to
            determine which partition to send the message to. If partition
            is `None` (and producer's partitioner config is left as default),
            then messages with the same key will be delivered to the same
            partition (but if key is `None`, partition is chosen randomly).
            Must be type `bytes`, or be serializable to bytes via configured
            `key_serializer`.
        partition: Specify a partition. If not set, the partition will be
            selected using the configured `partitioner`.
        headers: Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by framework anyway.
            Can be overridden by `publish.headers` if specified.
        reply_to: Topic name to send response.
        batch: Whether to send messages in batches or not.
        middlewares: Publisher middlewares to wrap outgoing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type.
            Should be any python-native object annotation or `pydantic.BaseModel`.
        include_in_schema: Whetever to include operation in AsyncAPI schema or not.
    """
    super().__init__(
        topic=topic,
        key=key,
        partition=partition,
        batch=batch,
        headers=headers,
        reply_to=reply_to,
        # basic args
        middlewares=middlewares,
        # AsyncAPI args
        title=title,
        description=description,
        schema=schema,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs