def create_subscriber(
*,
channel: Union["PubSub", str, None],
list: Union["ListSub", str, None],
stream: Union["StreamSub", str, None],
# Subscriber args
ack_policy: "AckPolicy",
no_ack: bool,
config: "RedisBrokerConfig",
no_reply: bool = False,
message_format: type["MessageFormat"] | None,
# AsyncAPI args
title_: str | None = None,
description_: str | None = None,
include_in_schema: bool = True,
max_workers: int = 1,
) -> SubscriberType:
_validate_input_for_misconfigure(
channel=channel,
list=list,
stream=stream,
ack_policy=ack_policy,
no_ack=no_ack,
max_workers=max_workers,
message_format=message_format,
)
subscriber_config = RedisSubscriberConfig(
channel_sub=PubSub.validate(channel),
list_sub=ListSub.validate(list),
stream_sub=StreamSub.validate(stream),
no_reply=no_reply,
_outer_config=config,
_ack_policy=ack_policy,
_message_format=message_format,
)
specification_config = RedisSubscriberSpecificationConfig(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
calls = CallsCollection[Any]()
specification: RedisSubscriberSpecification
if subscriber_config.channel_sub:
specification = ChannelSubscriberSpecification(
config,
specification_config,
calls,
channel=subscriber_config.channel_sub,
)
subscriber_config._ack_policy = AckPolicy.MANUAL
if max_workers > 1:
return ChannelConcurrentSubscriber(
subscriber_config,
specification,
calls,
max_workers=max_workers,
)
return ChannelSubscriber(subscriber_config, specification, calls)
if subscriber_config.stream_sub:
specification = StreamSubscriberSpecification(
config,
specification_config,
calls,
stream_sub=subscriber_config.stream_sub,
)
if subscriber_config.stream_sub.batch:
# TODO: raise warning if max_workers in `_validate_input_for_misconfigure`
return StreamBatchSubscriber(subscriber_config, specification, calls)
if max_workers > 1:
return StreamConcurrentSubscriber(
subscriber_config,
specification,
calls,
max_workers=max_workers,
)
return StreamSubscriber(subscriber_config, specification, calls)
if subscriber_config.list_sub:
specification = ListSubscriberSpecification(
config,
specification_config,
calls,
list_sub=subscriber_config.list_sub,
)
if subscriber_config.list_sub.batch:
# TODO: raise warning if max_workers in `_validate_input_for_misconfigure`
return ListBatchSubscriber(subscriber_config, specification, calls)
if max_workers > 1:
return ListConcurrentSubscriber(
subscriber_config,
specification,
calls,
max_workers=max_workers,
)
return ListSubscriber(subscriber_config, specification, calls)
raise SetupError(INCORRECT_SETUP_MSG)