Skip to content

ConfluentFastConfig

faststream.confluent.helpers.ConfluentFastConfig #

ConfluentFastConfig(
    *,
    security=None,
    config=None,
    bootstrap_servers="localhost",
    retry_backoff_ms=100,
    client_id=SERVICE_NAME,
    allow_auto_create_topics=True,
    connections_max_idle_ms=9 * 60 * 1000,
    metadata_max_age_ms=5 * 60 * 1000,
    request_timeout_ms=40 * 1000,
    acks=EMPTY,
    compression_type=None,
    partitioner="consistent_random",
    max_request_size=1024 * 1024,
    linger_ms=0,
    enable_idempotence=False,
    transactional_id=None,
    transaction_timeout_ms=60 * 1000,
)
Source code in faststream/confluent/helpers/config.py
def __init__(
    self,
    *,
    security: Optional["BaseSecurity"] = None,
    config: ConfluentConfig | None = None,
    # shared
    bootstrap_servers: str | Iterable[str] = "localhost",
    retry_backoff_ms: int = 100,
    client_id: str | None = SERVICE_NAME,
    allow_auto_create_topics: bool = True,
    connections_max_idle_ms: int = 9 * 60 * 1000,
    metadata_max_age_ms: int = 5 * 60 * 1000,
    # producer
    request_timeout_ms: int = 40 * 1000,
    acks: Literal[0, 1, -1, "all"] = EMPTY,
    compression_type: Literal["gzip", "snappy", "lz4", "zstd"] | None = None,
    partitioner: str
    | Callable[[bytes, list[Any], list[Any]], Any] = "consistent_random",
    max_request_size: int = 1024 * 1024,
    linger_ms: int = 0,
    enable_idempotence: bool = False,
    transactional_id: str | None = None,
    transaction_timeout_ms: int = 60 * 1000,
) -> None:
    self.config = parse_security(security) | (config or {})

    shared_config: dict[str, Any] = {
        "bootstrap_servers": bootstrap_servers,
        "client_id": client_id,
        "allow_auto_create_topics": allow_auto_create_topics,
        "connections_max_idle_ms": connections_max_idle_ms,
        "metadata_max_age_ms": metadata_max_age_ms,
    }

    # extended consumer options were passed to `broker.subscriber` method
    self.raw_consumer_config = shared_config

    self.raw_producer_config = shared_config | {
        "request_timeout_ms": request_timeout_ms,
        "partitioner": partitioner,
        "retry_backoff_ms": retry_backoff_ms,
        "max_request_size": max_request_size,
        "linger_ms": linger_ms,
        "enable_idempotence": enable_idempotence,
        "transactional_id": transactional_id,
        "transaction_timeout_ms": transaction_timeout_ms,
    }

    if compression_type:
        self.raw_producer_config["compression_type"] = compression_type

    if acks is EMPTY or acks == "all":
        self.raw_producer_config["acks"] = -1
    else:
        self.raw_producer_config["acks"] = acks

    self.raw_admin_config = shared_config | {
        "request_timeout_ms": request_timeout_ms,
        "retry_backoff_ms": retry_backoff_ms,
    }

config instance-attribute #

config = parse_security(security) | (config or {})

raw_consumer_config instance-attribute #

raw_consumer_config = shared_config

raw_producer_config instance-attribute #

raw_producer_config = shared_config | {
    "request_timeout_ms": request_timeout_ms,
    "partitioner": partitioner,
    "retry_backoff_ms": retry_backoff_ms,
    "max_request_size": max_request_size,
    "linger_ms": linger_ms,
    "enable_idempotence": enable_idempotence,
    "transactional_id": transactional_id,
    "transaction_timeout_ms": transaction_timeout_ms,
}

raw_admin_config instance-attribute #

raw_admin_config = shared_config | {
    "request_timeout_ms": request_timeout_ms,
    "retry_backoff_ms": retry_backoff_ms,
}

consumer_config property #

consumer_config

producer_config property #

producer_config

admin_config property #

admin_config