def __init__(
    self,
    *,
    security: Optional["BaseSecurity"] = None,
    config: ConfluentConfig | None = None,
    # shared
    bootstrap_servers: str | Iterable[str] = "localhost",
    retry_backoff_ms: int = 100,
    client_id: str | None = SERVICE_NAME,
    allow_auto_create_topics: bool = True,
    connections_max_idle_ms: int = 9 * 60 * 1000,
    metadata_max_age_ms: int = 5 * 60 * 1000,
    # producer
    request_timeout_ms: int = 40 * 1000,
    acks: Literal[0, 1, -1, "all"] = EMPTY,
    compression_type: Literal["gzip", "snappy", "lz4", "zstd"] | None = None,
    partitioner: str
    | Callable[[bytes, list[Any], list[Any]], Any] = "consistent_random",
    max_request_size: int = 1024 * 1024,
    linger_ms: int = 0,
    enable_idempotence: bool = False,
    transactional_id: str | None = None,
    transaction_timeout_ms: int = 60 * 1000,
) -> None:
    """Collect client options into per-role raw configuration dicts.

    All arguments are keyword-only. The values are stored as given; no
    validation or conversion happens here apart from the ``acks``
    normalisation described below.

    Attributes set:
        config: Result of ``parse_security(security)`` merged with
            ``config``; entries from ``config`` win on duplicate keys.
        raw_consumer_config: The shared options only
            (``bootstrap_servers``, ``client_id``,
            ``allow_auto_create_topics``, ``connections_max_idle_ms``,
            ``metadata_max_age_ms``).
        raw_producer_config: The shared options plus the producer options
            (``request_timeout_ms``, ``partitioner``, ``retry_backoff_ms``,
            ``max_request_size``, ``linger_ms``, ``enable_idempotence``,
            ``transactional_id``, ``transaction_timeout_ms``, ``acks`` and,
            when truthy, ``compression_type``).
        raw_admin_config: The shared options plus ``request_timeout_ms``
            and ``retry_backoff_ms``.

    Notes:
        ``acks`` left at ``EMPTY`` or given as ``"all"`` is stored as
        ``-1``; any other value is stored unchanged.
    """
    security_options = parse_security(security)
    self.config = security_options | (config or {})

    common: dict[str, Any] = {
        "bootstrap_servers": bootstrap_servers,
        "client_id": client_id,
        "allow_auto_create_topics": allow_auto_create_topics,
        "connections_max_idle_ms": connections_max_idle_ms,
        "metadata_max_age_ms": metadata_max_age_ms,
    }

    # Consumer-specific options are supplied through the
    # `broker.subscriber` method, so only the shared ones are kept here.
    self.raw_consumer_config = common

    producer_options: dict[str, Any] = {
        **common,
        "request_timeout_ms": request_timeout_ms,
        "partitioner": partitioner,
        "retry_backoff_ms": retry_backoff_ms,
        "max_request_size": max_request_size,
        "linger_ms": linger_ms,
        "enable_idempotence": enable_idempotence,
        "transactional_id": transactional_id,
        "transaction_timeout_ms": transaction_timeout_ms,
    }
    # Compression is opt-in: the key is absent unless a codec was chosen.
    if compression_type:
        producer_options["compression_type"] = compression_type
    # An unset value and the "all" alias are both recorded as -1.
    acks_means_all = acks is EMPTY or acks == "all"
    producer_options["acks"] = -1 if acks_means_all else acks
    self.raw_producer_config = producer_options

    self.raw_admin_config = {
        **common,
        "request_timeout_ms": request_timeout_ms,
        "retry_backoff_ms": retry_backoff_ms,
    }