
KafkaBroker

faststream.kafka.KafkaBroker

KafkaBroker(
    bootstrap_servers="localhost",
    *,
    request_timeout_ms=40 * 1000,
    retry_backoff_ms=100,
    metadata_max_age_ms=5 * 60 * 1000,
    connections_max_idle_ms=9 * 60 * 1000,
    sasl_kerberos_service_name="kafka",
    sasl_kerberos_domain_name=None,
    sasl_oauth_token_provider=None,
    loop=None,
    client_id=SERVICE_NAME,
    acks=_missing,
    key_serializer=None,
    value_serializer=None,
    compression_type=None,
    max_batch_size=16 * 1024,
    partitioner=DefaultPartitioner(),
    max_request_size=1024 * 1024,
    linger_ms=0,
    enable_idempotence=False,
    transactional_id=None,
    transaction_timeout_ms=60 * 1000,
    graceful_timeout=15.0,
    decoder=None,
    parser=None,
    dependencies=(),
    middlewares=(),
    routers=(),
    security=None,
    specification_url=None,
    protocol=None,
    protocol_version="auto",
    description=None,
    tags=(),
    logger=EMPTY,
    log_level=INFO,
    apply_types=True,
    serializer=EMPTY,
    provider=None,
    context=None,
)

Bases: KafkaRegistrator, BrokerUsecase[ConsumerRecord | tuple[ConsumerRecord, ...], Callable[..., AIOKafkaConsumer]]

Kafka broker constructor.

PARAMETER DESCRIPTION
bootstrap_servers

A host[:port] string (or list of host[:port] strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092.

TYPE: Union[str, Iterable[str]] DEFAULT: 'localhost'
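
A minimal sketch of how a bootstrap_servers value can be interpreted. Like the constructor source, it turns a single string into a one-element list. The hypothetical helper normalize_bootstrap_servers then fills in the default port 9092. It is illustrative only: it assumes plain host names or IPv4 addresses (no IPv6 literals), and aiokafka does its own parsing.

```python
from typing import Iterable, List, Tuple, Union

DEFAULT_KAFKA_PORT = 9092  # default port stated in the parameter description


def normalize_bootstrap_servers(
    bootstrap_servers: Union[str, Iterable[str]],
) -> List[Tuple[str, int]]:
    """Hypothetical helper: split "host[:port]" strings into (host, port) pairs.

    Assumes host names or IPv4 addresses only; IPv6 literals are not handled.
    """
    # Same normalization as the broker constructor: one string -> one-element list.
    servers = (
        [bootstrap_servers]
        if isinstance(bootstrap_servers, str)
        else list(bootstrap_servers)
    )
    pairs = []
    for server in servers:
        host, sep, port = server.partition(":")
        pairs.append((host, int(port) if sep else DEFAULT_KAFKA_PORT))
    return pairs


print(normalize_bootstrap_servers("localhost"))  # [('localhost', 9092)]
print(normalize_bootstrap_servers(["kafka-1:9093", "kafka-2"]))
```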

request_timeout_ms

Client request timeout in milliseconds.

TYPE: int DEFAULT: 40 * 1000

retry_backoff_ms

Milliseconds to backoff when retrying on errors.

TYPE: int DEFAULT: 100

metadata_max_age_ms

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

TYPE: int DEFAULT: 5 * 60 * 1000

connections_max_idle_ms

Close idle connections after the number of milliseconds specified by this config. Specifying None will disable idle checks.

TYPE: int DEFAULT: 9 * 60 * 1000

sasl_kerberos_service_name

Kerberos service name.

TYPE: str DEFAULT: 'kafka'

sasl_kerberos_domain_name

Kerberos domain name.

TYPE: Optional[str] DEFAULT: None

sasl_oauth_token_provider

OAuthBearer token provider instance.

TYPE: Optional[AbstractTokenProvider] DEFAULT: None

loop

Event loop to use.

TYPE: Optional[AbstractEventLoop] DEFAULT: None

client_id

A name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. It is also submitted to the GroupCoordinator for logging with respect to consumer group administration.

TYPE: Optional[str] DEFAULT: SERVICE_NAME

acks

One of 0, 1, or all (-1 is equivalent to all). The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:

* 0: The producer will not wait for any acknowledgment from the server at all. The message is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
* 1: The broker leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, the record will be lost.
* all: The broker leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

If unset, defaults to acks=1. If enable_idempotence is True, defaults to acks=all.

TYPE: Union[Literal[0, 1, -1, 'all'], object] DEFAULT: _missing
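
The defaulting rules above can be restated as a small function. This is a sketch of the documented behavior only, not aiokafka's code. resolve_acks and the _missing sentinel here are stand-ins, and normalizing "all" to -1 reflects Kafka's convention that the two values mean the same thing.

```python
from typing import Literal, Union

# Stand-in for the "not set by the user" sentinel used in the signature.
_missing = object()

Acks = Union[Literal[0, 1, -1, "all"], object]


def resolve_acks(acks: Acks = _missing, enable_idempotence: bool = False) -> int:
    """Hypothetical helper: effective acks value, with "all" normalized to -1."""
    if acks is not _missing and acks not in (0, 1, -1, "all"):
        raise ValueError(f"invalid acks value: {acks!r}")
    if enable_idempotence:
        if acks is _missing:
            return -1  # idempotence defaults to acks=all
        if acks not in (-1, "all"):
            raise ValueError("enable_idempotence=True requires acks='all'")
    if acks is _missing:
        return 1  # documented default when idempotence is off
    return -1 if acks == "all" else int(acks)
```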

key_serializer

Used to convert user-supplied keys to bytes.

TYPE: Optional[Callable[[Any], bytes]] DEFAULT: None

value_serializer

Used to convert user-supplied message values to bytes.

TYPE: Optional[Callable[[Any], bytes]] DEFAULT: None
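
Both serializers are plain callables with the Callable[[Any], bytes] shape. The functions below are illustrative examples: their names, the UTF-8 key encoding, and the JSON value encoding are assumptions, not part of FastStream. Because FastStream encodes message bodies itself, the sketch passes values that are already bytes through unchanged. It also assumes the client may call the key serializer with a None key.

```python
import json
from typing import Any, Optional


def key_serializer(key: Any) -> Optional[bytes]:
    # None keys stay None so the partitioner can treat them as "no key".
    if key is None:
        return None
    if isinstance(key, (bytes, bytearray)):
        return bytes(key)
    return str(key).encode("utf-8")


def value_serializer(value: Any) -> bytes:
    # Assumption: payloads that are already bytes are passed through unchanged.
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    # Compact, key-sorted JSON gives deterministic output.
    return json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8")
```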

compression_type

The compression type for all data generated by the producer. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).

TYPE: Optional[Literal['gzip', 'snappy', 'lz4', 'zstd']] DEFAULT: None
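
The remark that more batching means better compression can be checked with the standard library. In the sketch below, gzip.compress stands in for Kafka's batch-level codec, and the records are made-up JSON events; real Kafka record batches are not plain concatenations like this.

```python
import gzip
import json

records = [
    json.dumps({"user_id": i, "event": "page_view", "path": "/home"}).encode()
    for i in range(100)
]

# Compressing each record alone pays the gzip header cost 100 times and
# cannot exploit the repetition between records.
individually = sum(len(gzip.compress(record)) for record in records)
# Compressing the whole batch lets repeated field names compress together.
as_batch = len(gzip.compress(b"".join(records)))

print(f"individually: {individually} bytes, as one batch: {as_batch} bytes")
```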

max_batch_size

Maximum size of buffered data per partition. Once this limit is reached, the send coroutine blocks until the batch is drained.

TYPE: int DEFAULT: 16 * 1024
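
A simplified model of the blocking behavior described above, using asyncio.Condition. PartitionBuffer is hypothetical and not the client's real accumulator. It assumes an empty buffer always accepts one record so an oversized record cannot deadlock; in practice max_request_size guards against oversized records.

```python
import asyncio
from typing import List


class PartitionBuffer:
    """Toy per-partition buffer: appends wait while the batch would overflow."""

    def __init__(self, max_batch_size: int) -> None:
        self.max_batch_size = max_batch_size
        self.records: List[bytes] = []
        self.size = 0
        self._drained = asyncio.Condition()

    async def append(self, record: bytes) -> None:
        async with self._drained:
            # Block until the record fits (an empty buffer accepts any record).
            await self._drained.wait_for(
                lambda: self.size == 0
                or self.size + len(record) <= self.max_batch_size
            )
            self.records.append(record)
            self.size += len(record)

    async def drain(self) -> List[bytes]:
        async with self._drained:
            batch, self.records, self.size = self.records, [], 0
            # Wake up any senders that were waiting for free space.
            self._drained.notify_all()
            return batch
```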

partitioner

Callable used to determine which partition each message is assigned to. Called (after key serialization) as partitioner(key_bytes, all_partitions, available_partitions). The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the Java client, so that messages with the same key are assigned to the same partition. When a key is None, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible).

TYPE: Callable DEFAULT: DefaultPartitioner()
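
A pure-Python port of the behavior described above, written for illustration. It is not aiokafka's DefaultPartitioner itself. It models partitions as plain integer ids (an assumption) and follows the Java client's murmur2 steps: seed 0x9747b28c, 4-byte little-endian words, then a tail and final mix. The sign bit is cleared before the modulo, as Java's toPositive does.

```python
import random
from typing import List, Optional

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit murmur2 with the Java client's seed, as an unsigned int."""
    length = len(data)
    m = 0x5BD1E995
    h = (0x9747B28C ^ length) & MASK32
    tail = length % 4
    body_end = length - tail
    # Mix the input four bytes at a time, read as little-endian words.
    for i in range(0, body_end, 4):
        k = int.from_bytes(data[i : i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> 24
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k
    # Mix in the last 1-3 bytes (the fall-through switch in the Java version).
    if tail >= 3:
        h ^= data[body_end + 2] << 16
    if tail >= 2:
        h ^= data[body_end + 1] << 8
    if tail >= 1:
        h ^= data[body_end]
        h = (h * m) & MASK32
    # Final mixing steps.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def default_partitioner(
    key: Optional[bytes],
    all_partitions: List[int],
    available_partitions: List[int],
) -> int:
    if key is None:
        # No key: random partition, preferring partitions with a live leader.
        return random.choice(available_partitions or all_partitions)
    # Clear the sign bit (Java's toPositive) and map onto the partition list.
    index = (murmur2(key) & 0x7FFFFFFF) % len(all_partitions)
    return all_partitions[index]
```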

max_request_size

The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.

TYPE: int DEFAULT: 1024 * 1024
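
To make the last sentence concrete, here is a hypothetical planner. It groups already-built record batches, given by their sizes in bytes, into requests whose total stays within max_request_size. It rejects any single batch that exceeds the cap. Real clients also account for protocol overhead, which this sketch ignores.

```python
from typing import List


def plan_requests(batch_sizes: List[int], max_request_size: int) -> List[List[int]]:
    """Greedily pack batch sizes into requests capped at max_request_size."""
    requests: List[List[int]] = []
    current: List[int] = []
    total = 0
    for size in batch_sizes:
        if size > max_request_size:
            # The request cap also acts as a cap on record (batch) size.
            raise ValueError(
                f"batch of {size} bytes exceeds max_request_size={max_request_size}"
            )
        if total + size > max_request_size:
            # Adding this batch would overflow: start a new request.
            requests.append(current)
            current, total = [], 0
        current.append(size)
        total += size
    if current:
        requests.append(current)
    return requests
```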

linger_ms

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load, when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay: if the first request is processed faster than linger_ms, the producer will wait linger_ms - process_time.

TYPE: int DEFAULT: 0
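
A simplified asyncio model of the linger window. It is not aiokafka's sender loop, and collect_batch is hypothetical. Records that are already buffered always join the batch. After the first record, the collector waits up to linger_ms for more before "sending".

```python
import asyncio
import time
from typing import Any, List


async def collect_batch(queue: asyncio.Queue, linger_ms: int, max_records: int) -> List[Any]:
    """Group records into one "request", lingering up to linger_ms for more."""
    # Block until at least one record is available.
    batch = [await queue.get()]
    deadline = time.monotonic() + linger_ms / 1000
    while len(batch) < max_records:
        # Records that are already buffered always join the current batch.
        if not queue.empty():
            batch.append(queue.get_nowait())
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # linger window is over: send what we have
        try:
            # Artificial delay: wait for more records until the deadline.
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch
```

With linger_ms=0, a record that arrives a few milliseconds later goes into the next batch. With a larger linger_ms, it joins the current one.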

enable_idempotence

When set to True, the producer will ensure that exactly one copy of each message is written in the stream. If False, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires acks to be set to all; if acks is not explicitly set by the user, all is chosen automatically.

TYPE: bool DEFAULT: False
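
Kafka implements idempotence by giving each producer an id and numbering its records with sequence numbers. This lets the broker recognize a retried write it has already stored. The toy model below shows only that duplicate-dropping idea. Real brokers also track producer epochs and per-partition sequences and reject out-of-order writes, none of which is modeled here. PartitionLog is hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class PartitionLog:
    """Toy broker-side log that drops retried (duplicate) writes."""

    records: List[str] = field(default_factory=list)
    # Highest sequence number accepted so far, per producer id.
    last_sequence: Dict[int, int] = field(default_factory=dict)

    def append(self, producer_id: int, sequence: int, value: str) -> bool:
        if sequence <= self.last_sequence.get(producer_id, -1):
            return False  # already written: a retry, so drop it
        self.last_sequence[producer_id] = sequence
        self.records.append(value)
        return True
```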

transactional_id

Transactional id for the producer.

TYPE: Optional[str] DEFAULT: None

transaction_timeout_ms

Transaction timeout in milliseconds.

TYPE: int DEFAULT: 60 * 1000

graceful_timeout

Graceful shutdown timeout. On shutdown, the broker waits for all running subscribers to complete before stopping.

TYPE: Optional[float] DEFAULT: 15.0
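
A sketch of the graceful-shutdown pattern with plain asyncio. It waits up to graceful_timeout for in-flight handler tasks, then cancels whatever is still running. The sketch assumes the timeout is in seconds, as asyncio timeouts are. graceful_shutdown is a hypothetical helper, not FastStream's implementation. None maps naturally to "wait indefinitely".

```python
import asyncio
from typing import Optional, Set, Tuple


async def graceful_shutdown(
    tasks: Set[asyncio.Task],
    graceful_timeout: Optional[float],
) -> Tuple[set, set]:
    """Wait for running handler tasks, then cancel the ones still pending."""
    if not tasks:
        return set(), set()  # asyncio.wait() rejects an empty set
    # timeout=None makes asyncio.wait block until every task has finished.
    done, pending = await asyncio.wait(tasks, timeout=graceful_timeout)
    for task in pending:
        task.cancel()
    return done, pending
```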

decoder

Custom decoder object.

TYPE: Optional[CustomCallable] DEFAULT: None

parser

Custom parser object.

TYPE: Optional[CustomCallable] DEFAULT: None

dependencies

Dependencies to apply to all broker subscribers.

TYPE: Iterable[Dependant] DEFAULT: ()

middlewares

Middlewares to apply to all broker publishers/subscribers.

TYPE: Sequence[BrokerMiddleware[Any, Any]] DEFAULT: ()

routers

Routers to apply to broker.

TYPE: Iterable[KafkaRegistrator] DEFAULT: ()

security

Security options to connect broker and generate AsyncAPI server security information.

TYPE: Optional[BaseSecurity] DEFAULT: None

specification_url

Hardcoded AsyncAPI server addresses. If not specified, the bootstrap_servers values are used.

TYPE: Union[str, Iterable[str], None] DEFAULT: None

protocol

AsyncAPI server protocol. If None, it is set to kafka-secure when security uses SSL and to kafka otherwise.

TYPE: Optional[str] DEFAULT: None

protocol_version

AsyncAPI server protocol version. The same value is also passed to the Kafka client as api_version.

TYPE: Optional[str] DEFAULT: 'auto'

description

AsyncAPI server description.

TYPE: Optional[str] DEFAULT: None

tags

AsyncAPI server tags.

TYPE: Iterable[Union[Tag, TagDict]] DEFAULT: ()

logger

User-specified logger, passed into the Context and used to log service messages.

TYPE: Optional[LoggerProto] DEFAULT: EMPTY

log_level

Service messages log level.

TYPE: int DEFAULT: INFO

apply_types

Whether to use FastDepends or not.

TYPE: bool DEFAULT: True

serializer

Serializer to use.

TYPE: Optional[SerializerProto] DEFAULT: EMPTY

provider

Provider for FastDepends.

TYPE: Optional[Provider] DEFAULT: None

context

Context for FastDepends.

TYPE: Optional[ContextRepo] DEFAULT: None

Source code in faststream/kafka/broker/broker.py
def __init__(
    self,
    bootstrap_servers: str | Iterable[str] = "localhost",
    *,
    # both
    request_timeout_ms: int = 40 * 1000,
    retry_backoff_ms: int = 100,
    metadata_max_age_ms: int = 5 * 60 * 1000,
    connections_max_idle_ms: int = 9 * 60 * 1000,
    sasl_kerberos_service_name: str = "kafka",
    sasl_kerberos_domain_name: str | None = None,
    sasl_oauth_token_provider: Optional["AbstractTokenProvider"] = None,
    loop: Optional["asyncio.AbstractEventLoop"] = None,
    client_id: str | None = SERVICE_NAME,
    # publisher args
    acks: Literal[0, 1, -1, "all"] | object = _missing,
    key_serializer: Callable[[Any], bytes] | None = None,
    value_serializer: Callable[[Any], bytes] | None = None,
    compression_type: Literal["gzip", "snappy", "lz4", "zstd"] | None = None,
    max_batch_size: int = 16 * 1024,
    partitioner: Callable[
        [bytes, list[Partition], list[Partition]],
        Partition,
    ] = DefaultPartitioner(),  # noqa: B008
    max_request_size: int = 1024 * 1024,
    linger_ms: int = 0,
    enable_idempotence: bool = False,
    transactional_id: str | None = None,
    transaction_timeout_ms: int = 60 * 1000,
    # broker base args
    graceful_timeout: float | None = 15.0,
    decoder: Optional["CustomCallable"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[KafkaRegistrator] = (),
    # AsyncAPI args
    security: Optional["BaseSecurity"] = None,
    specification_url: str | Iterable[str] | None = None,
    protocol: str | None = None,
    protocol_version: str | None = "auto",
    description: str | None = None,
    tags: Iterable[Union["Tag", "TagDict"]] = (),
    # logging args
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    # FastDepends args
    apply_types: bool = True,
    serializer: Optional["SerializerProto"] = EMPTY,
    provider: Optional["Provider"] = None,
    context: Optional["ContextRepo"] = None,
) -> None:
    """Kafka broker constructor.

    Args:
        bootstrap_servers (Union[str, Iterable[str]]):
            A `host[:port]` string (or list of `host[:port]` strings) that the consumer should contact to bootstrap
            initial cluster metadata. This does not have to be the full node list.
            It just needs to have at least one broker that will respond to a
            Metadata API Request. Default port is 9092.
        request_timeout_ms (int):
            Client request timeout in milliseconds.
        retry_backoff_ms (int):
            Milliseconds to backoff when retrying on errors.
        metadata_max_age_ms (int):
            The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any
            partition leadership changes to proactively discover any new brokers or partitions.
        connections_max_idle_ms (int):
            Close idle connections after the number of milliseconds specified by this config. Specifying `None` will
            disable idle checks.
        sasl_kerberos_service_name (str):
            Kerberos service name.
        sasl_kerberos_domain_name (Optional[str]):
            Kerberos domain name.
        sasl_oauth_token_provider (Optional[AbstractTokenProvider]):
            OAuthBearer token provider instance.
        loop (Optional[asyncio.AbstractEventLoop]):
            Event loop to use.
        client_id (Optional[str]):
            A name for this client. This string is passed in each request to servers and can be used to identify specific
            server-side log entries that correspond to this client. Also submitted to :class:`~.consumer.group_coordinator.GroupCoordinator`
            for logging with respect to consumer group administration.
        acks (Union[Literal[0, 1, -1, "all"], object]):
            One of ``0``, ``1``, ``all``. The number of acknowledgments the producer requires the leader to have received before considering a
            request complete. This controls the durability of records that are sent. The following settings are common:
            * ``0``: Producer will not wait for any acknowledgment from the server at all. The message will immediately be added to the socket
              buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries
              configuration will not take effect (as the client won't generally know of any failures). The offset given back for each
              record will always be set to -1.
            * ``1``: The broker leader will write the record to its local log but will respond without awaiting full acknowledgement from all
              followers. In this case should the leader fail immediately after acknowledging the record but before the followers have
              replicated it then the record will be lost.
            * ``all``: The broker leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the
              record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
            If unset, defaults to ``acks=1``. If `enable_idempotence` is :data:`True` defaults to ``acks=all``.
        key_serializer (Optional[Callable[[Any], bytes]]):
            Used to convert user-supplied keys to bytes.
        value_serializer (Optional[Callable[[Any], bytes]]):
            Used to convert user-supplied message values to bytes.
        compression_type (Optional[Literal["gzip", "snappy", "lz4", "zstd"]]):
            The compression type for all data generated by the producer.
            Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
        max_batch_size (int):
            Maximum size of buffered data per partition. After this amount `send` coroutine will block until batch is drained.
        partitioner (Callable):
            Callable used to determine which partition each message is assigned to. Called (after key serialization):
            ``partitioner(key_bytes, all_partitions, available_partitions)``.
            The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the Java client so that
            messages with the same key are assigned to the same partition. When a key is :data:`None`, the message is delivered to a random partition
            (filtered to partitions with available leaders only, if possible).
        max_request_size (int):
            The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
            has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer
            will send in a single request to avoid sending huge requests.
        linger_ms (int):
            The producer groups together any records that arrive in between request transmissions into a single batched request.
            Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client
            may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of
            artificial delay; that is, if the first request is processed faster than `linger_ms`, the producer will wait ``linger_ms - process_time``.
        enable_idempotence (bool):
            When set to `True`, the producer will ensure that exactly one copy of each message is written in the stream.
            If `False`, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream.
            Note that enabling idempotence requires ``acks`` to be set to ``all``. If it is not explicitly set by the user it will be chosen.
        transactional_id (Optional[str]):
            Transactional id for the producer.
        transaction_timeout_ms (int):
            Transaction timeout in milliseconds.
        graceful_timeout (Optional[float]):
            Graceful shutdown timeout. Broker waits for all running subscribers to complete before shutting down.
        decoder (Optional[CustomCallable]):
            Custom decoder object.
        parser (Optional[CustomCallable]):
            Custom parser object.
        dependencies (Iterable[Dependant]):
            Dependencies to apply to all broker subscribers.
        middlewares (Sequence[BrokerMiddleware[Any, Any]]):
            Middlewares to apply to all broker publishers/subscribers.
        routers (Sequence[Registrator]):
            Routers to apply to broker.
        security (Optional[BaseSecurity]):
            Security options to connect broker and generate AsyncAPI server security information.
        specification_url (Union[str, Iterable[str], None]):
            AsyncAPI hardcoded server addresses. Use `servers` if not specified.
        protocol (Optional[str]):
            AsyncAPI server protocol.
        protocol_version (Optional[str]):
            AsyncAPI server protocol version.
        description (Optional[str]):
            AsyncAPI server description.
        tags (Iterable[Union[Tag, TagDict]]):
            AsyncAPI server tags.
        logger (Optional[LoggerProto]):
            User specified logger to pass into Context and log service messages.
        log_level (int):
            Service messages log level.
        apply_types (bool):
            Whether to use FastDepends or not.
        serializer (Optional[SerializerProto]):
            Serializer to use.
        provider (Optional[Provider]):
            Provider for FastDepends.
        context (Optional[ContextRepo]):
            Context for FastDepends.
    """
    if protocol is None:
        if security is not None and security.use_ssl:
            protocol = "kafka-secure"
        else:
            protocol = "kafka"

    servers = (
        [bootstrap_servers]
        if isinstance(bootstrap_servers, str)
        else list(bootstrap_servers)
    )

    if specification_url is not None:
        if isinstance(specification_url, str):
            specification_url = [specification_url]
        else:
            specification_url = list(specification_url)
    else:
        specification_url = servers

    connection_params = dict(
        bootstrap_servers=servers,
        # both args
        client_id=client_id,
        api_version=protocol_version,
        request_timeout_ms=request_timeout_ms,
        retry_backoff_ms=retry_backoff_ms,
        metadata_max_age_ms=metadata_max_age_ms,
        connections_max_idle_ms=connections_max_idle_ms,
        sasl_kerberos_service_name=sasl_kerberos_service_name,
        sasl_kerberos_domain_name=sasl_kerberos_domain_name,
        sasl_oauth_token_provider=sasl_oauth_token_provider,
        loop=loop,
        # publisher args
        acks=acks,
        key_serializer=key_serializer,
        value_serializer=value_serializer,
        compression_type=compression_type,
        max_batch_size=max_batch_size,
        partitioner=partitioner,
        max_request_size=max_request_size,
        linger_ms=linger_ms,
        enable_idempotence=enable_idempotence,
        transactional_id=transactional_id,
        transaction_timeout_ms=transaction_timeout_ms,
        **parse_security(security),
    )

    consumer_options, _ = filter_by_dict(
        ConsumerConnectionParams,
        connection_params,
    )
    builder = partial(aiokafka.AIOKafkaConsumer, **consumer_options)

    super().__init__(
        **connection_params,
        routers=routers,
        config=KafkaBrokerConfig(
            client_id=client_id,
            builder=builder,
            producer=AioKafkaFastProducerImpl(
                parser=parser,
                decoder=decoder,
            ),
            # both args,
            broker_decoder=decoder,
            broker_parser=parser,
            broker_middlewares=middlewares,
            logger=make_kafka_logger_state(
                logger=logger,
                log_level=log_level,
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=apply_types,
                serializer=serializer,
                provider=provider or dependency_provider,
                context=context or ContextRepo(),
            ),
            # subscriber args
            graceful_timeout=graceful_timeout,
            broker_dependencies=dependencies,
            extra_context={
                "broker": self,
            },
        ),
        specification=BrokerSpec(
            description=description,
            url=specification_url,
            protocol=protocol,
            protocol_version=protocol_version,
            security=security,
            tags=tags,
        ),
    )

middlewares property

middlewares

context property

context

config instance-attribute

config = ConfigComposition(config)

routers instance-attribute

routers = []

subscribers property

subscribers

publishers property

publishers

specification instance-attribute

specification = specification

running instance-attribute

running = False

provider property

provider

url instance-attribute

url

add_middleware

add_middleware(middleware)

Append a BrokerMiddleware to the end of the middlewares list.

The appended middleware becomes the innermost layer of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as the innermost layer of the stack.
    """
    self.config.add_middleware(middleware)

insert_middleware

insert_middleware(middleware)

Insert a BrokerMiddleware at the start of the middlewares list.

The inserted middleware becomes the outermost layer of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert BrokerMiddleware to the start of middlewares list.

    Current middleware will be used as the outermost layer of the stack.
    """
    self.config.insert_middleware(middleware)
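
The ordering difference between add_middleware and insert_middleware can be modeled as an "onion" of wrapper functions. MiddlewareStack and tracing are hypothetical stand-ins. FastStream's BrokerMiddleware is a class with its own hooks, and this sketch reproduces only the list-ordering rule: the first entry is outermost and the last is innermost.

```python
from typing import Callable, List

Handler = Callable[[str], str]
Middleware = Callable[[Handler], Handler]


class MiddlewareStack:
    """Hypothetical stand-in for a broker's middleware list (ordering only)."""

    def __init__(self) -> None:
        self.middlewares: List[Middleware] = []

    def add_middleware(self, middleware: Middleware) -> None:
        # Appended to the end: becomes the innermost layer.
        self.middlewares.append(middleware)

    def insert_middleware(self, middleware: Middleware) -> None:
        # Inserted at the start: becomes the outermost layer.
        self.middlewares.insert(0, middleware)

    def build(self, handler: Handler) -> Handler:
        # Wrap from the last entry to the first so middlewares[0] ends up outermost.
        for middleware in reversed(self.middlewares):
            handler = middleware(handler)
        return handler


def tracing(name: str, log: List[str]) -> Middleware:
    """Middleware factory that records when each layer is entered and exited."""

    def middleware(call_next: Handler) -> Handler:
        def wrapped(message: str) -> str:
            log.append(f"enter {name}")
            result = call_next(message)
            log.append(f"exit {name}")
            return result

        return wrapped

    return middleware
```

Adding A and then B, and afterwards inserting C, yields the call order C, A, B, handler, then B, A, C on the way out.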

subscriber

subscriber(
    *topics: str,
    batch: Literal[False] = False,
    group_id: str | None = None,
    key_deserializer: Callable[[bytes], Any] | None = None,
    value_deserializer: Callable[[bytes], Any]
    | None = None,
    fetch_max_bytes: int = 50 * 1024 * 1024,
    fetch_min_bytes: int = 1,
    fetch_max_wait_ms: int = 500,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal[
        "latest", "earliest", "none"
    ] = "latest",
    auto_commit: bool = EMPTY,
    auto_commit_interval_ms: int = 5 * 1000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence[
        AbstractPartitionAssignor
    ] = (RoundRobinPartitionAssignor,),
    max_poll_interval_ms: int = 5 * 60 * 1000,
    rebalance_timeout_ms: int | None = None,
    session_timeout_ms: int = 10 * 1000,
    heartbeat_interval_ms: int = 3 * 1000,
    consumer_timeout_ms: int = 200,
    max_poll_records: int | None = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted", "read_committed"
    ] = "read_uncommitted",
    batch_timeout_ms: int = 200,
    max_records: int | None = None,
    listener: Optional[ConsumerRebalanceListener] = None,
    pattern: str | None = None,
    partitions: Collection[TopicPartition] = (),
    persistent: bool = True,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> DefaultSubscriber
subscriber(
    *topics: str,
    batch: Literal[True] = ...,
    group_id: str | None = None,
    key_deserializer: Callable[[bytes], Any] | None = None,
    value_deserializer: Callable[[bytes], Any]
    | None = None,
    fetch_max_bytes: int = 50 * 1024 * 1024,
    fetch_min_bytes: int = 1,
    fetch_max_wait_ms: int = 500,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal[
        "latest", "earliest", "none"
    ] = "latest",
    auto_commit: bool = EMPTY,
    auto_commit_interval_ms: int = 5 * 1000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence[
        AbstractPartitionAssignor
    ] = (RoundRobinPartitionAssignor,),
    max_poll_interval_ms: int = 5 * 60 * 1000,
    rebalance_timeout_ms: int | None = None,
    session_timeout_ms: int = 10 * 1000,
    heartbeat_interval_ms: int = 3 * 1000,
    consumer_timeout_ms: int = 200,
    max_poll_records: int | None = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted", "read_committed"
    ] = "read_uncommitted",
    batch_timeout_ms: int = 200,
    max_records: int | None = None,
    listener: Optional[ConsumerRebalanceListener] = None,
    pattern: str | None = None,
    partitions: Collection[TopicPartition] = (),
    persistent: bool = True,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> BatchSubscriber
subscriber(
    *topics: str,
    batch: Literal[False] = False,
    group_id: None = None,
    key_deserializer: Callable[[bytes], Any] | None = None,
    value_deserializer: Callable[[bytes], Any]
    | None = None,
    fetch_max_bytes: int = 50 * 1024 * 1024,
    fetch_min_bytes: int = 1,
    fetch_max_wait_ms: int = 500,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal[
        "latest", "earliest", "none"
    ] = "latest",
    auto_commit: bool = EMPTY,
    auto_commit_interval_ms: int = 5 * 1000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence[
        AbstractPartitionAssignor
    ] = (RoundRobinPartitionAssignor,),
    max_poll_interval_ms: int = 5 * 60 * 1000,
    rebalance_timeout_ms: int | None = None,
    session_timeout_ms: int = 10 * 1000,
    heartbeat_interval_ms: int = 3 * 1000,
    consumer_timeout_ms: int = 200,
    max_poll_records: int | None = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted", "read_committed"
    ] = "read_uncommitted",
    batch_timeout_ms: int = 200,
    max_records: int | None = None,
    listener: Optional[ConsumerRebalanceListener] = None,
    pattern: str | None = None,
    partitions: Collection[TopicPartition] = (),
    persistent: bool = True,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentDefaultSubscriber
subscriber(
    *topics: str,
    batch: Literal[False] = False,
    group_id: str = ...,
    key_deserializer: Callable[[bytes], Any] | None = None,
    value_deserializer: Callable[[bytes], Any]
    | None = None,
    fetch_max_bytes: int = 50 * 1024 * 1024,
    fetch_min_bytes: int = 1,
    fetch_max_wait_ms: int = 500,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal[
        "latest", "earliest", "none"
    ] = "latest",
    auto_commit: bool = EMPTY,
    auto_commit_interval_ms: int = 5 * 1000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence[
        AbstractPartitionAssignor
    ] = (RoundRobinPartitionAssignor,),
    max_poll_interval_ms: int = 5 * 60 * 1000,
    rebalance_timeout_ms: int | None = None,
    session_timeout_ms: int = 10 * 1000,
    heartbeat_interval_ms: int = 3 * 1000,
    consumer_timeout_ms: int = 200,
    max_poll_records: int | None = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted", "read_committed"
    ] = "read_uncommitted",
    batch_timeout_ms: int = 200,
    max_records: int | None = None,
    listener: Optional[ConsumerRebalanceListener] = None,
    pattern: str | None = None,
    partitions: Collection[TopicPartition] = (),
    persistent: bool = True,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentBetweenPartitionsSubscriber
subscriber(
    *topics: str,
    batch: bool = False,
    group_id: str | None = None,
    key_deserializer: Callable[[bytes], Any] | None = None,
    value_deserializer: Callable[[bytes], Any]
    | None = None,
    fetch_max_bytes: int = 50 * 1024 * 1024,
    fetch_min_bytes: int = 1,
    fetch_max_wait_ms: int = 500,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal[
        "latest", "earliest", "none"
    ] = "latest",
    auto_commit: bool = EMPTY,
    auto_commit_interval_ms: int = 5 * 1000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence[
        AbstractPartitionAssignor
    ] = (RoundRobinPartitionAssignor,),
    max_poll_interval_ms: int = 5 * 60 * 1000,
    rebalance_timeout_ms: int | None = None,
    session_timeout_ms: int = 10 * 1000,
    heartbeat_interval_ms: int = 3 * 1000,
    consumer_timeout_ms: int = 200,
    max_poll_records: int | None = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted", "read_committed"
    ] = "read_uncommitted",
    batch_timeout_ms: int = 200,
    max_records: int | None = None,
    listener: Optional[ConsumerRebalanceListener] = None,
    pattern: str | None = None,
    partitions: Collection[TopicPartition] = (),
    persistent: bool = True,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> Union[
    DefaultSubscriber,
    BatchSubscriber,
    ConcurrentDefaultSubscriber,
    ConcurrentBetweenPartitionsSubscriber,
]
subscriber(
    *topics,
    batch=False,
    group_id=None,
    key_deserializer=None,
    value_deserializer=None,
    fetch_max_bytes=50 * 1024 * 1024,
    fetch_min_bytes=1,
    fetch_max_wait_ms=500,
    max_partition_fetch_bytes=1 * 1024 * 1024,
    auto_offset_reset="latest",
    auto_commit=EMPTY,
    auto_commit_interval_ms=5 * 1000,
    check_crcs=True,
    partition_assignment_strategy=(
        RoundRobinPartitionAssignor,
    ),
    max_poll_interval_ms=5 * 60 * 1000,
    rebalance_timeout_ms=None,
    session_timeout_ms=10 * 1000,
    heartbeat_interval_ms=3 * 1000,
    consumer_timeout_ms=200,
    max_poll_records=None,
    exclude_internal_topics=True,
    isolation_level="read_uncommitted",
    batch_timeout_ms=200,
    max_records=None,
    listener=None,
    pattern=None,
    partitions=(),
    persistent=True,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    no_ack=EMPTY,
    max_workers=None,
    ack_policy=EMPTY,
    no_reply=False,
    title=None,
    description=None,
    include_in_schema=True,
)

Create a subscriber for Kafka topics.

PARAMETER DESCRIPTION
*topics

Kafka topics to consume messages from.

TYPE: str DEFAULT: ()

batch

Whether to consume messages in batches or not.

TYPE: bool DEFAULT: False

group_id

Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled.

TYPE: str | None DEFAULT: None

key_deserializer

Any callable that takes a raw message bytes key and returns a deserialized one.

TYPE: Callable[[bytes], Any] | None DEFAULT: None

value_deserializer

Any callable that takes a raw message bytes value and returns a deserialized value.

TYPE: Callable[[bytes], Any] | None DEFAULT: None
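Both deserializer parameters only require a `Callable[[bytes], Any]`. As a minimal sketch, here are two plain functions that fit those types: one decodes keys as UTF-8 text and one parses values as JSON. The names `key_as_str` and `value_as_json` are hypothetical. Both defensively pass `None` through, on the assumption that a record may have no key or a null value.

```python
from __future__ import annotations

import json
from typing import Any


def key_as_str(raw: bytes | None) -> str | None:
    # Keys are optional in Kafka, so a missing key stays None.
    return None if raw is None else raw.decode("utf-8")


def value_as_json(raw: bytes | None) -> Any:
    # A null value (e.g. a tombstone record) is passed through as None.
    return None if raw is None else json.loads(raw)
```

They could then be passed as `key_deserializer=key_as_str, value_deserializer=value_as_json` to `broker.subscriber(...)`. Check how an already-deserialized value interacts with FastStream's own `parser` and `decoder` hooks before relying on this.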

fetch_max_bytes

The maximum amount of data the server should return for a fetch request. This is not an absolute maximum: if the first message in the first non-empty partition of the fetch is larger than this value, the message will still be returned to ensure that the consumer can make progress. NOTE: the consumer performs fetches to multiple brokers in parallel, so memory usage will depend on the number of brokers containing partitions for the topic.

TYPE: int DEFAULT: 50 * 1024 * 1024

fetch_min_bytes

Minimum amount of data the server should return for a fetch request, otherwise wait up to fetch_max_wait_ms for more data to accumulate.

TYPE: int DEFAULT: 1

fetch_max_wait_ms

The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch_min_bytes.

TYPE: int DEFAULT: 500

max_partition_fetch_bytes

The maximum amount of data per partition the server will return. The maximum total memory used for a request is #partitions * max_partition_fetch_bytes. This size must be at least as large as the maximum message size the server allows; otherwise the producer may send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on that partition.

TYPE: int DEFAULT: 1 * 1024 * 1024
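The memory bound and the size constraint above reduce to simple arithmetic. For example, a consumer assigned 12 partitions at the default of 1 MiB may buffer up to 12 * 1 MiB = 12 MiB per request. This sketch encodes both checks. The helper names and the `largest_allowed_message` argument (the largest message the broker accepts) are hypothetical.

```python
DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024  # documented default: 1 MiB


def worst_case_fetch_bytes(
    num_partitions: int,
    max_partition_fetch_bytes: int = DEFAULT_MAX_PARTITION_FETCH_BYTES,
) -> int:
    # Upper bound from the description: #partitions * max_partition_fetch_bytes.
    return num_partitions * max_partition_fetch_bytes


def fits_largest_message(max_partition_fetch_bytes: int, largest_allowed_message: int) -> bool:
    # False means the broker may accept a message this consumer cannot fetch.
    return max_partition_fetch_bytes >= largest_allowed_message
```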

auto_offset_reset

A policy for resetting offsets on OffsetOutOfRangeError errors:

  • earliest will move to the oldest available message
  • latest will move to the most recent
  • none will raise an exception so you can handle this case

TYPE: Literal['latest', 'earliest', 'none'] DEFAULT: 'latest'
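The three policies can be read as a choice of new position within the range of offsets the partition still retains. The following toy model makes that concrete. It is not aiokafka code. It assumes "latest" means the log end offset, so only messages produced afterwards are read. `NoResetPolicy` is a hypothetical stand-in for the exception raised under "none".

```python
class NoResetPolicy(Exception):
    """Hypothetical stand-in for the error surfaced when the policy is "none"."""


def reset_position(policy: str, log_start_offset: int, log_end_offset: int) -> int:
    if policy == "earliest":
        return log_start_offset  # oldest message still retained
    if policy == "latest":
        return log_end_offset  # next offset to be written: only new messages
    if policy == "none":
        # Leave the decision to the application.
        raise NoResetPolicy("offset out of range and auto_offset_reset='none'")
    raise ValueError(f"unknown policy: {policy!r}")
```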

auto_commit

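If True, the consumer's offset will be periodically committed in the background. Deprecated: this option will be removed in the 0.7.0 release; use ack_policy=AckPolicy.ACK_FIRST instead.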

TYPE: bool DEFAULT: EMPTY

auto_commit_interval_ms

Milliseconds between automatic offset commits, if auto_commit is True.

TYPE: int DEFAULT: 5 * 1000

check_crcs

Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.

TYPE: bool DEFAULT: True

partition_assignment_strategy

List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order of the strategies in the list. When assignment strategy changes: to support a change to the assignment strategy, new versions must enable support both for the old assignment strategy and the new one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy.

TYPE: Sequence[AbstractPartitionAssignor] DEFAULT: (RoundRobinPartitionAssignor,)

max_poll_interval_ms

Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout.

TYPE: int DEFAULT: 5 * 60 * 1000

rebalance_timeout_ms

The maximum time the server will wait for this consumer to rejoin the group during a rebalance. In the Java client this behaviour is bound to the max.poll.interval.ms configuration, but because aiokafka rejoins the group in the background, this setting is decoupled to allow finer tuning by users who use ConsumerRebalanceListener to delay rebalancing. Defaults to session_timeout_ms.

TYPE: int | None DEFAULT: None

session_timeout_ms

Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.

TYPE: int DEFAULT: 10 * 1000

heartbeat_interval_ms

The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session_timeout_ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

TYPE: int DEFAULT: 3 * 1000
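The relationships between the group timeouts described above can be checked before creating a subscriber. This sketch applies the documented rules. The heartbeat interval must be below the session timeout, and it typically should not exceed one third of it. The effective rebalance timeout falls back to session_timeout_ms when unset. The helper name is hypothetical, and FastStream and aiokafka do not necessarily perform this check themselves.

```python
from __future__ import annotations

import warnings


def effective_rebalance_timeout_ms(
    session_timeout_ms: int = 10 * 1000,
    heartbeat_interval_ms: int = 3 * 1000,
    rebalance_timeout_ms: int | None = None,
) -> int:
    # Hard rule: heartbeats must be more frequent than the session timeout.
    if heartbeat_interval_ms >= session_timeout_ms:
        raise ValueError("heartbeat_interval_ms must be lower than session_timeout_ms")
    # Soft rule: typically no higher than 1/3 of session_timeout_ms.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.warn("heartbeat_interval_ms is above 1/3 of session_timeout_ms", stacklevel=2)
    # rebalance_timeout_ms defaults to session_timeout_ms when not set.
    return session_timeout_ms if rebalance_timeout_ms is None else rebalance_timeout_ms
```

With the defaults (10 s session, 3 s heartbeat), neither rule fires and the effective rebalance timeout is 10 000 ms.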

consumer_timeout_ms

Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions.

TYPE: int DEFAULT: 200

max_poll_records

The maximum number of records returned in a single call by the batch consumer. Has no limit by default.

TYPE: int | None DEFAULT: None

exclude_internal_topics

Whether records from internal topics (such as offsets) are hidden from the consumer. If set to True, the only way to receive records from an internal topic is to subscribe to it explicitly.

TYPE: bool DEFAULT: True

isolation_level

Controls how to read messages written transactionally.

  • read_committed: the batch consumer will only return transactional messages which have been committed.

  • read_uncommitted (the default): the batch consumer will return all messages, even transactional messages which have been aborted.

Non-transactional messages will be returned unconditionally in either mode.

Messages will always be returned in offset order. Hence, in read_committed mode, the batch consumer will only return messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction. In particular, any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, read_committed consumers will not be able to read up to the high watermark when there are in-flight transactions. Further, in read_committed mode, the consumer's seek_to_end method will return the LSO.

TYPE: Literal['read_uncommitted', 'read_committed'] DEFAULT: 'read_uncommitted'
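The visibility rules above can be modelled over a small in-memory log. In this toy sketch, each record carries an optional transaction id, and the caller says which transactions are committed or aborted; anything else counts as still open. `Record` and `visible_offsets` are hypothetical. Control records and real broker bookkeeping are not modelled.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    offset: int
    txn: str | None = None  # None means the message was written outside a transaction


def visible_offsets(
    records: list[Record],
    isolation_level: str,
    committed: set[str],
    aborted: set[str],
) -> list[int]:
    ordered = sorted(records, key=lambda r: r.offset)  # always offset order
    if isolation_level == "read_uncommitted":
        # Everything is returned, including messages from aborted transactions.
        return [r.offset for r in ordered]
    if isolation_level != "read_committed":
        raise ValueError(f"unknown isolation_level: {isolation_level!r}")
    finished = committed | aborted
    # Offset of the first message that belongs to a still-open transaction, if any.
    first_open = min(
        (r.offset for r in ordered if r.txn is not None and r.txn not in finished),
        default=None,
    )
    result = []
    for r in ordered:
        if first_open is not None and r.offset >= first_open:
            break  # withheld until the open transaction completes
        if r.txn in aborted:
            continue  # aborted transactional messages are skipped
        result.append(r.offset)
    return result
```

With records at offsets 0 to 5, where offset 1 is committed, offset 2 is aborted and offset 4 is still open, read_committed yields [0, 1, 3]. The non-transactional message at offset 5 is held back behind the open transaction.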

batch_timeout_ms

Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty.

TYPE: int DEFAULT: 200

max_records

Number of messages to consume as one batch.

TYPE: int | None DEFAULT: None
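Together, batch_timeout_ms and max_records describe a "take what is buffered, otherwise wait a little" loop. The following sketch models that behaviour with an `asyncio.Queue` standing in for the consumer's fetch buffer. It is a toy model, not FastStream's or aiokafka's implementation, and `get_batch` is a hypothetical name.

```python
from __future__ import annotations

import asyncio
from typing import Any


def _drain(buffer: asyncio.Queue, batch: list[Any], max_records: int | None) -> None:
    # Take already-buffered records without waiting, up to max_records in total.
    while not buffer.empty() and (max_records is None or len(batch) < max_records):
        batch.append(buffer.get_nowait())


async def get_batch(
    buffer: asyncio.Queue,
    batch_timeout_ms: int = 200,
    max_records: int | None = None,
) -> list[Any]:
    batch: list[Any] = []
    _drain(buffer, batch, max_records)
    if batch or batch_timeout_ms == 0:
        # Data was available (or waiting is disabled): return immediately.
        return batch
    try:
        # Buffer was empty: wait up to batch_timeout_ms for the first record.
        batch.append(await asyncio.wait_for(buffer.get(), timeout=batch_timeout_ms / 1000))
    except asyncio.TimeoutError:
        return batch  # still empty
    _drain(buffer, batch, max_records)
    return batch
```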

listener

Optional listener callback, which will be called before and after each rebalance operation. As part of group management, the consumer keeps track of the list of consumers that belong to a particular group and will trigger a rebalance operation if one of the following events occurs:

  • The number of partitions changes for any of the subscribed topics
  • A topic is created or deleted
  • An existing member of the consumer group dies
  • A new member is added to the consumer group

When any of these events is triggered, the provided listener will be invoked first to indicate that the consumer's assignment has been revoked, and then again when the new assignment has been received. Note that this listener will immediately override any listener set in a previous call to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call.

TYPE: Optional[ConsumerRebalanceListener] DEFAULT: None

pattern

Pattern to match available topics. You must provide either topics or pattern, but not both.

TYPE: str | None DEFAULT: None

partitions

An explicit list of partitions to assign. You can't use topics and partitions at the same time.

TYPE: Collection[TopicPartition] DEFAULT: ()
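The pattern and partitions descriptions state two rules about combining subscription sources. The hypothetical helper below restates only those two rules, so a configuration can be checked up front. FastStream performs its own validation, which may differ in detail.

```python
from __future__ import annotations

from collections.abc import Collection


def check_subscription_sources(
    topics: Collection[str] = (),
    pattern: str | None = None,
    partitions: Collection[object] = (),
) -> None:
    # Documented rule: provide either topics or pattern, but not both.
    if topics and pattern:
        raise ValueError("provide either topics or pattern, not both")
    # Documented rule: topics and partitions cannot be used at the same time.
    if topics and partitions:
        raise ValueError("topics and partitions cannot be used at the same time")
```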

dependencies

Dependencies list ([Dependant(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original ConsumerRecord object to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode the FastStream message body (bytes) into Python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing. Deprecated since 0.6.0 and scheduled for removal in 0.7.0; use router-level middlewares instead.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

max_workers

Number of workers to process messages concurrently.

TYPE: int | None DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not. Deprecated since 0.6.0 in favor of ack_policy=AckPolicy.MANUAL; scheduled for removal in 0.7.0.

TYPE: bool DEFAULT: EMPTY

ack_policy

Acknowledgement policy for the subscriber.

TYPE: AckPolicy DEFAULT: EMPTY

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

title

Specification subscriber object title.

TYPE: str | None DEFAULT: None

description

Specification subscriber object description. Uses the decorated handler's docstring by default.

TYPE: str | None DEFAULT: None

include_in_schema

Whether to include the operation in the Specification schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/kafka/broker/registrator.py
@override
def subscriber(
    self,
    *topics: str,
    batch: bool = False,
    group_id: str | None = None,
    key_deserializer: Callable[[bytes], Any] | None = None,
    value_deserializer: Callable[[bytes], Any] | None = None,
    fetch_max_bytes: int = 50 * 1024 * 1024,
    fetch_min_bytes: int = 1,
    fetch_max_wait_ms: int = 500,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal["latest", "earliest", "none"] = "latest",
    auto_commit: Annotated[
        bool,
        deprecated(
            "This option is deprecated and will be removed in 0.7.0 release. "
            "Please, use `ack_policy=AckPolicy.ACK_FIRST` instead."
        ),
    ] = EMPTY,
    auto_commit_interval_ms: int = 5 * 1000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence["AbstractPartitionAssignor"] = (
        RoundRobinPartitionAssignor,
    ),
    max_poll_interval_ms: int = 5 * 60 * 1000,
    rebalance_timeout_ms: int | None = None,
    session_timeout_ms: int = 10 * 1000,
    heartbeat_interval_ms: int = 3 * 1000,
    consumer_timeout_ms: int = 200,
    max_poll_records: int | None = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted", "read_committed"
    ] = "read_uncommitted",
    batch_timeout_ms: int = 200,
    max_records: int | None = None,
    listener: Optional["ConsumerRebalanceListener"] = None,
    pattern: str | None = None,
    partitions: Collection["TopicPartition"] = (),
    # broker args
    persistent: bool = True,
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    no_ack: Annotated[
        bool,
        deprecated(
            "This option was deprecated in 0.6.0 in favor of **ack_policy=AckPolicy.MANUAL**. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    max_workers: int | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    # Specification args
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> Union[
    "DefaultSubscriber",
    "BatchSubscriber",
    "ConcurrentDefaultSubscriber",
    "ConcurrentBetweenPartitionsSubscriber",
]:
    """Create a subscriber for Kafka topics.

    Args:
        *topics: Kafka topics to consume messages from.
        batch: Whether to consume messages in batches or not.
        group_id:
            Name of the consumer group to join for dynamic
            partition assignment (if enabled), and to use for fetching and
            committing offsets. If `None`, auto-partition assignment (via
            group coordinator) and offset commits are disabled.
        key_deserializer:
            Any callable that takes a raw message `bytes`
            key and returns a deserialized one.
        value_deserializer:
            Any callable that takes a raw message `bytes`
            value and returns a deserialized value.
        fetch_max_bytes:
            The maximum amount of data the server should
            return for a fetch request. This is not an absolute maximum, if
            the first message in the first non-empty partition of the fetch
            is larger than this value, the message will still be returned
            to ensure that the consumer can make progress. NOTE: consumer
            performs fetches to multiple brokers in parallel so memory
            usage will depend on the number of brokers containing
            partitions for the topic.
        fetch_min_bytes:
            Minimum amount of data the server should
            return for a fetch request, otherwise wait up to
            `fetch_max_wait_ms` for more data to accumulate.
        fetch_max_wait_ms:
            The maximum amount of time in milliseconds
            the server will block before answering the fetch request if
            there isn't sufficient data to immediately satisfy the
            requirement given by `fetch_min_bytes`.
        max_partition_fetch_bytes:
            The maximum amount of data
            per-partition the server will return. The maximum total memory
            used for a request ``= #partitions * max_partition_fetch_bytes``.
            This size must be at least as large as the maximum message size
            the server allows or else it is possible for the producer to
            send messages larger than the consumer can fetch. If that
            happens, the consumer can get stuck trying to fetch a large
            message on a certain partition.
        auto_offset_reset:
            A policy for resetting offsets on `OffsetOutOfRangeError` errors:

            * `earliest` will move to the oldest available message
            * `latest` will move to the most recent
            * `none` will raise an exception so you can handle this case
        auto_commit:
            If `True` the consumer's offset will be
            periodically committed in the background.

        auto_commit_interval_ms:
            Milliseconds between automatic
            offset commits, if `auto_commit` is `True`.
        check_crcs:
            Automatically check the CRC32 of the records
            consumed. This ensures no on-the-wire or on-disk corruption to
            the messages occurred. This check adds some overhead, so it may
            be disabled in cases seeking extreme performance.
        partition_assignment_strategy:
            List of objects to use to
            distribute partition ownership amongst consumer instances when
            group management is used. This preference is implicit in the order
            of the strategies in the list. When assignment strategy changes:
            to support a change to the assignment strategy, new versions must
            enable support both for the old assignment strategy and the new
            one. The coordinator will choose the old assignment strategy until
            all members have been updated. Then it will choose the new
            strategy.
        max_poll_interval_ms:
            Maximum allowed time between calls to
            consume messages in batches. If this interval
            is exceeded the consumer is considered failed and the group will
            rebalance in order to reassign the partitions to another consumer
            group member. If API methods block waiting for messages, that time
            does not count against this timeout.
        rebalance_timeout_ms:
            The maximum time the server will wait for this
            consumer to rejoin the group in a case of rebalance. In Java client
            this behaviour is bound to `max.poll.interval.ms` configuration,
            but as ``aiokafka`` will rejoin the group in the background, we
            decouple this setting to allow finer tuning by users that use
            `ConsumerRebalanceListener` to delay rebalancing. Defaults
            to ``session_timeout_ms``.
        session_timeout_ms:
            Client group session and failure detection
            timeout. The consumer sends periodic heartbeats
            (`heartbeat.interval.ms`) to indicate its liveness to the broker.
            If no heartbeats are received by the broker for a group member within
            the session timeout, the broker will remove the consumer from the
            group and trigger a rebalance. The allowed range is configured with
            the **broker** configuration properties
            `group.min.session.timeout.ms` and `group.max.session.timeout.ms`.
        heartbeat_interval_ms:
            The expected time in milliseconds
            between heartbeats to the consumer coordinator when using
            Kafka's group management feature. Heartbeats are used to ensure
            that the consumer's session stays active and to facilitate
            rebalancing when new consumers join or leave the group. The
            value must be set lower than `session_timeout_ms`, but typically
            should be set no higher than 1/3 of that value. It can be
            adjusted even lower to control the expected time for normal
            rebalances.
        consumer_timeout_ms:
            Maximum wait timeout for background fetching
            routine. Mostly defines how fast the system will see rebalance and
            request new data for new partitions.
        max_poll_records:
            The maximum number of records returned in a
            single call by batch consumer. Has no limit by default.
        exclude_internal_topics:
            Whether records from internal topics
            (such as offsets) should be exposed to the consumer. If set to True
            the only way to receive records from an internal topic is
            subscribing to it.
        isolation_level:
            Controls how to read messages written
            transactionally.

            * `read_committed`, batch consumer will only return
            transactional messages which have been committed.

            * `read_uncommitted` (the default), batch consumer will
            return all messages, even transactional messages which have been
            aborted.

            Non-transactional messages will be returned unconditionally in
            either mode.

            Messages will always be returned in offset order. Hence, in
            `read_committed` mode, batch consumer will only return
            messages up to the last stable offset (LSO), which is the one less
            than the offset of the first open transaction. In particular, any
            messages appearing after messages belonging to ongoing transactions
            will be withheld until the relevant transaction has been completed.
            As a result, `read_committed` consumers will not be able to read up
            to the high watermark when there are in-flight transactions.
            Further, when in `read_committed` mode, the consumer's seek_to_end
            method will return the LSO.
        batch_timeout_ms:
            Milliseconds spent waiting if
            data is not available in the buffer. If 0, returns immediately
            with any records that are available currently in the buffer,
            else returns empty.
        max_records: Number of messages to consume as one batch.
        listener:
            Optionally include listener
            callback, which will be called before and after each rebalance
            operation.
            As part of group management, the consumer will keep track of
            the list of consumers that belong to a particular group and
            will trigger a rebalance operation if one of the following
            events occurs:

            * Number of partitions change for any of the subscribed topics
            * Topic is created or deleted
            * An existing member of the consumer group dies
            * A new member is added to the consumer group

            When any of these events are triggered, the provided listener
            will be invoked first to indicate that the consumer's
            assignment has been revoked, and then again when the new
            assignment has been received. Note that this listener will
            immediately override any listener set in a previous call
            to subscribe. It is guaranteed, however, that the partitions
            revoked/assigned
            through this interface are from topics subscribed in this call.
        pattern:
            Pattern to match available topics. You must provide either topics or pattern, but not both.
        partitions:  An explicit partitions list to assign.
            You can't use 'topics' and 'partitions' at the same time.
        dependencies: Dependencies list (`[Dependant(),]`) to apply to the subscriber.
        parser: Parser to map original **ConsumerRecord** object to FastStream one.
        decoder: Function to decode FastStream msg bytes body to python objects.
        middlewares: Subscriber middlewares to wrap incoming message processing.
        max_workers: Number of workers to process messages concurrently.
        no_ack: Whether to disable **FastStream** auto acknowledgement logic or not.
        ack_policy: Acknowledgement policy for the subscriber.
        no_reply: Whether to disable **FastStream** RPC and Reply To auto responses or not.
        title: Specification subscriber object title.
        description: Specification subscriber object description. Uses the decorated handler's docstring by default.
        include_in_schema: Whether to include the operation in the Specification schema or not.
        persistent: Whether to make the subscriber persistent or not.
    """
    workers = max_workers or 1

    subscriber = create_subscriber(
        *topics,
        batch=batch,
        max_workers=workers,
        batch_timeout_ms=batch_timeout_ms,
        max_records=max_records,
        group_id=group_id,
        listener=listener,
        pattern=pattern,
        connection_args={
            "key_deserializer": key_deserializer,
            "value_deserializer": value_deserializer,
            "fetch_max_wait_ms": fetch_max_wait_ms,
            "fetch_max_bytes": fetch_max_bytes,
            "fetch_min_bytes": fetch_min_bytes,
            "max_partition_fetch_bytes": max_partition_fetch_bytes,
            "auto_offset_reset": auto_offset_reset,
            "auto_commit_interval_ms": auto_commit_interval_ms,
            "check_crcs": check_crcs,
            "partition_assignment_strategy": partition_assignment_strategy,
            "max_poll_interval_ms": max_poll_interval_ms,
            "rebalance_timeout_ms": rebalance_timeout_ms,
            "session_timeout_ms": session_timeout_ms,
            "heartbeat_interval_ms": heartbeat_interval_ms,
            "consumer_timeout_ms": consumer_timeout_ms,
            "max_poll_records": max_poll_records,
            "exclude_internal_topics": exclude_internal_topics,
            "isolation_level": isolation_level,
        },
        partitions=partitions,
        # acknowledgement args
        ack_policy=ack_policy,
        no_ack=no_ack,
        auto_commit=auto_commit,
        # subscriber args
        no_reply=no_reply,
        config=cast("KafkaBrokerConfig", self.config),
        # Specification
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    super().subscriber(subscriber, persistent=persistent)

    subscriber.add_call(
        parser_=parser,
        decoder_=decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

    if batch:
        return cast("BatchSubscriber", subscriber)

    if workers > 1:
        if auto_commit:
            return cast("ConcurrentDefaultSubscriber", subscriber)
        return cast("ConcurrentBetweenPartitionsSubscriber", subscriber)
    return cast("DefaultSubscriber", subscriber)
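The declared return type of `subscriber` depends only on `batch`, `max_workers` and `auto_commit`, as the branches at the end of the source show. This pure function mirrors that selection so the mapping can be checked at a glance. The function name is hypothetical. The sketch replaces FastStream's `EMPTY` sentinel with `False`, assuming an unset `auto_commit` is treated as falsy.

```python
from __future__ import annotations


def subscriber_kind(
    batch: bool = False,
    max_workers: int | None = None,
    auto_commit: bool = False,
) -> str:
    workers = max_workers or 1  # None (or 0) falls back to a single worker
    if batch:
        return "BatchSubscriber"  # batch wins regardless of max_workers
    if workers > 1:
        if auto_commit:
            return "ConcurrentDefaultSubscriber"
        return "ConcurrentBetweenPartitionsSubscriber"
    return "DefaultSubscriber"
```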

publisher

publisher(
    topic: str,
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    batch: Literal[False] = False,
    persistent: bool = True,
    middlewares: Sequence[PublisherMiddleware] = (),
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
    autoflush: bool = False,
) -> DefaultPublisher
publisher(
    topic: str,
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    batch: Literal[True] = ...,
    persistent: bool = True,
    middlewares: Sequence[PublisherMiddleware] = (),
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
    autoflush: bool = False,
) -> BatchPublisher
publisher(
    topic: str,
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    batch: bool = False,
    middlewares: Sequence[PublisherMiddleware] = (),
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
    autoflush: bool = False,
) -> Union[BatchPublisher, DefaultPublisher]
publisher(
    topic,
    *,
    key=None,
    partition=None,
    headers=None,
    reply_to="",
    batch=False,
    persistent=True,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
    autoflush=False,
)

Creates long-living and Specification-documented publisher object.

You can use it as a handler decorator (the handler should also be decorated with @broker.subscriber(...)): @broker.publisher(...). In that case, the publisher will publish your handler's return value.

Or you can create a publisher object and call it later: broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
topic

Topic where the message will be published.

TYPE: str

key

A key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and the producer's partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, the partition is chosen randomly). Must be of type bytes, or be serializable to bytes via the configured key_serializer.

TYPE: bytes | Any | None DEFAULT: None

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None
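The key and partition descriptions define a simple precedence. An explicit partition wins; otherwise a key maps deterministically to a partition; otherwise a partition is picked at random. This sketch models that precedence. It uses `zlib.crc32` as a stand-in hash, whereas Kafka's default partitioner uses murmur2, so the partition numbers it produces will not match a real cluster. `choose_partition` is a hypothetical name.

```python
from __future__ import annotations

import random
import zlib


def choose_partition(
    num_partitions: int,
    key: bytes | None = None,
    partition: int | None = None,
) -> int:
    if partition is not None:
        return partition  # an explicit partition always wins
    if key is None:
        return random.randrange(num_partitions)  # no key: pick a partition at random
    # Same key -> same partition. CRC32 is a stand-in for Kafka's murmur2.
    return zlib.crc32(key) % num_partitions
```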

headers

Message headers to store metadata. content-type and correlation_id will be set automatically by the framework anyway. Can be overridden by publish.headers if specified.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

Topic name to send response.

TYPE: str DEFAULT: ''

batch

Whether to send messages in batches or not.

TYPE: bool DEFAULT: False

middlewares

Publisher middlewares to wrap outgoing messages. Deprecated since 0.6.0 and scheduled for removal in 0.7.0; use router-level middlewares instead.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

Specification publisher object title.

TYPE: str | None DEFAULT: None

description

Specification publisher object description.

TYPE: str | None DEFAULT: None

schema

Specification publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include the operation in the Specification schema or not.

TYPE: bool DEFAULT: True

autoflush

Whether to flush the producer or not on every publish call.

TYPE: bool DEFAULT: False

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/kafka/broker/registrator.py
@override
def publisher(
    self,
    topic: str,
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    batch: bool = False,
    # basic args
    persistent: bool = True,
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    # Specification args
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
    autoflush: bool = False,
) -> Union[
    "BatchPublisher",
    "DefaultPublisher",
]:
    """Creates long-living and Specification-documented publisher object.

    You can use it as a handler decorator (handler should also be decorated by `@broker.subscriber(...)`) - `@broker.publisher(...)`.
    In that case the publisher will publish your handler's return value.

    Or you can create a publisher object to call it later - `broker.publisher(...).publish(...)`.

    Args:
        topic: Topic where the message will be published.
        key:
            A key to associate with the message. Can be used to
            determine which partition to send the message to. If partition
            is `None` (and producer's partitioner config is left as default),
            then messages with the same key will be delivered to the same
            partition (but if key is `None`, partition is chosen randomly).
            Must be type `bytes`, or be serializable to bytes via configured
            `key_serializer`.
        partition:
            Specify a partition. If not set, the partition will be
            selected using the configured `partitioner`.
        headers:
            Message headers to store metadata.
            **content-type** and **correlation_id** are always set automatically by the framework.
            Can be overridden by `publish.headers` if specified.
        reply_to: Topic name to send response.
        batch: Whether to send messages in batches or not.
        middlewares: Publisher middlewares to wrap outgoing messages.
        title: Specification publisher object title.
        description: Specification publisher object description.
        schema:
            Specification publishing message type.
            Should be any python-native object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include the operation in the Specification schema or not.
        autoflush: Whether to flush the producer or not on every publish call.
        persistent: Whether to make the publisher persistent or not.
    """
    publisher = create_publisher(
        autoflush=autoflush,
        # batch flag
        batch=batch,
        # default args
        key=key,
        # both args
        topic=topic,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        # publisher-specific
        config=cast("KafkaBrokerConfig", self.config),
        middlewares=middlewares,
        # Specification
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )

    super().publisher(publisher, persistent=persistent)

    if batch:
        return cast("BatchPublisher", publisher)
    return cast("DefaultPublisher", publisher)
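The decorator form described in the docstring above can be modeled with a small in-memory sketch. ToyBroker, its methods, and the topic names are hypothetical and exist only for illustration; they are not FastStream's API, and a real KafkaBroker delivers messages through Kafka rather than a dictionary. In this toy, the publisher decorator must sit below the subscriber decorator so that the subscriber registers the wrapped handler.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


class ToyBroker:
    """Hypothetical in-memory broker; not FastStream's implementation."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Handler]] = defaultdict(list)
        self.published: dict[str, list[Any]] = defaultdict(list)

    def subscriber(self, topic: str) -> Callable[[Handler], Handler]:
        # Register the (possibly already wrapped) handler for `topic`.
        def decorator(func: Handler) -> Handler:
            self.handlers[topic].append(func)
            return func

        return decorator

    def publisher(self, topic: str) -> Callable[[Handler], Handler]:
        # Wrap the handler so that its return value is published to `topic`.
        def decorator(func: Handler) -> Handler:
            async def wrapper(msg: Any) -> Any:
                result = await func(msg)
                await self.publish(result, topic)
                return result

            return wrapper

        return decorator

    async def publish(self, message: Any, topic: str) -> None:
        # Record the message, then deliver it to every handler on the topic.
        self.published[topic].append(message)
        for handler in self.handlers[topic]:
            await handler(message)


broker = ToyBroker()


@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle(msg: str) -> str:
    return msg.upper()


asyncio.run(broker.publish("hello", "in-topic"))
print(broker.published["out-topic"])  # ['HELLO']
```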

include_router

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
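)

Includes a router (a KafkaRegistrator instance, for example a KafkaRouter) in the broker, optionally applying a prefix, extra dependencies, and middlewares. Raises SetupError if router is not a KafkaRegistrator.
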
Source code in faststream/kafka/broker/registrator.py
@override
def include_router(
    self,
    router: "KafkaRegistrator",  # type: ignore[override]
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    if not isinstance(router, KafkaRegistrator):
        msg = (
            f"Router must be an instance of KafkaRegistrator, "
            f"got {type(router).__name__} instead"
        )
        raise SetupError(msg)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )
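The type check and prefix handling above can be sketched with a toy registrator that only tracks topic names. ToyRegistrator and this local SetupError are hypothetical stand-ins, and the sketch assumes, as with FastStream router prefixes, that a prefix is prepended to each topic name; in the real broker, prefixes, dependencies, and middlewares are handled by the base Registrator and may differ in detail.

```python
class SetupError(Exception):
    """Hypothetical stand-in for the SetupError raised above."""


class ToyRegistrator:
    """Hypothetical registrator that only tracks subscribed topic names."""

    def __init__(self, prefix: str = "") -> None:
        self.prefix = prefix
        self.topics: list[str] = []

    def subscriber(self, topic: str) -> None:
        # A router's own prefix is applied when the subscriber is declared.
        self.topics.append(self.prefix + topic)

    def include_router(self, router: object, *, prefix: str = "") -> None:
        # Reject foreign router types, mirroring the isinstance check above.
        if not isinstance(router, ToyRegistrator):
            msg = (
                "Router must be an instance of ToyRegistrator, "
                f"got {type(router).__name__} instead"
            )
            raise SetupError(msg)
        # The include-time prefix is prepended to every topic of the router.
        self.topics.extend(prefix + topic for topic in router.topics)

    def include_routers(self, *routers: "ToyRegistrator") -> None:
        # Same loop as include_routers below: no prefix is passed.
        for r in routers:
            self.include_router(r)


broker = ToyRegistrator()
orders = ToyRegistrator(prefix="orders.")
orders.subscriber("created")
users = ToyRegistrator()
users.subscriber("signup")

broker.include_router(orders, prefix="v1.")
broker.include_routers(users)
print(broker.topics)  # ['v1.orders.created', 'signup']
```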

include_routers

include_routers(*routers)

Includes routers in the object by calling include_router on each of them.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

connect async

connect()

Connect to a remote server. If a connection already exists, it is returned without reconnecting.

Source code in faststream/_internal/broker/broker.py
async def connect(self) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        self._connection = await self._connect()
        self._setup_logger()

    return self._connection

stop async

stop(exc_type=None, exc_val=None, exc_tb=None)

Stop the broker, disconnect from Kafka, and reset the stored connection.

Source code in faststream/kafka/broker/broker.py
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await super().stop(exc_type, exc_val, exc_tb)
    await self.config.disconnect()
    self._connection = None
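Together, connect() and stop() implement a lazily created, cached connection: connect() opens it only once, and stop() clears it so that a later connect() reopens it. The sketch below models just that lifecycle; ToyLifecycle and open_count are hypothetical names, and no real Kafka clients are involved.

```python
import asyncio
from typing import Optional


class ToyLifecycle:
    """Hypothetical model of the connect()/stop() pattern; not FastStream code."""

    def __init__(self) -> None:
        self._connection: Optional[object] = None
        self.open_count = 0

    async def _connect(self) -> object:
        # Stand-in for opening real network clients.
        self.open_count += 1
        return object()

    async def connect(self) -> object:
        # Open a connection only once; later calls reuse the cached one.
        if self._connection is None:
            self._connection = await self._connect()
        return self._connection

    async def stop(self) -> None:
        # Forget the connection so that the next connect() reopens it.
        self._connection = None


async def main() -> tuple:
    broker = ToyLifecycle()
    first = await broker.connect()
    second = await broker.connect()  # cached: no new connection
    await broker.stop()
    third = await broker.connect()  # reopened after stop()
    return first is second, first is third, broker.open_count


same, reused_after_stop, opened = asyncio.run(main())
print(same, reused_after_stop, opened)  # True False 2
```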

close async

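close(exc_type=None, exc_val=None, exc_tb=None)

Deprecated since FastStream 0.5.44 and scheduled for removal in FastStream 0.7.0. Use stop instead.
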
Source code in faststream/kafka/broker/broker.py
@deprecated(
    "Deprecated in **FastStream 0.5.44**. "
    "Please, use `stop` method instead. "
    "Method `close` will be removed in **FastStream 0.7.0**.",
    category=DeprecationWarning,
    stacklevel=1,
)
async def close(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await self.stop(exc_type, exc_val, exc_tb)

start async

start()

Connect the broker to Kafka and start all subscribers.

Source code in faststream/kafka/broker/broker.py
async def start(self) -> None:
    """Connect the broker to Kafka and start all subscribers."""
    await self.connect()
    await super().start()

publish async

publish(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: Literal[False] = False,
) -> RecordMetadata
publish(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: Literal[True] = ...,
) -> Future[RecordMetadata]
publish(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: bool = False,
) -> Future[RecordMetadata] | RecordMetadata
publish(
    message,
    topic="",
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    reply_to="",
    no_confirm=False,
)

Publish a message directly.

This method publishes a message without documenting it in the AsyncAPI schema. Use it from applications built on other frameworks or to publish messages occasionally.

For regular use, prefer @broker.publisher(...) or broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: SendableMessage

topic

Topic where the message will be published.

TYPE: str DEFAULT: ''

key

A key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and the producer's partitioner config is left as default), messages with the same key will be delivered to the same partition (if key is None, the partition is chosen randomly). Must be of type bytes or serializable to bytes via the configured key_serializer.

TYPE: bytes | Any | None DEFAULT: None

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None

timestamp_ms

Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

TYPE: int | None DEFAULT: None

headers

Message headers to store metadata.

TYPE: dict[str, str] | None DEFAULT: None

correlation_id

Manually sets the message correlation_id, which is useful for tracing messages.

TYPE: str | None DEFAULT: None

reply_to

Name of the topic to send the reply to.

TYPE: str DEFAULT: ''

no_confirm

Do not wait for Kafka publish confirmation.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Future[RecordMetadata] | RecordMetadata

asyncio.Future[RecordMetadata] if no_confirm = True.

RecordMetadata if no_confirm = False.
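The two return modes documented above can be sketched with plain asyncio. fake_publish, fake_send, and FakeRecordMetadata are hypothetical stand-ins; the sketch only models the shape of the result (a value you receive after the acknowledgement versus a future you await later), not how FastStream or aiokafka actually send data.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeRecordMetadata:
    """Hypothetical stand-in for aiokafka's RecordMetadata."""

    topic: str
    partition: int
    offset: int


async def fake_send(topic: str) -> FakeRecordMetadata:
    # Simulate the round trip until Kafka acknowledges the write.
    await asyncio.sleep(0.01)
    return FakeRecordMetadata(topic=topic, partition=0, offset=42)


async def fake_publish(topic: str, *, no_confirm: bool = False):
    # Start sending immediately; the Task is an asyncio.Future subclass.
    task = asyncio.ensure_future(fake_send(topic))
    if no_confirm:
        return task  # caller awaits the acknowledgement later (or never)
    return await task  # wait for the acknowledgement now


async def main():
    confirmed = await fake_publish("events")
    pending = await fake_publish("events", no_confirm=True)
    was_future = isinstance(pending, asyncio.Future)
    later = await pending
    return confirmed, was_future, later


confirmed, was_future, later = asyncio.run(main())
print(confirmed.offset, was_future, later == confirmed)  # 42 True True
```

In this model, a consequence of no_confirm=True is that a delivery error only surfaces when the returned future is awaited or inspected.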

Source code in faststream/kafka/broker/broker.py
@override
async def publish(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: bool = False,
) -> asyncio.Future[RecordMetadata] | RecordMetadata:
    """Publish a message directly.

    This method publishes a message without documenting it in the AsyncAPI schema. Use it
    from applications built on other frameworks or to publish messages occasionally.

    For regular use, prefer `@broker.publisher(...)` or `broker.publisher(...).publish(...)`.

    Args:
        message:
            Message body to send.
        topic:
            Topic where the message will be published.
        key:
            A key to associate with the message. Can be used to
            determine which partition to send the message to. If partition
            is `None` (and producer's partitioner config is left as default),
            then messages with the same key will be delivered to the same
            partition (but if key is `None`, partition is chosen randomly).
            Must be of type `bytes` or serializable to bytes via the configured
            `key_serializer`.
        partition:
            Specify a partition. If not set, the partition will be
            selected using the configured `partitioner`.
        timestamp_ms:
            Epoch milliseconds (from Jan 1 1970 UTC) to use as
            the message timestamp. Defaults to current time.
        headers:
            Message headers to store metadata.
        correlation_id:
            Manually sets the message **correlation_id**, which is
            useful for tracing messages.
        reply_to:
            Name of the topic to send the reply to.
        no_confirm:
            Do not wait for Kafka publish confirmation.

    Returns:
        `asyncio.Future[RecordMetadata]` if no_confirm = True.
        `RecordMetadata` if no_confirm = False.
    """
    cmd = KafkaPublishCommand(
        message,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        reply_to=reply_to,
        no_confirm=no_confirm,
        correlation_id=correlation_id or gen_cor_id(),
        _publish_type=PublishType.PUBLISH,
    )
    return await super()._basic_publish(cmd, producer=self.config.producer)
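The key-to-partition rule described in the docstring above can be illustrated with a small hash-and-modulo sketch. choose_partition is hypothetical, and zlib.crc32 is swapped in for the murmur2 hash used by Kafka's default partitioner, so the concrete partition numbers will not match a real cluster. Only the property "same key, same partition" carries over, and only while the topic's partition count stays fixed.

```python
import random
import zlib
from typing import Optional


def choose_partition(key: Optional[bytes], num_partitions: int) -> int:
    """Hypothetical key-based partitioner (crc32 stands in for murmur2)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    if key is None:
        # No key: pick any partition at random.
        return random.randrange(num_partitions)
    # Same key -> same hash -> same partition, as long as num_partitions is fixed.
    return zlib.crc32(key) % num_partitions


# Keys must be bytes (or serialized to bytes by a key_serializer).
key = "user-42".encode("utf-8")
print(choose_partition(key, 6) == choose_partition(key, 6))  # True
```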

request async

request(
    message,
    topic,
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    timeout=0.5,
)

Send a request message and wait for a response.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: SendableMessage

topic

Topic where the message will be published.

TYPE: str

key

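A key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and the producer's partitioner config is left as default), messages with the same key will be delivered to the same partition (if key is None, the partition is chosen randomly). Must be of type bytes or serializable to bytes via the configured key_serializer.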

TYPE: bytes | Any | None DEFAULT: None

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None

timestamp_ms

Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

TYPE: int | None DEFAULT: None

headers

Message headers to store metainformation.

TYPE: dict[str, str] | None DEFAULT: None

correlation_id

Manually sets the message correlation_id, which is useful for tracing messages.

TYPE: str | None DEFAULT: None

timeout

Timeout, in seconds, for the RPC request.

TYPE: float DEFAULT: 0.5

RETURNS DESCRIPTION
KafkaMessage

The response message.

TYPE: KafkaMessage
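timestamp_ms is an integer count of milliseconds since the Unix epoch. The helper below, to_epoch_ms (a hypothetical name, not part of FastStream), shows one exact way to compute it from a timezone-aware datetime.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds (UTC)."""
    if dt.tzinfo is None:
        # Naive datetimes are ambiguous; require an explicit timezone.
        raise ValueError("dt must be timezone-aware")
    # Integer timedelta division avoids float rounding in the result.
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc)))  # 1704067200000
```

The result can be passed as timestamp_ms; leaving it None uses the current time, as documented.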

Source code in faststream/kafka/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    topic: str,
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> "KafkaMessage":
    """Send a request message and wait for a response.

    Args:
        message: Message body to send.
        topic: Topic where the message will be published.
        key: A key to associate with the message. Can be used to
            determine which partition to send the message to. If partition
            is `None` (and producer's partitioner config is left as default),
            then messages with the same key will be delivered to the same
            partition (but if key is `None`, partition is chosen randomly).
            Must be of type `bytes` or serializable to bytes via the configured
            `key_serializer`.
        partition: Specify a partition. If not set, the partition will be
            selected using the configured `partitioner`.
        timestamp_ms: Epoch milliseconds (from Jan 1 1970 UTC) to use as
            the message timestamp. Defaults to current time.
        headers: Message headers to store metadata.
        correlation_id: Manually sets the message **correlation_id**, which is
            useful for tracing messages.
        timeout: Timeout, in seconds, for the RPC request.

    Returns:
        KafkaMessage: The response message.
    """
    cmd = KafkaPublishCommand(
        message,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        timeout=timeout,
        correlation_id=correlation_id or gen_cor_id(),
        _publish_type=PublishType.REQUEST,
    )

    msg: KafkaMessage = await super()._basic_request(
        cmd,
        producer=self.config.producer,
    )
    return msg
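Conceptually, request() is publish-then-wait: a reply is matched to its request by correlation_id, and timeout bounds the wait. The sketch below models only that matching and timeout logic with plain asyncio; ToyRequester, echo_service, and silent_service are hypothetical, and FastStream's actual reply routing over Kafka may differ.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable

ReplyFn = Callable[[str, Any], None]
SendFn = Callable[[Any, str, ReplyFn], Awaitable[None]]


class ToyRequester:
    """Hypothetical request/reply model that matches replies by correlation_id."""

    def __init__(self) -> None:
        self.pending: dict[str, asyncio.Future] = {}

    async def request(self, send: SendFn, message: Any, timeout: float = 0.5) -> Any:
        correlation_id = str(uuid.uuid4())
        fut = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = fut
        try:
            await send(message, correlation_id, self.on_reply)
            # Raises asyncio.TimeoutError if no matching reply arrives in time.
            return await asyncio.wait_for(fut, timeout)
        finally:
            # Always drop the waiter so late replies are ignored.
            self.pending.pop(correlation_id, None)

    def on_reply(self, correlation_id: str, body: Any) -> None:
        fut = self.pending.get(correlation_id)
        if fut is not None and not fut.done():
            fut.set_result(body)


async def echo_service(message: Any, correlation_id: str, reply: ReplyFn) -> None:
    # Simulated consumer: answers shortly, echoing the same correlation_id.
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, reply, correlation_id, f"pong:{message}")


async def silent_service(message: Any, correlation_id: str, reply: ReplyFn) -> None:
    # Simulated consumer that never answers.
    return None


async def main() -> tuple:
    requester = ToyRequester()
    answer = await requester.request(echo_service, "ping")
    try:
        await requester.request(silent_service, "ping", timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return answer, timed_out, len(requester.pending)


answer, timed_out, leftover = asyncio.run(main())
print(answer, timed_out, leftover)  # pong:ping True 0
```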

publish_batch async

publish_batch(
    *messages: SendableMessage,
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: Literal[False] = False,
) -> RecordMetadata
publish_batch(
    *messages: SendableMessage,
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: Literal[True] = ...,
) -> Future[RecordMetadata]
publish_batch(
    *messages: SendableMessage,
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: bool = False,
) -> Future[RecordMetadata] | RecordMetadata
publish_batch(
    *messages,
    topic="",
    partition=None,
    timestamp_ms=None,
    headers=None,
    reply_to="",
    correlation_id=None,
    no_confirm=False,
)

Publish a message batch as a single request to the broker.

PARAMETER DESCRIPTION
*messages

Message bodies to send.

TYPE: SendableMessage DEFAULT: ()

topic

Topic where the messages will be published.

TYPE: str DEFAULT: ''

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None

timestamp_ms

Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

TYPE: int | None DEFAULT: None

headers

Message headers to store metadata.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

Name of the topic to send the reply to.

TYPE: str DEFAULT: ''

correlation_id

Manually sets the message correlation_id, which is useful for tracing messages.

TYPE: str | None DEFAULT: None

no_confirm

Do not wait for Kafka publish confirmation.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Future[RecordMetadata] | RecordMetadata

asyncio.Future[RecordMetadata] if no_confirm = True.

RecordMetadata if no_confirm = False.

Source code in faststream/kafka/broker/broker.py
async def publish_batch(
    self,
    *messages: "SendableMessage",
    topic: str = "",
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    no_confirm: bool = False,
) -> asyncio.Future[RecordMetadata] | RecordMetadata:
    """Publish a message batch as a single request to the broker.

    Args:
        *messages:
            Message bodies to send.
        topic:
            Topic where the messages will be published.
        partition:
            Specify a partition. If not set, the partition will be
            selected using the configured `partitioner`.
        timestamp_ms:
            Epoch milliseconds (from Jan 1 1970 UTC) to use as
            the message timestamp. Defaults to current time.
        headers:
            Message headers to store metadata.
        reply_to:
            Name of the topic to send the reply to.
        correlation_id:
            Manually sets the message **correlation_id**, which is
            useful for tracing messages.
        no_confirm:
            Do not wait for Kafka publish confirmation.

    Returns:
        `asyncio.Future[RecordMetadata]` if no_confirm = True.
        `RecordMetadata` if no_confirm = False.
    """
    cmd = KafkaPublishCommand(
        *messages,
        topic=topic,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        reply_to=reply_to,
        no_confirm=no_confirm,
        correlation_id=correlation_id or gen_cor_id(),
        _publish_type=PublishType.PUBLISH,
    )

    return await self._basic_publish_batch(cmd, producer=self.config.producer)
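The difference between publishing messages one at a time and publish_batch can be modeled by counting requests. CountingTransport, publish_each, and publish_batch here are hypothetical stand-ins, not FastStream functions. The sketch reflects only what the signature above implies: topic and partition are given once per call, so they apply to the whole batch.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class CountingTransport:
    """Hypothetical transport that records every request it would send."""

    requests: list = field(default_factory=list)

    def send(self, topic: str, partition: Optional[int], records: list) -> None:
        self.requests.append((topic, partition, records))


def publish_each(transport: CountingTransport, *messages: Any, topic: str) -> None:
    # One request per message, like calling publish() in a loop.
    for message in messages:
        transport.send(topic, None, [message])


def publish_batch(
    transport: CountingTransport,
    *messages: Any,
    topic: str,
    partition: Optional[int] = None,
) -> None:
    # One request for all messages; topic and partition are shared by the batch.
    transport.send(topic, partition, list(messages))


looped, batched = CountingTransport(), CountingTransport()
publish_each(looped, "a", "b", "c", topic="events")
publish_batch(batched, "a", "b", "c", topic="events")
print(len(looped.requests), len(batched.requests))  # 3 1
```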

ping async

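ping(timeout)

Check broker availability by calling describe_cluster on the admin client, retrying every timeout / 10 seconds (1 second if timeout is None). Returns True as soon as the call succeeds, and False if the timeout elapses first or the client raises IncorrectState.
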
Source code in faststream/kafka/broker/broker.py
@override
async def ping(self, timeout: float | None) -> bool:
    sleep_time = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as cancel_scope:
        while True:
            if cancel_scope.cancel_called:
                return False

            try:
                await self.config.admin_client.describe_cluster()

            except IncorrectState:
                return False

            except Exception:
                await anyio.sleep(sleep_time)

            else:
                return True

    return False
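The retry-until-timeout loop in ping() can be reproduced with plain asyncio. This is an assumption-labeled equivalent, not FastStream code: check stands in for admin_client.describe_cluster(), asyncio.wait_for replaces anyio.move_on_after, the IncorrectState early exit is omitted, and make_flaky is a hypothetical test helper.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def ping(check: Callable[[], Awaitable[object]], timeout: Optional[float]) -> bool:
    """Retry `check` until it succeeds or `timeout` seconds elapse (stdlib sketch)."""
    sleep_time = (timeout or 10) / 10  # same pacing rule as the original

    async def attempt_until_success() -> bool:
        while True:
            try:
                await check()
            except Exception:
                # Not reachable yet: wait a bit and try again.
                await asyncio.sleep(sleep_time)
            else:
                return True

    try:
        # timeout=None waits indefinitely, like move_on_after(None).
        return await asyncio.wait_for(attempt_until_success(), timeout)
    except asyncio.TimeoutError:
        return False


def make_flaky(failures: int) -> Callable[[], Awaitable[None]]:
    """Hypothetical health check that fails `failures` times, then succeeds."""
    calls = 0

    async def check() -> None:
        nonlocal calls
        calls += 1
        if calls <= failures:
            raise ConnectionError("broker not ready")

    return check


print(asyncio.run(ping(make_flaky(2), timeout=1.0)))  # True
print(asyncio.run(ping(make_flaky(10**6), timeout=0.2)))  # False
```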