def create_publisher(
*,
autoflush: bool,
batch: bool,
key: bytes | str | None,
topic: str,
partition: int | None,
headers: dict[str, str] | None,
reply_to: str,
# Publisher args
config: "KafkaBrokerConfig",
middlewares: Sequence["PublisherMiddleware"],
# Specification args
schema_: Any | None,
title_: str | None,
description_: str | None,
include_in_schema: bool,
) -> BatchPublisher | DefaultPublisher:
publisher_config = KafkaPublisherConfig(
key=key,
topic=topic,
partition=partition,
headers=headers,
reply_to=reply_to,
_outer_config=config,
middlewares=middlewares,
)
specification = KafkaPublisherSpecification(
_outer_config=config,
specification_config=KafkaPublisherSpecificationConfig(
topic=topic,
schema_=schema_,
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
),
)
publisher: BatchPublisher | DefaultPublisher
if batch:
if key:
msg = "You can't setup `key` with batch publisher"
raise SetupError(msg)
publisher = BatchPublisher(publisher_config, specification)
publish_method = "_basic_publish_batch"
else:
publisher = DefaultPublisher(publisher_config, specification)
publish_method = "_basic_publish"
if autoflush:
default_publish: Callable[..., Awaitable[Any | None]] = getattr(
publisher,
publish_method,
)
@wraps(default_publish)
async def autoflush_wrapper(*args: Any, **kwargs: Any) -> Any | None:
result = await default_publish(*args, **kwargs)
await publisher.flush()
return result
setattr(publisher, publish_method, autoflush_wrapper)
return publisher