Skip to content

AioKafkaFastProducerImpl

faststream.kafka.publisher.producer.AioKafkaFastProducerImpl #

AioKafkaFastProducerImpl(parser, decoder)

Bases: AioKafkaFastProducer

A class representing a Kafka producer.

Source code in faststream/kafka/publisher/producer.py
def __init__(
    self,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Create a producer wrapper that is not yet bound to a real producer.

    Args:
        parser: Optional custom parser, composed with the default
            ``AioKafkaParser.parse_message``.
        decoder: Optional custom decoder, composed with the default
            ``AioKafkaParser.decode_message``.
    """
    # Start with the placeholder state; ``connect`` swaps in a real producer.
    self._producer: "ProducerState" = EmptyProducerState()
    self.serializer: Optional["SerializerProto"] = None

    # NOTE: register default parser to be compatible with request
    fallback = AioKafkaParser(msg_class=KafkaMessage, regex=None)
    fallback_parse = fallback.parse_message
    fallback_decode = fallback.decode_message
    self._parser = ParserComposition(parser, fallback_parse)
    self._decoder = ParserComposition(decoder, fallback_decode)

serializer instance-attribute #

serializer = None

closed property #

closed

connect async #

connect(producer, serializer)
Source code in faststream/kafka/publisher/producer.py
async def connect(
    self,
    producer: "AIOKafkaProducer",
    serializer: Optional["SerializerProto"],
) -> None:
    """Start ``producer`` and make it the active producer of this instance.

    Args:
        producer: Producer to start and wrap in ``RealProducer``.
        serializer: Serializer stored on ``self.serializer`` and later passed
            to ``encode_message`` by the publish methods; may be ``None``.
    """
    # The serializer is recorded first, before the producer is started.
    self.serializer = serializer
    await producer.start()

    # Only a successfully started producer replaces the current state.
    started_state = RealProducer(producer)
    self._producer = started_state

disconnect async #

disconnect()
Source code in faststream/kafka/publisher/producer.py
async def disconnect(self) -> None:
    """Stop the current producer state and replace it with an empty one."""
    current_state = self._producer
    await current_state.stop()

    # Reset to a fresh placeholder state once the stop call has returned.
    self._producer = EmptyProducerState()

flush async #

flush()
Source code in faststream/kafka/publisher/producer.py
async def flush(self) -> None:
    """Await ``flush()`` on the current producer state."""
    current_state = self._producer
    await current_state.flush()

publish async #

publish(cmd)

Publish a message to a topic.

Source code in faststream/kafka/publisher/producer.py
@override
async def publish(
    self,
    cmd: "KafkaPublishCommand",
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
    """Publish a message to a topic.

    The body of ``cmd`` is encoded with ``encode_message`` using
    ``self.serializer`` and sent to ``cmd.destination``.

    Args:
        cmd: Publish command providing the body, destination, key,
            partition, timestamp, headers and the ``no_confirm`` flag.

    Returns:
        The awaited result of the send future, or the un-awaited future
        itself when ``cmd.no_confirm`` is truthy.
    """
    payload, content_type = encode_message(cmd.body, serializer=self.serializer)

    # The content type goes first so that headers supplied by the command
    # override it when they define the same key.
    outgoing_headers = {"content-type": content_type or ""}
    outgoing_headers.update(cmd.headers_to_publish())

    pending = await self._producer.producer.send(
        topic=cmd.destination,
        value=payload,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        # Header values are sent as bytes; falsy values become b"".
        headers=[
            (name, (value or "").encode())
            for name, value in outgoing_headers.items()
        ],
    )

    if cmd.no_confirm:
        return pending
    return await pending

publish_batch async #

publish_batch(cmd)

Publish a batch of messages to a topic.

Source code in faststream/kafka/publisher/producer.py
@override
async def publish_batch(
    self,
    cmd: "KafkaPublishCommand",
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
    """Publish a batch of messages to a topic.

    Every body in ``cmd.batch_bodies`` is encoded with ``encode_message``
    using ``self.serializer`` and appended to a single batch, which is then
    sent to ``cmd.destination``.

    Args:
        cmd: Publish command providing the batch bodies, destination,
            partition, timestamp, headers and the ``no_confirm`` flag.

    Returns:
        The awaited result of the send future, or the un-awaited future
        itself when ``cmd.no_confirm`` is truthy.

    Raises:
        BatchBufferOverflowException: If appending a message to the batch
            yields ``None``; carries the position of that message.
    """
    batch = self._producer.producer.create_batch()

    headers_to_send = cmd.headers_to_publish()

    for message_position, body in enumerate(cmd.batch_bodies):
        message, content_type = encode_message(body, serializer=self.serializer)

        if content_type:
            # Headers supplied by the command override the detected content type.
            final_headers = {
                "content-type": content_type,
                **headers_to_send,
            }
        else:
            # Only read below, so the shared mapping needs no per-message copy.
            final_headers = headers_to_send

        metadata = batch.append(
            key=None,
            value=message,
            timestamp=cmd.timestamp_ms,
            # Same None-safe encoding as `publish`: a missing header value is
            # sent as b"" instead of raising AttributeError.
            headers=[(i, (j or "").encode()) for i, j in final_headers.items()],
        )
        if metadata is None:
            raise BatchBufferOverflowException(message_position=message_position)

    send_future = await self._producer.producer.send_batch(
        batch,
        cmd.destination,
        partition=cmd.partition,
    )
    if not cmd.no_confirm:
        return await send_future
    return send_future

request async #

request(cmd)
Source code in faststream/kafka/publisher/producer.py
async def request(self, cmd: "KafkaPublishCommand") -> Any:
    """Reject the request: this producer always raises.

    Raises:
        FeatureNotSupportedException: On every call.
    """
    unsupported_reason = "Kafka doesn't support `request` method without test client."
    raise FeatureNotSupportedException(unsupported_reason)