AioPikaFastProducerImpl(*, declarer, parser, decoder)
Bases: AioPikaFastProducer
A class for fast producing messages using aio-pika.
Source code in faststream/rabbit/publisher/producer.py
| def __init__(
self,
*,
declarer: "RabbitDeclarer",
parser: Optional["CustomCallable"],
decoder: Optional["CustomCallable"],
) -> None:
self.declarer = declarer
self.__lock: LockState = LockUnset()
self.serializer: SerializerProto | None = None
default_parser = AioPikaParser()
self._parser = ParserComposition(parser, default_parser.parse_message)
self._decoder = ParserComposition(decoder, default_parser.decode_message)
|
declarer
instance-attribute
serializer
instance-attribute
connect
Lock initialization.
Should be called in async context due anyio.Lock
object can't be created outside event loop.
Source code in faststream/rabbit/publisher/producer.py
| def connect(self, serializer: Optional["SerializerProto"] = None) -> None:
"""Lock initialization.
Should be called in async context due `anyio.Lock` object can't be created outside event loop.
"""
self.serializer = serializer
self.__lock = RealLock()
|
disconnect
Source code in faststream/rabbit/publisher/producer.py
| def disconnect(self) -> None:
self.__lock = LockUnset()
|
publish
async
Source code in faststream/rabbit/publisher/producer.py
| @override
async def publish(
self,
cmd: "RabbitPublishCommand",
) -> Optional["aiormq.abc.ConfirmationFrameType"]:
return await self._publish(
message=cmd.body,
exchange=cmd.exchange,
routing_key=cmd.destination,
reply_to=cmd.reply_to,
headers=cmd.headers,
correlation_id=cmd.correlation_id,
**cmd.publish_options,
**cmd.message_options,
)
|
request
async
Source code in faststream/rabbit/publisher/producer.py
| @override
async def request(self, cmd: "RabbitPublishCommand") -> "IncomingMessage":
async with _RPCCallback(
self.__lock.lock,
await self.declarer.declare_queue(RABBIT_REPLY),
) as response_queue:
with anyio.fail_after(cmd.timeout):
await self._publish(
message=cmd.body,
exchange=cmd.exchange,
routing_key=cmd.destination,
reply_to=RABBIT_REPLY.name,
headers=cmd.headers,
correlation_id=cmd.correlation_id,
**cmd.publish_options,
**cmd.message_options,
)
return await response_queue.receive()
|
publish_batch
async
Source code in faststream/rabbit/publisher/producer.py
| @override
async def publish_batch(self, cmd: "RabbitPublishCommand") -> None:
msg = "RabbitMQ doesn't support publishing in batches."
raise FeatureNotSupportedException(msg)
|