Bases: AioKafkaFastProducer
A class to represent a Kafka producer.
Source code in faststream/kafka/publisher/producer.py
def __init__(
    self,
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Initialize the producer with optional custom parser and decoder.

    The producer starts disconnected: an inert placeholder state is held
    until ``connect`` swaps in a real aiokafka producer.
    """
    self._producer: ProducerState = EmptyProducerState()
    self.serializer: SerializerProto | None = None

    # A default parser is always registered so the producer stays
    # compatible with request/response-style handling.
    fallback = AioKafkaParser(msg_class=KafkaMessage, regex=None)
    self._parser = ParserComposition(parser, fallback.parse_message)
    self._decoder = ParserComposition(decoder, fallback.decode_message)
serializer
instance-attribute
connect
async
connect(producer, serializer)
Source code in faststream/kafka/publisher/producer.py
async def connect(
    self,
    producer: "AIOKafkaProducer",
    serializer: Optional["SerializerProto"],
) -> None:
    """Start the given aiokafka producer and adopt it as the active one.

    The serializer is stored before starting so it is already in place
    by the time the producer becomes usable.
    """
    self.serializer = serializer
    await producer.start()
    # Only wrap the producer once it has successfully started.
    self._producer = RealProducer(producer)
disconnect
async
Source code in faststream/kafka/publisher/producer.py
async def disconnect(self) -> None:
    """Stop the underlying producer and reset to the disconnected state."""
    await self._producer.stop()
    # Replace the stopped producer with an inert placeholder.
    self._producer = EmptyProducerState()
flush
async
Source code in faststream/kafka/publisher/producer.py
async def flush(self) -> None:
    """Wait until all currently buffered messages reach the broker."""
    await self._producer.flush()
publish
async
Publish a message to a topic.
Source code in faststream/kafka/publisher/producer.py
@override
async def publish(
    self,
    cmd: "KafkaPublishCommand",
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
    """Publish a message to a topic.

    Returns the broker's ``RecordMetadata`` when confirmation is
    requested, otherwise the pending send future.
    """
    payload, content_type = encode_message(cmd.body, serializer=self.serializer)

    # User-provided headers win over the derived content-type header.
    outgoing_headers = {"content-type": content_type or ""}
    outgoing_headers.update(cmd.headers_to_publish())

    # Header values may be None; encode them as empty strings.
    encoded_headers = [
        (name, (value or "").encode()) for name, value in outgoing_headers.items()
    ]

    send_future = await self._producer.producer.send(
        topic=cmd.destination,
        value=payload,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=encoded_headers,
    )

    if cmd.no_confirm:
        return send_future
    return await send_future
publish_batch
async
Publish a batch of messages to a topic.
Source code in faststream/kafka/publisher/producer.py
@override
async def publish_batch(
    self,
    cmd: "KafkaPublishCommand",
) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
    """Publish a batch of messages to a topic.

    Each body is encoded with the configured serializer and appended to a
    single producer batch, which is then sent to ``cmd.destination``.

    Returns the broker's ``RecordMetadata`` when confirmation is
    requested, otherwise the pending send future.

    Raises:
        BatchBufferOverflowException: if a message does not fit into the
            batch buffer (``batch.append`` returned ``None``).
    """
    batch = self._producer.producer.create_batch()

    headers_to_send = cmd.headers_to_publish()

    for message_position, body in enumerate(cmd.batch_bodies):
        message, content_type = encode_message(body, serializer=self.serializer)

        if content_type:
            # Shared headers override the derived content-type on conflict.
            final_headers = {
                "content-type": content_type,
                **headers_to_send,
            }
        else:
            final_headers = headers_to_send.copy()

        metadata = batch.append(
            key=None,
            value=message,
            timestamp=cmd.timestamp_ms,
            # FIX: guard against None header values with `(j or "")`,
            # matching `publish`; a bare `j.encode()` raises
            # AttributeError when a header value is None.
            headers=[(i, (j or "").encode()) for i, j in final_headers.items()],
        )
        if metadata is None:
            raise BatchBufferOverflowException(message_position=message_position)

    send_future = await self._producer.producer.send_batch(
        batch,
        cmd.destination,
        partition=cmd.partition,
    )

    if not cmd.no_confirm:
        return await send_future
    return send_future
request
async
Source code in faststream/kafka/publisher/producer.py
async def request(self, cmd: "KafkaPublishCommand") -> Any:
    """Always raise: Kafka offers no request/response outside the test client."""
    error_message = "Kafka doesn't support `request` method without test client."
    raise FeatureNotSupportedException(error_message)