Skip to content

AsyncConfluentFastProducerImpl

faststream.confluent.publisher.producer.AsyncConfluentFastProducerImpl #

AsyncConfluentFastProducerImpl(parser, decoder)

Bases: AsyncConfluentFastProducer

A class representing a Kafka producer.

Source code in faststream/confluent/publisher/producer.py
def __init__(
    self,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Create the producer wrapper in its initial, empty state.

    Args:
        parser: Optional custom parser; composed with the default
            ``AsyncConfluentParser.parse_message``.
        decoder: Optional custom decoder; composed with the default
            ``AsyncConfluentParser.decode_message``.
    """
    # Start with the empty producer state and no serializer.
    self._producer: ProducerState = EmptyProducerState()
    self.serializer: SerializerProto | None = None

    # NOTE: the default parser is registered to stay compatible with request
    fallback = AsyncConfluentParser()
    self._parser = ParserComposition(parser, fallback.parse_message)
    self._decoder = ParserComposition(decoder, fallback.decode_message)

serializer instance-attribute #

serializer = None

connect #

connect(producer, serializer)
Source code in faststream/confluent/publisher/producer.py
def connect(
    self,
    producer: "AsyncConfluentProducer",
    serializer: Optional["SerializerProto"],
) -> None:
    """Attach a producer and a serializer to this wrapper.

    Args:
        producer: Wrapped in ``RealProducer`` and stored as the current
            producer state.
        serializer: Stored on ``self.serializer``; may be ``None``.
    """
    connected_state = RealProducer(producer)
    self._producer = connected_state
    self.serializer = serializer

disconnect async #

disconnect()
Source code in faststream/confluent/publisher/producer.py
async def disconnect(self) -> None:
    """Stop the current producer state and replace it with the empty state."""
    current_state = self._producer
    await current_state.stop()
    # Swap in a fresh empty state once the stop call has completed.
    self._producer = EmptyProducerState()

ping async #

ping(timeout)
Source code in faststream/confluent/publisher/producer.py
async def ping(self, timeout: float) -> bool:
    """Delegate a ping to the current producer state.

    Args:
        timeout: Forwarded unchanged as the ``timeout`` keyword argument.

    Returns:
        The value produced by the producer state's ``ping`` coroutine.
    """
    result = await self._producer.ping(timeout=timeout)
    return result

flush async #

flush()
Source code in faststream/confluent/publisher/producer.py
async def flush(self) -> None:
    """Delegate a flush to the current producer state and wait for it."""
    current_state = self._producer
    await current_state.flush()

publish async #

publish(cmd)

Publish a message to a topic.

Source code in faststream/confluent/publisher/producer.py
@override
async def publish(
    self,
    cmd: "KafkaPublishCommand",
) -> "asyncio.Future[Message | None] | Message | None":
    """Publish a message to a topic.

    The command body is encoded with the configured serializer and handed
    to the underlying producer's ``send`` together with the command's
    destination, key, partition, timestamp and ``no_confirm`` flag.

    Args:
        cmd: Publish command supplying the body, headers and routing fields.

    Returns:
        Whatever the awaited ``send`` call of the underlying producer returns.
    """
    payload, content_type = encode_message(cmd.body, serializer=self.serializer)

    # "content-type" goes in first so that a command header with the same
    # key overrides it; a missing content type is sent as an empty string.
    merged_headers = {"content-type": content_type or ""}
    merged_headers.update(cmd.headers_to_publish())

    # Header values are sent as bytes; falsy values become b"".
    encoded_headers = [
        (name, (value or "").encode()) for name, value in merged_headers.items()
    ]

    return await self._producer.producer.send(
        topic=cmd.destination,
        value=payload,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=encoded_headers,
        no_confirm=cmd.no_confirm,
    )

publish_batch async #

publish_batch(cmd)

Publish a batch of messages to a topic.

Source code in faststream/confluent/publisher/producer.py
@override
async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
    """Publish a batch of messages to a topic.

    Each body in ``cmd.batch_bodies`` is encoded with the configured
    serializer and appended to a single batch, which is then sent to
    ``cmd.destination``.

    Args:
        cmd: Publish command supplying the batch bodies, headers,
            timestamp, destination, partition and ``no_confirm`` flag.
    """
    batch = self._producer.producer.create_batch()

    headers_to_send = cmd.headers_to_publish()

    for body in cmd.batch_bodies:
        message, content_type = encode_message(body, serializer=self.serializer)

        # "content-type" is only added when the encoder reported one; a
        # command header with the same key takes precedence over it.
        if content_type:
            final_headers = {
                "content-type": content_type,
                **headers_to_send,
            }
        else:
            # Only read below, so the shared dict can be used without a copy.
            final_headers = headers_to_send

        batch.append(
            # Batch entries are appended without a key.
            key=None,
            value=message,
            timestamp=cmd.timestamp_ms,
            # Falsy header values (e.g. ``None``) are sent as b"", matching
            # ``publish``, instead of failing on ``None.encode()``.
            headers=[
                (name, (value or "").encode())
                for name, value in final_headers.items()
            ],
        )

    await self._producer.producer.send_batch(
        batch,
        cmd.destination,
        partition=cmd.partition,
        no_confirm=cmd.no_confirm,
    )

request async #

request(cmd)
Source code in faststream/confluent/publisher/producer.py
@override
async def request(self, cmd: "KafkaPublishCommand") -> Any:
    """Reject the call: this producer does not implement ``request``.

    Args:
        cmd: Unused.

    Raises:
        FeatureNotSupportedException: On every call.
    """
    raise FeatureNotSupportedException(
        "Kafka doesn't support `request` method without test client.",
    )