Skip to content

create_subscriber

faststream.confluent.subscriber.factory.create_subscriber #

create_subscriber(
    *topics,
    partitions,
    polling_interval,
    batch,
    max_records,
    group_id,
    connection_data,
    auto_commit,
    ack_policy,
    no_ack,
    max_workers,
    no_reply,
    config,
    title_,
    description_,
    include_in_schema,
)
Source code in faststream/confluent/subscriber/factory.py
def create_subscriber(
    *topics: str,
    partitions: Sequence["TopicPartition"],
    polling_interval: float,
    batch: bool,
    max_records: int | None,
    # Kafka information
    group_id: str | None,
    connection_data: dict[str, Any],
    auto_commit: bool,
    # Subscriber args
    ack_policy: "AckPolicy",
    no_ack: bool,
    max_workers: int,
    no_reply: bool,
    config: "KafkaBrokerConfig",
    # Specification args
    title_: str | None,
    description_: str | None,
    include_in_schema: bool,
) -> BatchSubscriber | ConcurrentDefaultSubscriber | DefaultSubscriber:
    _validate_input_for_misconfigure(
        *topics,
        group_id=group_id,
        partitions=partitions,
        ack_policy=ack_policy,
        no_ack=no_ack,
        auto_commit=auto_commit,
        max_workers=max_workers,
    )

    subscriber_config = KafkaSubscriberConfig(
        topics=topics,
        partitions=partitions,
        polling_interval=polling_interval,
        group_id=group_id,
        connection_data=connection_data,
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
        # deprecated options to remove in 0.7.0
        _auto_commit=auto_commit,
        _no_ack=no_ack,
    )

    calls = CallsCollection[Any]()

    specification = KafkaSubscriberSpecification(
        _outer_config=config,
        calls=calls,
        specification_config=KafkaSubscriberSpecificationConfig(
            topics=topics,
            partitions=partitions,
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        ),
    )

    if batch:
        return BatchSubscriber(
            subscriber_config,
            specification,
            calls,
            max_records=max_records,
        )

    if max_workers > 1:
        return ConcurrentDefaultSubscriber(
            subscriber_config,
            specification,
            calls,
            max_workers=max_workers,
        )

    return DefaultSubscriber(subscriber_config, specification, calls)