Skip to content

AsyncConfluentProducer

faststream.confluent.helpers.AsyncConfluentProducer #

AsyncConfluentProducer(*, logger, config)

An asynchronous Python Kafka client using the "confluent-kafka" package.

Source code in faststream/confluent/helpers/client.py
def __init__(
    self,
    *,
    logger: "LoggerState",
    config: config_module.ConfluentFastConfig,
) -> None:
    """Build the underlying ``Producer`` and schedule its poll loop.

    Args:
        logger: Logger state; its ``logger.logger`` attribute is handed to
            the ``Producer`` as its logger.
        config: Configuration object whose ``producer_config`` is used to
            construct the ``Producer``.

    Note:
        ``asyncio.create_task`` is called here, so the instance has to be
        created while an event loop is running.
    """
    self.logger_state = logger

    producer_config = config.producer_config
    self.config = producer_config
    self.producer = Producer(producer_config, logger=logger.logger.logger)

    # Mark the producer as active before the background task is scheduled;
    # `stop()` flips this flag back and cancels the task.
    self.__running = True
    self._poll_task = asyncio.create_task(self._poll_loop())

logger_state instance-attribute #

logger_state = logger

config instance-attribute #

config = producer_config

producer instance-attribute #

producer = Producer(config, logger=logger)

stop async #

stop()

Stop the Kafka producer and flush remaining messages.

Source code in faststream/confluent/helpers/client.py
async def stop(self) -> None:
    """Stop the Kafka producer and flush remaining messages.

    Calling this on an already stopped producer does nothing.
    """
    if not self.__running:
        return

    self.__running = False

    # Cancel the background poll task unless it has already finished.
    poll_task = self._poll_task
    if not poll_task.done():
        poll_task.cancel()

    await call_or_await(self.producer.flush)

flush async #

flush()

Flush the underlying producer.

Source code in faststream/confluent/helpers/client.py
async def flush(self) -> None:
    """Flush the underlying producer through ``call_or_await``."""
    flush_producer = self.producer.flush
    await call_or_await(flush_producer)

send async #

send(
    topic,
    value=None,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    no_confirm=False,
)

Sends a single message to a Kafka topic.

Source code in faststream/confluent/helpers/client.py
async def send(
    self,
    topic: str,
    value: bytes | str | None = None,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: list[tuple[str, str | bytes]] | None = None,
    no_confirm: bool = False,
) -> "asyncio.Future[Message | None] | Message | None":
    """Send a single message to a Kafka topic.

    Args:
        topic: Topic to produce to.
        value: Message payload.
        key: Message key.
        partition: Target partition; left out of the produce call when ``None``.
        timestamp_ms: Message timestamp, forwarded to the producer as
            ``timestamp``; left out of the produce call when ``None``.
        headers: Message headers as ``(name, value)`` pairs.
        no_confirm: When true, return the delivery future immediately
            instead of awaiting it.

    Returns:
        The delivery future when ``no_confirm`` is true; otherwise the
        message object handed to the delivery callback.

    Raises:
        KafkaException: When the delivery is awaited and the delivery
            callback reports an error.
    """
    kwargs: _SendKwargs = {
        "value": value,
        "key": key,
        "headers": headers,
    }

    if partition is not None:
        kwargs["partition"] = partition

    if timestamp_ms is not None:
        kwargs["timestamp"] = timestamp_ms

    loop = asyncio.get_running_loop()
    result_future: asyncio.Future[Message | None] = loop.create_future()

    def settle(exc: BaseException | None, msg: Message | None) -> None:
        # Runs on the event loop. The future may already be done, e.g. it was
        # cancelled because the awaiting caller timed out; resolving it again
        # would raise InvalidStateError inside the loop, so late delivery
        # reports are dropped instead.
        if result_future.done():
            return
        if exc is not None:
            result_future.set_exception(exc)
        else:
            result_future.set_result(msg)

    def ack_callback(err: Any, msg: Message | None) -> None:
        # Hand the outcome over with call_soon_threadsafe, since the delivery
        # callback is not guaranteed to run on the event-loop thread.
        if err or (msg is not None and (err := msg.error())):
            loop.call_soon_threadsafe(settle, KafkaException(err), None)
        else:
            loop.call_soon_threadsafe(settle, None, msg)

    kwargs["on_delivery"] = ack_callback

    # should be sync to prevent segfault
    self.producer.produce(topic, **kwargs)

    if no_confirm:
        return result_future
    return await result_future

create_batch #

create_batch()

Creates a batch for sending multiple messages.

Source code in faststream/confluent/helpers/client.py
def create_batch(self) -> "BatchBuilder":
    """Return a new ``BatchBuilder`` for sending multiple messages."""
    builder = BatchBuilder()
    return builder

send_batch async #

send_batch(batch, topic, *, partition, no_confirm=False)

Sends a batch of messages to a Kafka topic.

Source code in faststream/confluent/helpers/client.py
async def send_batch(
    self,
    batch: "BatchBuilder",
    topic: str,
    *,
    partition: int | None,
    no_confirm: bool = False,
) -> None:
    """Send every message of a batch to a Kafka topic.

    Each entry of ``batch._builder`` is passed to ``send`` as its own task
    inside an anyio task group.

    Args:
        batch: Batch whose ``_builder`` entries provide ``value``, ``key``,
            ``timestamp_ms`` and ``headers`` for each message.
        topic: Topic all messages are produced to.
        partition: Partition passed to every ``send`` call.
        no_confirm: Forwarded unchanged to every ``send`` call.
    """
    async with anyio.create_task_group() as task_group:
        for entry in batch._builder:
            # Positional order mirrors the parameters of `send`.
            send_args = (
                topic,
                entry["value"],
                entry["key"],
                partition,
                entry["timestamp_ms"],
                entry["headers"],
                no_confirm,
            )
            task_group.start_soon(self.send, *send_args)

ping async #

ping(timeout=5.0)

Implement ping using list_topics information request.

Source code in faststream/confluent/helpers/client.py
async def ping(
    self,
    timeout: float | None = 5.0,
) -> bool:
    """Implement ping using `list_topics` information request."""
    if timeout is None:
        timeout = -1

    try:
        cluster_metadata = await call_or_await(
            self.producer.list_topics,
            timeout=timeout,
        )

        return bool(cluster_metadata)

    except Exception:
        return False