Skip to content

FakeProducer

faststream.confluent.testing.FakeProducer #

FakeProducer(broker)

Bases: AsyncConfluentFastProducer

A fake Kafka producer for testing purposes.

This class extends AsyncConfluentFastProducer and is used to simulate Kafka message publishing during tests.

Source code in faststream/confluent/testing.py
def __init__(self, broker: KafkaBroker) -> None:
    """Bind this fake producer to *broker* and compose parser/decoder chains.

    The broker's own parser/decoder take precedence; the stock
    AsyncConfluentParser serves as the fallback in each composition.
    """
    self.broker = broker

    fallback = AsyncConfluentParser()
    self._parser = ParserComposition(broker._parser, fallback.parse_message)
    self._decoder = ParserComposition(broker._decoder, fallback.decode_message)

broker instance-attribute #

broker = broker

ping async #

ping(timeout)
Source code in faststream/confluent/testing.py
async def ping(self, timeout: float) -> bool:
    """Always report the fake broker as reachable; *timeout* is ignored."""
    return True

publish async #

publish(cmd)

Simulate publishing a message to the Kafka broker by delivering it directly to matching in-memory subscribers.

Source code in faststream/confluent/testing.py
@override
async def publish(self, cmd: "KafkaPublishCommand") -> None:
    """Simulate publishing by delivering *cmd* to matching in-memory subscribers."""
    event = build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        reply_to=cmd.reply_to,
        serializer=self.broker.config.fd_config._serializer,
    )

    subscribers = cast("Iterable[LogicSubscriber[Any]]", self.broker.subscribers)
    for handler in _find_handler(subscribers, cmd.destination, cmd.partition):
        # Batch subscribers consume a list of messages; others take one message.
        if isinstance(handler, BatchSubscriber):
            await self._execute_handler([event], cmd.destination, handler)
        else:
            await self._execute_handler(event, cmd.destination, handler)

publish_batch async #

publish_batch(cmd)

Simulate publishing a batch of messages to the Kafka broker by delivering them directly to matching in-memory subscribers.

Source code in faststream/confluent/testing.py
@override
async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
    """Simulate batch publishing by delivering each body to matching subscribers."""
    subscribers = cast("Iterable[LogicSubscriber[Any]]", self.broker.subscribers)

    for handler in _find_handler(subscribers, cmd.destination, cmd.partition):
        # Build one incoming message per batch body, sharing the command metadata.
        events = [
            build_message(
                message=body,
                topic=cmd.destination,
                partition=cmd.partition,
                timestamp_ms=cmd.timestamp_ms,
                headers=cmd.headers,
                correlation_id=cmd.correlation_id,
                reply_to=cmd.reply_to,
                serializer=self.broker.config.fd_config._serializer,
            )
            for body in cmd.batch_bodies
        ]

        if isinstance(handler, BatchSubscriber):
            # Batch subscribers receive the whole list at once.
            await self._execute_handler(events, cmd.destination, handler)
        else:
            # Single-message subscribers receive the bodies one by one.
            for event in events:
                await self._execute_handler(event, cmd.destination, handler)

request async #

request(cmd)
Source code in faststream/confluent/testing.py
@override
async def request(self, cmd: "KafkaPublishCommand") -> "MockConfluentMessage":
    """Deliver *cmd* to the first matching subscriber and return its result.

    The handler call is bounded by ``cmd.timeout`` via ``anyio.fail_after``.
    Raises ``SubscriberNotFound`` when no subscriber matches the destination.
    """
    event = build_message(
        message=cmd.body,
        topic=cmd.destination,
        key=cmd.key,
        partition=cmd.partition,
        timestamp_ms=cmd.timestamp_ms,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        serializer=self.broker.config.fd_config._serializer,
    )

    subscribers = cast("Iterable[LogicSubscriber[Any]]", self.broker.subscribers)
    for handler in _find_handler(subscribers, cmd.destination, cmd.partition):
        # Batch subscribers consume a list; others take the bare message.
        payload = [event] if isinstance(handler, BatchSubscriber) else event

        with anyio.fail_after(cmd.timeout):
            return await self._execute_handler(
                payload,
                cmd.destination,
                handler,
            )

    raise SubscriberNotFound

connect #

connect(producer, serializer)
Source code in faststream/confluent/publisher/producer.py
def connect(
    self,
    producer: "AsyncConfluentProducer",
    serializer: Optional["SerializerProto"],
) -> None:
    """No-op: the fake producer needs no real broker connection."""

disconnect async #

disconnect()
Source code in faststream/confluent/publisher/producer.py
async def disconnect(self) -> None:
    """No-op: there is no real connection to tear down."""

flush async #

flush()
Source code in faststream/confluent/publisher/producer.py
async def flush(self) -> None:
    """No-op: the fake producer buffers nothing, so there is nothing to flush."""