Skip to content

AioPikaFastProducerImpl

faststream.rabbit.publisher.producer.AioPikaFastProducerImpl #

AioPikaFastProducerImpl(*, declarer, parser, decoder)

Bases: AioPikaFastProducer

A producer class for fast message publishing using aio-pika.

Source code in faststream/rabbit/publisher/producer.py
def __init__(
    self,
    *,
    declarer: "RabbitDeclarer",
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
) -> None:
    """Store the declarer and build the parser/decoder compositions.

    Args:
        declarer: Kept on the instance as ``declarer``.
        parser: Optional custom parser; composed with the default
            ``AioPikaParser.parse_message`` through ``ParserComposition``.
        decoder: Optional custom decoder; composed with the default
            ``AioPikaParser.decode_message`` through ``ParserComposition``.
    """
    self.declarer = declarer

    # The lock starts in the "unset" state; `connect()` swaps in a `RealLock`.
    self.__lock: LockState = LockUnset()
    self.serializer: SerializerProto | None = None

    # One default parser instance supplies both fallback callables.
    builtin = AioPikaParser()
    fallback_parse = builtin.parse_message
    fallback_decode = builtin.decode_message

    self._parser = ParserComposition(parser, fallback_parse)
    self._decoder = ParserComposition(decoder, fallback_decode)

declarer instance-attribute #

declarer = declarer

serializer instance-attribute #

serializer = None

connect #

connect(serializer=None)

Lock initialization.

Should be called in an async context, because an anyio.Lock object can't be created outside of an event loop.

Source code in faststream/rabbit/publisher/producer.py
def connect(self, serializer: Optional["SerializerProto"] = None) -> None:
    """Initialize the lock and remember the serializer.

    Should be called in an async context, because an `anyio.Lock` object
    can't be created outside of an event loop.

    Args:
        serializer: Value stored on the instance as ``serializer``
            (``None`` by default).
    """
    self.serializer = serializer

    # Replace the current lock state with a freshly created `RealLock`.
    active_lock = RealLock()
    self.__lock = active_lock

disconnect #

disconnect()
Source code in faststream/rabbit/publisher/producer.py
def disconnect(self) -> None:
    """Reset the lock state to a fresh ``LockUnset`` instance."""
    unset_state = LockUnset()
    self.__lock = unset_state

publish async #

publish(cmd)
Source code in faststream/rabbit/publisher/producer.py
@override
async def publish(
    self,
    cmd: "RabbitPublishCommand",
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
    """Publish the message described by ``cmd``.

    The command's fields are forwarded to ``self._publish`` as keyword
    arguments, followed by the expanded ``cmd.publish_options`` and
    ``cmd.message_options`` mappings.

    Args:
        cmd: Publish command providing ``body``, ``exchange``,
            ``destination``, ``reply_to``, ``headers``, ``correlation_id``,
            ``publish_options`` and ``message_options``.

    Returns:
        Whatever ``self._publish`` returns for this message.
    """
    confirmation = await self._publish(
        message=cmd.body,
        exchange=cmd.exchange,
        routing_key=cmd.destination,
        reply_to=cmd.reply_to,
        headers=cmd.headers,
        correlation_id=cmd.correlation_id,
        **cmd.publish_options,
        **cmd.message_options,
    )
    return confirmation

request async #

request(cmd)
Source code in faststream/rabbit/publisher/producer.py
@override
async def request(self, cmd: "RabbitPublishCommand") -> "IncomingMessage":
    """Publish ``cmd`` with ``RABBIT_REPLY`` as reply target and await the reply.

    The ``RABBIT_REPLY`` queue is declared through ``self.declarer`` and
    wrapped, together with the producer lock, in ``_RPCCallback``. Publishing
    and receiving both run inside ``anyio.fail_after(cmd.timeout)``.

    Args:
        cmd: Publish command; its ``reply_to`` is not used here, because
            ``RABBIT_REPLY.name`` is sent as ``reply_to`` instead.

    Returns:
        The message obtained from ``response_queue.receive()``.
    """
    # Read the lock handle before awaiting, matching the original
    # evaluation order of the `_RPCCallback` arguments.
    lock = self.__lock.lock
    reply_queue = await self.declarer.declare_queue(RABBIT_REPLY)

    async with _RPCCallback(lock, reply_queue) as response_queue:
        # The timeout scope covers both the publish and the wait for a reply.
        with anyio.fail_after(cmd.timeout):
            await self._publish(
                message=cmd.body,
                exchange=cmd.exchange,
                routing_key=cmd.destination,
                reply_to=RABBIT_REPLY.name,
                headers=cmd.headers,
                correlation_id=cmd.correlation_id,
                **cmd.publish_options,
                **cmd.message_options,
            )
            reply = await response_queue.receive()
            return reply

publish_batch async #

publish_batch(cmd)
Source code in faststream/rabbit/publisher/producer.py
@override
async def publish_batch(self, cmd: "RabbitPublishCommand") -> None:
    """Reject batch publishing; this producer does not implement it.

    Args:
        cmd: Ignored.

    Raises:
        FeatureNotSupportedException: Always.
    """
    reason = "RabbitMQ doesn't support publishing in batches."
    raise FeatureNotSupportedException(reason)