Bases: SubscriberSpecification[NatsBrokerConfig, NatsSubscriberSpecificationConfig]
Source code in faststream/_internal/endpoint/subscriber/specification.py
| def __init__(
self,
_outer_config: "T_BrokerConfig",
specification_config: "T_SpecificationConfig",
calls: "CallsCollection[Any]",
) -> None:
self.calls = calls
self.config = specification_config
self._outer_config = _outer_config
|
config
instance-attribute
config = specification_config
include_in_schema
property
get_schema
Source code in faststream/nats/subscriber/specification.py
| def get_schema(self) -> dict[str, SubscriberSpec]:
payloads = self.get_payloads()
return {
self.name: SubscriberSpec(
description=self.description,
operation=Operation(
message=Message(
title=f"{self.name}:Message",
payload=resolve_payloads(payloads),
),
bindings=None,
),
bindings=ChannelBinding(
nats=nats.ChannelBinding(
subject=self.subject,
queue=self.config.queue,
),
),
),
}
|
get_payloads
Source code in faststream/_internal/endpoint/subscriber/specification.py
| def get_payloads(self) -> list[tuple["dict[str, Any]", str]]:
payloads: list[tuple[dict[str, Any], str]] = []
call_name = self.call_name
for h in self.calls:
if h.dependant is None:
msg = "You should setup `Handler` at first."
raise SetupError(msg)
body = parse_handler_params(
h.dependant,
prefix=f"{self.config.title_ or call_name}:Message",
)
payloads.append((body, to_camelcase(h.name)))
if not self.calls:
payloads.append(
(
{
"title": f"{self.config.title_ or call_name}:Message:Payload",
},
to_camelcase(call_name),
),
)
return payloads
|