def create_publisher(
*,
channel: Union["PubSub", str, None],
list: Union["ListSub", str, None],
stream: Union["StreamSub", str, None],
headers: dict[str, Any] | None,
reply_to: str,
config: "RedisBrokerConfig",
message_format: type["MessageFormat"] | None,
# AsyncAPI args
title_: str | None,
description_: str | None,
schema_: Any | None,
include_in_schema: bool,
) -> PublisherType:
validate_options(channel=channel, list=list, stream=stream)
publisher_config = RedisPublisherConfig(
reply_to=reply_to,
headers=headers,
_message_format=message_format,
_outer_config=config,
)
specification_config = RedisPublisherSpecificationConfig(
schema_=schema_,
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
specification: RedisPublisherSpecification
if channel_sub := PubSub.validate(channel):
specification = ChannelPublisherSpecification(
config,
specification_config,
channel_sub,
)
return ChannelPublisher(publisher_config, specification, channel=channel_sub)
if stream_sub := StreamSub.validate(stream):
specification = StreamPublisherSpecification(
config,
specification_config,
stream_sub,
)
return StreamPublisher(publisher_config, specification, stream=stream_sub)
if list_sub := ListSub.validate(list):
specification = ListPublisherSpecification(config, specification_config, list_sub)
if list_sub.batch:
return ListBatchPublisher(publisher_config, specification, list=list_sub)
return ListPublisher(publisher_config, specification, list=list_sub)
raise SetupError(INCORRECT_SETUP_MSG)