Skip to content

create_subscriber

faststream.kafka.subscriber.factory.create_subscriber

create_subscriber(
    *topics,
    batch,
    batch_timeout_ms,
    max_records,
    group_id,
    listener,
    pattern,
    connection_args,
    partitions,
    auto_commit,
    ack_policy,
    max_workers,
    no_ack,
    no_reply,
    config,
    title_,
    description_,
    include_in_schema,
)
Source code in faststream/kafka/subscriber/factory.py
def create_subscriber(
    *topics: str,
    batch: bool,
    batch_timeout_ms: int,
    max_records: int | None,
    # Kafka information
    group_id: str | None,
    listener: Optional["ConsumerRebalanceListener"],
    pattern: str | None,
    connection_args: dict[str, Any],
    partitions: Collection["TopicPartition"],
    auto_commit: bool,
    # Subscriber args
    ack_policy: "AckPolicy",
    max_workers: int,
    no_ack: bool,
    no_reply: bool,
    config: "KafkaBrokerConfig",
    # Specification args
    title_: str | None,
    description_: str | None,
    include_in_schema: bool,
) -> Union[
    "DefaultSubscriber",
    "BatchSubscriber",
    "ConcurrentDefaultSubscriber",
    "ConcurrentBetweenPartitionsSubscriber",
]:
    """Assemble the Kafka subscriber variant selected by the given options.

    The arguments are first handed to ``_validate_input_for_misconfigure``.
    Afterwards one subscriber config, one empty calls collection and one
    specification object are built and passed to exactly one subscriber class:

    * ``batch`` is truthy -> ``BatchSubscriber`` (checked first, so
      ``max_workers`` is not consulted in that case);
    * ``max_workers > 1`` and the config reports ``ack_first`` ->
      ``ConcurrentDefaultSubscriber``;
    * ``max_workers > 1`` otherwise -> ``ConcurrentBetweenPartitionsSubscriber``,
      after the config's ``topics`` are narrowed to the first topic only;
    * any other combination -> ``DefaultSubscriber``.

    Returns:
        The constructed subscriber instance.
    """
    _validate_input_for_misconfigure(
        *topics,
        pattern=pattern,
        partitions=partitions,
        ack_policy=ack_policy,
        no_ack=no_ack,
        auto_commit=auto_commit,
        max_workers=max_workers,
    )

    settings = KafkaSubscriberConfig(
        topics=topics,
        partitions=partitions,
        connection_args=connection_args,
        group_id=group_id,
        listener=listener,
        pattern=pattern,
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
        # deprecated options to remove in 0.7.0
        _auto_commit=auto_commit,
        _no_ack=no_ack,
    )

    handlers = CallsCollection[Any]()

    spec_options = KafkaSubscriberSpecificationConfig(
        topics=topics,
        partitions=partitions,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )
    spec = KafkaSubscriberSpecification(
        _outer_config=config,
        calls=handlers,
        specification_config=spec_options,
    )

    # Every subscriber class takes the same three leading positional arguments.
    shared = (settings, spec, handlers)

    if batch:
        return BatchSubscriber(
            *shared,
            batch_timeout_ms=batch_timeout_ms,
            max_records=max_records,
        )

    if max_workers <= 1:
        return DefaultSubscriber(*shared)

    if settings.ack_first:
        return ConcurrentDefaultSubscriber(*shared, max_workers=max_workers)

    # This variant is handed a single topic: keep only the first one.
    # ``shared`` references the same config object, so it sees the change.
    settings.topics = (topics[0],)
    return ConcurrentBetweenPartitionsSubscriber(*shared, max_workers=max_workers)