Skip to content

KafkaBrokerConfig

faststream.kafka.configs.broker.KafkaBrokerConfig dataclass #

KafkaBrokerConfig(
    *,
    prefix: str = "",
    include_in_schema: bool | None = True,
    broker_middlewares: Sequence[
        BrokerMiddleware[Any]
    ] = (),
    broker_parser: Optional[CustomCallable] = None,
    broker_decoder: Optional[CustomCallable] = None,
    producer: AioKafkaFastProducer = FakeAioKafkaFastProducer(),
    logger: LoggerState = LoggerState(),
    fd_config: FastDependsConfig = FastDependsConfig(),
    broker_dependencies: Iterable[Dependant] = (),
    graceful_timeout: float | None = None,
    extra_context: dict[str, Any] = dict(),
    builder: Callable[..., AIOKafkaConsumer] = lambda: None,
    client_id: str | None = SERVICE_NAME,
    _admin_client: Optional[AIOKafkaAdminClient] = None,
)

Bases: BrokerConfig

producer class-attribute instance-attribute #

producer: AioKafkaFastProducer = field(
    default_factory=FakeAioKafkaFastProducer
)

builder class-attribute instance-attribute #

builder: Callable[..., AIOKafkaConsumer] = lambda: None

client_id class-attribute instance-attribute #

client_id: str | None = SERVICE_NAME

admin_client property #

admin_client: AIOKafkaAdminClient

prefix class-attribute instance-attribute #

prefix: str = ''

include_in_schema class-attribute instance-attribute #

include_in_schema: bool | None = True

broker_middlewares class-attribute instance-attribute #

broker_middlewares: Sequence[BrokerMiddleware[Any]] = ()

broker_parser class-attribute instance-attribute #

broker_parser: Optional[CustomCallable] = None

broker_decoder class-attribute instance-attribute #

broker_decoder: Optional[CustomCallable] = None

logger class-attribute instance-attribute #

logger: LoggerState = field(default_factory=LoggerState)

fd_config class-attribute instance-attribute #

fd_config: FastDependsConfig = field(
    default_factory=FastDependsConfig
)

broker_dependencies class-attribute instance-attribute #

broker_dependencies: Iterable[Dependant] = ()

graceful_timeout class-attribute instance-attribute #

graceful_timeout: float | None = None

extra_context class-attribute instance-attribute #

extra_context: dict[str, Any] = field(default_factory=dict)

connect async #

connect(**connection_kwargs: Any) -> None
Source code in faststream/kafka/configs/broker.py
async def connect(self, **connection_kwargs: Any) -> "None":
    """Connect the producer, start the admin client, and prepare the consumer factory.

    Args:
        **connection_kwargs: Connection options. They are forwarded unchanged
            to ``aiokafka.AIOKafkaProducer``. The subsets that
            ``filter_by_dict`` selects for ``AdminClientConnectionParams`` and
            ``ConsumerConnectionParams`` are used for the admin client and the
            consumer factory respectively.
    """
    # The raw aiokafka producer receives every connection option as-is and is
    # handed to the configured producer wrapper together with the serializer.
    raw_producer = aiokafka.AIOKafkaProducer(**connection_kwargs)
    await self.producer.connect(
        raw_producer,
        serializer=self.fd_config._serializer,
    )

    # Only the first element of filter_by_dict's result is used; the second
    # is discarded.
    admin_kwargs, _ = filter_by_dict(AdminClientConnectionParams, connection_kwargs)
    self._admin_client = aiokafka.admin.client.AIOKafkaAdminClient(**admin_kwargs)
    await self._admin_client.start()

    # No consumer is created here: ``builder`` becomes a partial that
    # pre-binds the consumer options onto ``aiokafka.AIOKafkaConsumer``.
    consumer_kwargs, _ = filter_by_dict(ConsumerConnectionParams, connection_kwargs)
    self.builder = partial(aiokafka.AIOKafkaConsumer, **consumer_kwargs)

disconnect async #

disconnect() -> None
Source code in faststream/kafka/configs/broker.py
async def disconnect(self) -> "None":
    """Close the admin client (if any) and disconnect the producer.

    The producer is disconnected even when closing the admin client raises,
    so a failing admin shutdown cannot leave the producer connection open.
    Any exception raised by the admin client's ``close()`` still propagates
    to the caller after the producer has been disconnected.
    """
    # Detach the admin client before closing it so that a failed close does
    # not leave a stale reference on the config.
    admin_client = self._admin_client
    self._admin_client = None

    try:
        if admin_client is not None:
            await admin_client.close()
    finally:
        # Always release the producer, regardless of the admin client outcome.
        await self.producer.disconnect()

add_middleware #

add_middleware(middleware: BrokerMiddleware[Any]) -> None
Source code in faststream/_internal/configs/broker.py
def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
    """Append ``middleware`` after the existing broker middlewares.

    The current sequence is not mutated; a new tuple is built and assigned
    back to ``broker_middlewares``.
    """
    existing = tuple(self.broker_middlewares)
    self.broker_middlewares = existing + (middleware,)

insert_middleware #

insert_middleware(
    middleware: BrokerMiddleware[Any],
) -> None
Source code in faststream/_internal/configs/broker.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
    """Place ``middleware`` before the existing broker middlewares.

    The current sequence is not mutated; a new tuple is built and assigned
    back to ``broker_middlewares``.
    """
    existing = tuple(self.broker_middlewares)
    self.broker_middlewares = (middleware,) + existing