Skip to content

FakeProducer

faststream.redis.testing.FakeProducer #

FakeProducer(broker, config)

Bases: RedisFastProducer

Source code in faststream/redis/testing.py
def __init__(self, broker: RedisBroker, config: ParserConfig) -> None:
    self.broker = broker

    default = RedisPubSubParser(config)

    self._parser = ParserComposition(
        broker._parser,
        default.parse_message,
    )
    self._decoder = ParserComposition(
        broker._decoder,
        default.decode_message,
    )

broker instance-attribute #

broker = broker

serializer instance-attribute #

serializer = serializer

publish async #

publish(cmd)
Source code in faststream/redis/testing.py
@override
async def publish(self, cmd: "RedisPublishCommand") -> int | bytes:
    body = build_message(
        message=cmd.body,
        reply_to=cmd.reply_to,
        correlation_id=cmd.correlation_id or gen_cor_id(),
        headers=cmd.headers,
        message_format=cmd.message_format,
        serializer=self.broker.config.fd_config._serializer,
    )

    destination = _make_destination_kwargs(cmd)
    visitors = (ChannelVisitor(), ListVisitor(), StreamVisitor())

    for handler in self.broker.subscribers:  # pragma: no branch
        handler = cast("LogicSubscriber", handler)
        for visitor in visitors:
            if visited_ch := visitor.visit(**destination, sub=handler):
                msg = visitor.get_message(
                    visited_ch,
                    body,
                    handler,  # type: ignore[arg-type]
                )

                await self._execute_handler(msg, handler)

    return 0

request async #

request(cmd)
Source code in faststream/redis/testing.py
@override
async def request(self, cmd: "RedisPublishCommand") -> "PubSubMessage":
    body = build_message(
        message=cmd.body,
        correlation_id=cmd.correlation_id or gen_cor_id(),
        headers=cmd.headers,
        message_format=cmd.message_format,
        serializer=self.broker.config.fd_config._serializer,
    )

    destination = _make_destination_kwargs(cmd)
    visitors = (ChannelVisitor(), ListVisitor(), StreamVisitor())

    for handler in self.broker.subscribers:  # pragma: no branch
        handler = cast("LogicSubscriber", handler)
        for visitor in visitors:
            if visited_ch := visitor.visit(**destination, sub=handler):
                msg = visitor.get_message(
                    visited_ch,
                    body,
                    handler,  # type: ignore[arg-type]
                )

                with anyio.fail_after(cmd.timeout):
                    return await self._execute_handler(msg, handler)

    raise SubscriberNotFound

publish_batch async #

publish_batch(cmd)
Source code in faststream/redis/testing.py
@override
async def publish_batch(self, cmd: "RedisPublishCommand") -> int:
    data_to_send = [
        build_message(
            m,
            correlation_id=cmd.correlation_id or gen_cor_id(),
            headers=cmd.headers,
            message_format=cmd.message_format,
            serializer=self.broker.config.fd_config._serializer,
        )
        for m in cmd.batch_bodies
    ]

    visitor = ListVisitor()
    for handler in self.broker.subscribers:  # pragma: no branch
        handler = cast("LogicSubscriber", handler)
        if visitor.visit(list=cmd.destination, sub=handler):
            casted_handler = cast("_ListHandlerMixin", handler)

            if casted_handler.list_sub.batch:
                msg = visitor.get_message(
                    channel=cmd.destination,
                    body=data_to_send,
                    sub=casted_handler,
                )

                await self._execute_handler(msg, handler)

    return 0

connect #

connect(serializer=None)
Source code in faststream/redis/publisher/producer.py
def connect(self, serializer: Optional["SerializerProto"] = None) -> None:
    self.serializer = serializer