def create_subscriber(
*topics: str,
batch: bool,
batch_timeout_ms: int,
max_records: int | None,
# Kafka information
group_id: str | None,
listener: Optional["ConsumerRebalanceListener"],
pattern: str | None,
connection_args: dict[str, Any],
partitions: Collection["TopicPartition"],
auto_commit: bool,
# Subscriber args
ack_policy: "AckPolicy",
max_workers: int,
no_ack: bool,
no_reply: bool,
config: "KafkaBrokerConfig",
# Specification args
title_: str | None,
description_: str | None,
include_in_schema: bool,
) -> Union[
"DefaultSubscriber",
"BatchSubscriber",
"ConcurrentDefaultSubscriber",
"ConcurrentBetweenPartitionsSubscriber",
]:
_validate_input_for_misconfigure(
*topics,
pattern=pattern,
partitions=partitions,
ack_policy=ack_policy,
no_ack=no_ack,
auto_commit=auto_commit,
max_workers=max_workers,
)
subscriber_config = KafkaSubscriberConfig(
topics=topics,
partitions=partitions,
connection_args=connection_args,
group_id=group_id,
listener=listener,
pattern=pattern,
no_reply=no_reply,
_outer_config=config,
_ack_policy=ack_policy,
# deprecated options to remove in 0.7.0
_auto_commit=auto_commit,
_no_ack=no_ack,
)
calls = CallsCollection[Any]()
specification = KafkaSubscriberSpecification(
_outer_config=config,
calls=calls,
specification_config=KafkaSubscriberSpecificationConfig(
topics=topics,
partitions=partitions,
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
),
)
if batch:
return BatchSubscriber(
subscriber_config,
specification,
calls,
batch_timeout_ms=batch_timeout_ms,
max_records=max_records,
)
if max_workers > 1:
if subscriber_config.ack_first:
return ConcurrentDefaultSubscriber(
subscriber_config,
specification,
calls,
max_workers=max_workers,
)
subscriber_config.topics = (topics[0],)
return ConcurrentBetweenPartitionsSubscriber(
subscriber_config,
specification,
calls,
max_workers=max_workers,
)
return DefaultSubscriber(subscriber_config, specification, calls)