Bases: AioKafkaFastProducer
A fake Kafka producer for testing purposes.
This class extends AioKafkaFastProducer and is used to simulate Kafka message publishing during tests.
Source code in faststream/kafka/testing.py
def __init__(
    self,
    broker: KafkaBroker,
    brokers: Sequence[KafkaBroker],
) -> None:
    """Bind the fake producer to the broker under test.

    Args:
        broker: Broker whose ``_parser``, ``_decoder`` and ``config`` are
            read to set up parsing, decoding and the codec.
        brokers: Stored unchanged on ``self.brokers``.
    """
    self.broker = broker
    self.brokers = brokers

    # Default Kafka parser; its parse/decode callables are composed with
    # whatever the broker itself carries.
    fallback = AioKafkaParser(msg_class=KafkaMessage, regex=None)
    self._parser = ParserComposition(broker._parser, fallback.parse_message)
    self._decoder = ParserComposition(broker._decoder, fallback.decode_message)

    # Use the broker-configured codec when it is truthy, else a DefaultCodec.
    configured_codec = broker.config.broker_codec
    self.codec = configured_codec or DefaultCodec()
broker instance-attribute
brokers instance-attribute
codec instance-attribute
codec = broker.config.broker_codec or DefaultCodec()
publish async
Publish a message to the Kafka broker.
Source code in faststream/kafka/testing.py
@override
async def publish(self, cmd: "KafkaPublishCommand") -> None:
    """Publish a message to the Kafka broker.

    A single incoming record is built from ``cmd`` and handed to every
    handler yielded by ``_find_handler`` for the command's destination and
    partition. Handlers that are ``BatchSubscriber`` instances receive the
    record wrapped in a one-element list.

    Args:
        cmd: Publish command supplying body, destination, key, partition,
            timestamp, headers, correlation id and reply-to.
    """
    record = await build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        reply_to=cmd.reply_to,
        serializer=self.broker.config.fd_config._serializer,
        codec=self.codec,
    )

    # NOTE(review): ``self.subscribers`` is not assigned in the visible
    # ``__init__`` -- presumably provided elsewhere on the class; confirm.
    matching = _find_handler(self.subscribers, cmd.destination, cmd.partition)
    for subscriber in matching:
        if isinstance(subscriber, BatchSubscriber):
            payload = [record]
        else:
            payload = record
        await self._execute_handler(payload, cmd.destination, subscriber)
request async
Source code in faststream/kafka/testing.py
@override
async def request(self, cmd: "KafkaPublishCommand") -> "ConsumerRecord":
    """Send a message and return the first matching handler's result.

    The record is built from ``cmd`` (no ``reply_to`` is passed) and given
    to the first handler yielded by ``_find_handler``; that handler runs
    inside ``anyio.fail_after(cmd.timeout)`` and its result is returned
    immediately.

    Args:
        cmd: Publish command supplying the message fields and ``timeout``.

    Returns:
        Whatever ``self._execute_handler`` returns for the first match.

    Raises:
        SubscriberNotFound: If ``_find_handler`` yields no handler.
    """
    record = await build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        serializer=self.broker.config.fd_config._serializer,
        codec=self.codec,
    )

    # NOTE(review): ``self.subscribers`` is not assigned in the visible
    # ``__init__`` -- presumably provided elsewhere on the class; confirm.
    matching = _find_handler(self.subscribers, cmd.destination, cmd.partition)
    for subscriber in matching:
        if isinstance(subscriber, BatchSubscriber):
            payload = [record]
        else:
            payload = record

        # Only the first match is ever executed: the return exits the loop.
        with anyio.fail_after(cmd.timeout):
            return await self._execute_handler(payload, cmd.destination, subscriber)

    raise SubscriberNotFound
publish_batch async
Publish a batch of messages to the Kafka broker.
Source code in faststream/kafka/testing.py
@override
async def publish_batch(
    self,
    cmd: "KafkaPublishCommand",
) -> None:
    """Publish a batch of messages to the Kafka broker.

    Every body in ``cmd.batch_bodies`` is encoded first: in one
    ``encode_batch`` call when the codec is a ``BatchCodecProto``, otherwise
    one ``encode`` call per body. For each handler yielded by
    ``_find_handler`` a fresh list of records is built; a
    ``BatchSubscriber`` gets the whole list in one call, any other handler
    gets the records one at a time, in order.

    Args:
        cmd: Publish command supplying the batch bodies, destination,
            partition, timestamp, headers, correlation id, reply-to and
            per-position keys (via ``cmd.key_for``).
    """
    serializer = self.broker.config.fd_config._serializer

    if not isinstance(self.codec, BatchCodecProto):
        encoded = []
        for raw_body in cmd.batch_bodies:
            encoded.append(await self.codec.encode(raw_body, serializer))
    else:
        encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer)

    # NOTE(review): ``self.subscribers`` is not assigned in the visible
    # ``__init__`` -- presumably provided elsewhere on the class; confirm.
    matching = _find_handler(self.subscribers, cmd.destination, cmd.partition)
    for subscriber in matching:
        # Records are rebuilt for every handler, as each encoded item is a
        # (body, content_type) pair.
        records = []
        for position, (payload, content_type) in enumerate(encoded):
            records.append(
                _build_record(
                    body=payload,
                    content_type=content_type,
                    topic=cmd.destination,
                    partition=cmd.partition,
                    timestamp_ms=cmd.timestamp_ms,
                    key=cmd.key_for(position),
                    headers=cmd.headers,
                    correlation_id=cmd.correlation_id,
                    reply_to=cmd.reply_to,
                )
            )

        if not isinstance(subscriber, BatchSubscriber):
            for record in records:
                await self._execute_handler(record, cmd.destination, subscriber)
            continue

        await self._execute_handler(list(records), cmd.destination, subscriber)
connect async
connect(
producer: AIOKafkaProducer,
serializer: Optional[SerializerProto],
codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/kafka/publisher/producer.py
async def connect(
    self,
    producer: "AIOKafkaProducer",
    serializer: Optional["SerializerProto"],
    codec: Optional["CodecProto"] = None,
) -> None:
    """Accept connection arguments and do nothing.

    The body of this definition is empty: none of ``producer``,
    ``serializer`` or ``codec`` is used, and ``None`` is returned.
    """
    return None
disconnect async
Source code in faststream/kafka/publisher/producer.py
async def disconnect(self) -> None:
    """Do nothing; the body of this definition is empty and returns None."""
    return None
flush async
Source code in faststream/kafka/publisher/producer.py
async def flush(self) -> None:
    """Do nothing and return None; no buffered state is touched here."""