Skip to content

RabbitSubscriberSpecification

faststream.rabbit.subscriber.specification.RabbitSubscriberSpecification #

RabbitSubscriberSpecification(
    _outer_config, specification_config, calls
)

Bases: SubscriberSpecification[RabbitBrokerConfig, RabbitSubscriberSpecificationConfig]

Source code in faststream/_internal/endpoint/subscriber/specification.py
def __init__(
    self,
    _outer_config: "T_BrokerConfig",
    specification_config: "T_SpecificationConfig",
    calls: "CallsCollection[Any]",
) -> None:
    """Keep references to the configs and handler calls used for the schema.

    Args:
        _outer_config: Broker configuration object; stored as
            ``self._outer_config``.
        specification_config: Specification settings for this subscriber;
            stored as ``self.config``.
        calls: Collection of handler calls attached to the subscriber;
            stored as ``self.calls``.
    """
    # Plain attribute binding only: nothing is copied or validated here.
    self._outer_config = _outer_config
    self.config = specification_config
    self.calls = calls

name property #

name

calls instance-attribute #

calls = calls

config instance-attribute #

config = specification_config

include_in_schema property #

include_in_schema

description property #

description

call_name property #

call_name

get_schema #

get_schema()
Source code in faststream/rabbit/subscriber/specification.py
def get_schema(self) -> dict[str, SubscriberSpec]:
    """Build the specification entry describing this RabbitMQ subscriber.

    Returns:
        A single-item mapping whose key is ``self.name`` and whose value is
        the ``SubscriberSpec`` carrying the description, the operation
        (AMQP operation binding plus message payload) and the AMQP channel
        binding.
    """
    handler_payloads = self.get_payloads()

    # The documented queue is the configured one with the broker prefix applied.
    prefixed_queue = self.config.queue.add_prefix(self._outer_config.prefix)

    # The same exchange/queue binding objects are shared by the operation
    # binding and the channel binding below.
    exchange_spec = amqp.Exchange.from_exchange(self.config.exchange)
    queue_spec = amqp.Queue.from_queue(prefixed_queue)

    schema_key = self.name
    description = self.description

    amqp_operation = amqp.OperationBinding(
        routing_key=prefixed_queue.routing(),
        queue=queue_spec,
        exchange=exchange_spec,
        ack=True,
        reply_to=None,
        persist=None,
        mandatory=None,
        priority=None,
    )
    operation_bindings = OperationBinding(amqp=amqp_operation)

    message = Message(
        title=f"{schema_key}:Message",
        payload=resolve_payloads(handler_payloads),
    )
    operation = Operation(bindings=operation_bindings, message=message)

    amqp_channel = amqp.ChannelBinding(
        virtual_host=self._outer_config.virtual_host,
        queue=queue_spec,
        exchange=exchange_spec,
    )
    channel_bindings = ChannelBinding(amqp=amqp_channel)

    subscriber_spec = SubscriberSpec(
        description=description,
        operation=operation,
        bindings=channel_bindings,
    )
    return {schema_key: subscriber_spec}

get_payloads #

get_payloads()
Source code in faststream/_internal/endpoint/subscriber/specification.py
def get_payloads(self) -> list[tuple["dict[str, Any]", str]]:
    """Collect one ``(payload schema, name)`` pair per registered handler call.

    For every item in ``self.calls`` the handler's ``dependant`` is passed to
    ``parse_handler_params`` and paired with the camel-cased handler name.
    When ``self.calls`` is empty, a single placeholder entry containing only
    a ``"title"`` key is returned instead, named after ``self.call_name``.

    Returns:
        The list of ``(schema dict, name)`` tuples.

    Raises:
        SetupError: If a call's ``dependant`` is ``None``.
    """
    fallback_name = self.call_name
    collected: list[tuple[dict[str, Any], str]] = []

    for handler in self.calls:
        if handler.dependant is None:
            msg = "You should setup `Handler` at first."
            raise SetupError(msg)

        # An explicit title on the config wins over the call name.
        title_prefix = f"{self.config.title_ or fallback_name}:Message"
        schema = parse_handler_params(handler.dependant, prefix=title_prefix)
        collected.append((schema, to_camelcase(handler.name)))

    if self.calls:
        return collected

    # No handlers registered: emit a title-only placeholder payload.
    placeholder = {
        "title": f"{self.config.title_ or fallback_name}:Message:Payload",
    }
    collected.append((placeholder, to_camelcase(fallback_name)))
    return collected