def create_subscriber(
    *topics: str,
    partitions: Sequence["TopicPartition"],
    polling_interval: float,
    batch: bool,
    max_records: int | None,
    # Kafka information
    group_id: str | None,
    connection_data: dict[str, Any],
    auto_commit: bool,
    # Subscriber args
    ack_policy: "AckPolicy",
    no_ack: bool,
    max_workers: int,
    no_reply: bool,
    config: "KafkaBrokerConfig",
    # Specification args
    title_: str | None,
    description_: str | None,
    include_in_schema: bool,
) -> BatchSubscriber | ConcurrentDefaultSubscriber | DefaultSubscriber:
    """Build the subscriber variant matching the requested consumption mode.

    Validates the option combination first, assembles the shared runtime and
    AsyncAPI-specification configs, then dispatches:

    * ``batch=True``      -> ``BatchSubscriber`` (polls up to ``max_records``)
    * ``max_workers > 1`` -> ``ConcurrentDefaultSubscriber``
    * otherwise           -> ``DefaultSubscriber``

    Note that ``batch`` takes precedence over ``max_workers``.
    """
    # Fail fast on incompatible option combinations before building anything.
    _validate_input_for_misconfigure(
        *topics,
        group_id=group_id,
        partitions=partitions,
        ack_policy=ack_policy,
        no_ack=no_ack,
        auto_commit=auto_commit,
        max_workers=max_workers,
    )

    # Handler-call registry shared between the subscriber and its spec.
    call_collection = CallsCollection[Any]()

    spec = KafkaSubscriberSpecification(
        _outer_config=config,
        calls=call_collection,
        specification_config=KafkaSubscriberSpecificationConfig(
            topics=topics,
            partitions=partitions,
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        ),
    )

    consumer_config = KafkaSubscriberConfig(
        topics=topics,
        partitions=partitions,
        polling_interval=polling_interval,
        group_id=group_id,
        connection_data=connection_data,
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
        # deprecated options to remove in 0.7.0
        _auto_commit=auto_commit,
        _no_ack=no_ack,
    )

    if batch:
        return BatchSubscriber(
            consumer_config,
            spec,
            call_collection,
            max_records=max_records,
        )

    if max_workers > 1:
        return ConcurrentDefaultSubscriber(
            consumer_config,
            spec,
            call_collection,
            max_workers=max_workers,
        )

    return DefaultSubscriber(consumer_config, spec, call_collection)