Bases: AioKafkaFastProducer
A fake Kafka producer for testing purposes.
This class extends AioKafkaFastProducer and is used to simulate Kafka message publishing during tests.
Source code in faststream/kafka/testing.py
def __init__(self, broker: KafkaBroker) -> None:
    """Bind the fake producer to ``broker`` and set up its parser and decoder.

    The broker's own ``_parser`` and ``_decoder`` are each wrapped in a
    ``ParserComposition`` together with the corresponding method of a
    default ``AioKafkaParser`` (``parse_message`` / ``decode_message``).

    Args:
        broker: The Kafka broker whose subscribers, parser and decoder
            this producer works with.
    """
    self.broker = broker

    # Default parser built for ``KafkaMessage`` with no topic regex.
    fallback = AioKafkaParser(msg_class=KafkaMessage, regex=None)

    self._parser = ParserComposition(broker._parser, fallback.parse_message)
    self._decoder = ParserComposition(broker._decoder, fallback.decode_message)
|
broker
instance-attribute
publish
async
Publish a message to the Kafka broker.
Source code in faststream/kafka/testing.py
@override
async def publish(self, cmd: "KafkaPublishCommand") -> None:
    """Publish a message to the Kafka broker.

    Builds one incoming record from ``cmd`` and passes it to every handler
    that ``_find_handler`` yields for the command's destination and
    partition.

    Args:
        cmd: The publish command carrying the body, destination, key,
            partition, timestamp, headers, correlation id and reply-to.
    """
    record = build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        reply_to=cmd.reply_to,
        serializer=self.broker.config.fd_config._serializer,
    )

    subscribers = cast("list[LogicSubscriber[Any]]", self.broker.subscribers)
    for subscriber in _find_handler(subscribers, cmd.destination, cmd.partition):
        # Batch subscribers receive a list, so wrap the single record.
        if isinstance(subscriber, BatchSubscriber):
            payload = [record]
        else:
            payload = record

        await self._execute_handler(payload, cmd.destination, subscriber)
|
request
async
Source code in faststream/kafka/testing.py
@override
async def request(self, cmd: "KafkaPublishCommand") -> "ConsumerRecord":
    """Send ``cmd`` to the first matching handler and return its result.

    Builds one incoming record from ``cmd`` and runs the first handler that
    ``_find_handler`` yields for the command's destination and partition.
    The handler call runs inside ``anyio.fail_after(cmd.timeout)``.

    Args:
        cmd: The publish command carrying the body, destination, key,
            partition, timestamp, headers, correlation id and timeout.

    Returns:
        Whatever ``self._execute_handler`` returns for the first handler.

    Raises:
        SubscriberNotFound: If ``_find_handler`` yields no handler.
    """
    record = build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        serializer=self.broker.config.fd_config._serializer,
    )

    subscribers = cast("list[LogicSubscriber[Any]]", self.broker.subscribers)
    for subscriber in _find_handler(subscribers, cmd.destination, cmd.partition):
        # Batch subscribers receive a list, so wrap the single record.
        if isinstance(subscriber, BatchSubscriber):
            payload = [record]
        else:
            payload = record

        # Only the first handler is used: the return leaves the loop.
        with anyio.fail_after(cmd.timeout):
            response = await self._execute_handler(
                payload,
                cmd.destination,
                subscriber,
            )
            return response

    raise SubscriberNotFound
|
publish_batch
async
Publish a batch of messages to the Kafka broker.
Source code in faststream/kafka/testing.py
@override
async def publish_batch(
    self,
    cmd: "KafkaPublishCommand",
) -> None:
    """Publish a batch of messages to the Kafka broker.

    For every handler that ``_find_handler`` yields, one record is built
    per entry in ``cmd.batch_bodies``. Batch subscribers get all records
    in a single list; other subscribers are called once per record.

    Args:
        cmd: The publish command carrying the batch bodies, destination,
            partition, timestamp, headers, correlation id and reply-to.
    """
    subscribers = cast("list[LogicSubscriber[Any]]", self.broker.subscribers)

    for subscriber in _find_handler(subscribers, cmd.destination, cmd.partition):
        # Lazy on purpose: records are built as they are consumed below.
        records = (
            build_message(
                message=body,
                topic=cmd.destination,
                partition=cmd.partition,
                timestamp_ms=cmd.timestamp_ms,
                headers=cmd.headers,
                correlation_id=cmd.correlation_id,
                reply_to=cmd.reply_to,
                serializer=self.broker.config.fd_config._serializer,
            )
            for body in cmd.batch_bodies
        )

        if not isinstance(subscriber, BatchSubscriber):
            # Non-batch subscribers handle the records one at a time.
            for record in records:
                await self._execute_handler(record, cmd.destination, subscriber)
            continue

        await self._execute_handler(list(records), cmd.destination, subscriber)
|
connect
async
connect(producer, serializer)
Source code in faststream/kafka/publisher/producer.py
async def connect(
    self,
    producer: "AIOKafkaProducer",
    serializer: Optional["SerializerProto"],
) -> None:
    """Accept a producer and serializer without doing anything with them.

    Args:
        producer: Ignored by this implementation.
        serializer: Ignored by this implementation.
    """
    return None
|
disconnect
async
Source code in faststream/kafka/publisher/producer.py
async def disconnect(self) -> None:
    """Do nothing; this implementation has no work to perform on disconnect."""
    return None
|
flush
async
Source code in faststream/kafka/publisher/producer.py
async def flush(self) -> None:
    """Do nothing and return ``None``; this implementation has nothing to flush."""
|