Skip to content

FakeProducer

faststream.kafka.testing.FakeProducer #

FakeProducer(
    broker: KafkaBroker, brokers: Sequence[KafkaBroker]
)

Bases: AioKafkaFastProducer

A fake Kafka producer for testing purposes.

This class extends AioKafkaFastProducer and is used to simulate Kafka message publishing during tests.

Source code in faststream/kafka/testing.py
def __init__(
    self,
    broker: KafkaBroker,
    brokers: Sequence[KafkaBroker],
) -> None:
    self.broker = broker
    self.brokers = brokers

    default = AioKafkaParser(
        msg_class=KafkaMessage,
        regex=None,
    )

    self._parser = ParserComposition(broker._parser, default.parse_message)
    self._decoder = ParserComposition(broker._decoder, default.decode_message)
    self.codec = broker.config.broker_codec or DefaultCodec()

broker instance-attribute #

broker = broker

brokers instance-attribute #

brokers = brokers

codec instance-attribute #

codec = broker_codec or DefaultCodec()

subscribers property #

subscribers: Iterable[LogicSubscriber[Any]]

closed property #

closed: bool

publish async #

publish(cmd: KafkaPublishCommand) -> None

Publish a message to the Kafka broker.

Source code in faststream/kafka/testing.py
@override
async def publish(self, cmd: "KafkaPublishCommand") -> None:
    """Publish a message to the Kafka broker."""
    incoming = await build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        reply_to=cmd.reply_to,
        serializer=self.broker.config.fd_config._serializer,
        codec=self.codec,
    )

    for handler in _find_handler(
        self.subscribers,
        cmd.destination,
        cmd.partition,
    ):
        msg_to_send = [incoming] if isinstance(handler, BatchSubscriber) else incoming

        await self._execute_handler(msg_to_send, cmd.destination, handler)

request async #

Source code in faststream/kafka/testing.py
@override
async def request(self, cmd: "KafkaPublishCommand") -> "ConsumerRecord":
    incoming = await build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        serializer=self.broker.config.fd_config._serializer,
        codec=self.codec,
    )

    for handler in _find_handler(
        self.subscribers,
        cmd.destination,
        cmd.partition,
    ):
        msg_to_send = [incoming] if isinstance(handler, BatchSubscriber) else incoming

        with anyio.fail_after(cmd.timeout):
            return await self._execute_handler(
                msg_to_send,
                cmd.destination,
                handler,
            )

    raise SubscriberNotFound

publish_batch async #

publish_batch(cmd: KafkaPublishCommand) -> None

Publish a batch of messages to the Kafka broker.

Source code in faststream/kafka/testing.py
@override
async def publish_batch(
    self,
    cmd: "KafkaPublishCommand",
) -> None:
    """Publish a batch of messages to the Kafka broker."""
    serializer = self.broker.config.fd_config._serializer

    if isinstance(self.codec, BatchCodecProto):
        encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer)
    else:
        encoded = [
            await self.codec.encode(body, serializer) for body in cmd.batch_bodies
        ]

    for handler in _find_handler(
        self.subscribers,
        cmd.destination,
        cmd.partition,
    ):
        messages = [
            _build_record(
                body=body,
                content_type=content_type,
                topic=cmd.destination,
                partition=cmd.partition,
                timestamp_ms=cmd.timestamp_ms,
                key=cmd.key_for(message_position),
                headers=cmd.headers,
                correlation_id=cmd.correlation_id,
                reply_to=cmd.reply_to,
            )
            for message_position, (body, content_type) in enumerate(encoded)
        ]

        if isinstance(handler, BatchSubscriber):
            await self._execute_handler(list(messages), cmd.destination, handler)

        else:
            for m in messages:
                await self._execute_handler(m, cmd.destination, handler)

connect async #

connect(
    producer: AIOKafkaProducer,
    serializer: Optional[SerializerProto],
    codec: Optional[CodecProto] = None,
) -> None
Source code in faststream/kafka/publisher/producer.py
async def connect(
    self,
    producer: "AIOKafkaProducer",
    serializer: Optional["SerializerProto"],
    codec: Optional["CodecProto"] = None,
) -> None: ...

disconnect async #

disconnect() -> None
Source code in faststream/kafka/publisher/producer.py
async def disconnect(self) -> None: ...

flush async #

flush() -> None
Source code in faststream/kafka/publisher/producer.py
async def flush(self) -> None:
    return None