def create_subscriber(
*,
subject: str,
queue: str,
pending_msgs_limit: int | None,
pending_bytes_limit: int | None,
# Core args
max_msgs: int,
# JS args
durable: str | None,
config: Optional["api.ConsumerConfig"],
ordered_consumer: bool,
idle_heartbeat: float | None,
flow_control: bool | None,
deliver_policy: Optional["api.DeliverPolicy"],
headers_only: bool | None,
# pull args
pull_sub: Optional["PullSub"],
kv_watch: Optional["KvWatch"],
obj_watch: Optional["ObjWatch"],
inbox_prefix: bytes,
# custom args
ack_first: bool,
max_workers: int,
stream: Optional["JStream"],
# Subscriber args
ack_policy: "AckPolicy",
no_ack: bool,
no_reply: bool,
broker_config: "NatsBrokerConfig",
# Specification information
title_: str | None,
description_: str | None,
include_in_schema: bool,
) -> "LogicSubscriber[Any]":
_validate_input_for_misconfigure(
subject=subject,
queue=queue,
pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit,
max_msgs=max_msgs,
durable=durable,
config=config,
ordered_consumer=ordered_consumer,
idle_heartbeat=idle_heartbeat,
flow_control=flow_control,
deliver_policy=deliver_policy,
headers_only=headers_only,
pull_sub=pull_sub,
ack_policy=ack_policy,
no_ack=no_ack,
kv_watch=kv_watch,
obj_watch=obj_watch,
ack_first=ack_first,
max_workers=max_workers,
stream=stream,
)
config = config or ConsumerConfig(filter_subjects=[])
if config.durable_name is None:
config.durable_name = durable
if config.idle_heartbeat is None:
config.idle_heartbeat = idle_heartbeat
if config.headers_only is None:
config.headers_only = headers_only
if config.deliver_policy is DeliverPolicy.ALL:
config.deliver_policy = deliver_policy or DeliverPolicy.ALL
if stream:
# Both JS Subscribers
extra_options: dict[str, Any] = {
"pending_msgs_limit": pending_msgs_limit or DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
"pending_bytes_limit": pending_bytes_limit
or DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
"durable": durable,
"stream": stream.name,
}
if pull_sub is not None:
# JS Pull Subscriber
extra_options.update({"inbox_prefix": inbox_prefix})
else:
# JS Push Subscriber
if ack_first or ack_policy is AckPolicy.ACK_FIRST:
manual_ack = False
ack_policy = AckPolicy.MANUAL
else:
manual_ack = True
extra_options.update(
{
"ordered_consumer": ordered_consumer,
"idle_heartbeat": idle_heartbeat,
"flow_control": flow_control,
"deliver_policy": deliver_policy,
"headers_only": headers_only,
"manual_ack": manual_ack,
},
)
else:
# Core Subscriber
extra_options = {
"pending_msgs_limit": pending_msgs_limit or DEFAULT_SUB_PENDING_MSGS_LIMIT,
"pending_bytes_limit": pending_bytes_limit or DEFAULT_SUB_PENDING_BYTES_LIMIT,
"max_msgs": max_msgs,
}
subscriber_config = NatsSubscriberConfig(
subject=subject,
sub_config=config,
extra_options=extra_options,
no_reply=no_reply,
_outer_config=broker_config,
_ack_first=ack_first,
_ack_policy=ack_policy,
_no_ack=no_ack,
)
calls = CallsCollection[Any]()
specification_config = NatsSubscriberSpecificationConfig(
subject=subject,
queue=queue or None,
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
specification = NatsSubscriberSpecification(
_outer_config=broker_config,
calls=calls,
specification_config=specification_config,
)
not_include_spec = NotIncludeSpecifation(
_outer_config=broker_config,
calls=calls,
specification_config=specification_config,
)
subscriber_options: SharedOptions = {
"config": subscriber_config,
"specification": specification,
"calls": calls,
}
if obj_watch is not None:
return ObjStoreWatchSubscriber(
**(subscriber_options | {"specification": not_include_spec}),
obj_watch=obj_watch,
)
if kv_watch is not None:
return KeyValueWatchSubscriber(
**(subscriber_options | {"specification": not_include_spec}),
kv_watch=kv_watch,
)
if stream is None:
if max_workers > 1:
return ConcurrentCoreSubscriber(
**subscriber_options,
max_workers=max_workers,
queue=queue,
)
return CoreSubscriber(
**subscriber_options,
queue=queue,
)
if max_workers > 1:
if pull_sub is not None:
return ConcurrentPullStreamSubscriber(
**subscriber_options,
max_workers=max_workers,
queue=queue,
stream=stream,
pull_sub=pull_sub,
)
return ConcurrentPushStreamSubscriber(
**subscriber_options,
max_workers=max_workers,
queue=queue,
stream=stream,
)
if pull_sub is not None:
if pull_sub.batch:
return BatchPullStreamSubscriber(
**subscriber_options,
pull_sub=pull_sub,
stream=stream,
)
return PullStreamSubscriber(
**subscriber_options,
queue=queue,
pull_sub=pull_sub,
stream=stream,
)
return PushStreamSubscriber(
**subscriber_options,
queue=queue,
stream=stream,
)