Bases: SubscriberSpecification[KafkaBrokerConfig, KafkaSubscriberSpecificationConfig]
Source code in faststream/_internal/endpoint/subscriber/specification.py
def __init__(
    self,
    _outer_config: "T_BrokerConfig",
    specification_config: "T_SpecificationConfig",
    calls: "CallsCollection[Any]",
) -> None:
    """Store the broker config, the specification config and the handler calls.

    Args:
        _outer_config: Kept on the instance as ``_outer_config``.
        specification_config: Exposed on the instance as ``config``.
        calls: Exposed on the instance as ``calls``.
    """
    self._outer_config = _outer_config
    self.config = specification_config
    self.calls = calls
|
config
instance-attribute
config = specification_config
include_in_schema
property
get_schema
Source code in faststream/kafka/subscriber/specification.py
def get_schema(self) -> dict[str, SubscriberSpec]:
    """Build one ``SubscriberSpec`` entry per topic in ``self.topics``.

    Returns:
        A dict keyed by ``self.config.title_`` when that is truthy, otherwise
        by ``"<topic>:<call_name>"``. An empty dict when there are no topics.
    """
    payloads = self.get_payloads()

    schema = {}
    for topic in self.topics:
        # The key does not depend on the topic when ``title_`` is set, so in
        # that case every topic writes to the same entry (last one wins).
        handler_name = self.config.title_ or f"{topic}:{self.call_name}"

        description = self.description
        message = Message(
            title=f"{handler_name}:Message",
            payload=resolve_payloads(payloads),
        )
        operation = Operation(message=message, bindings=None)
        # Only the topic name is filled in on the Kafka channel binding.
        kafka_binding = kafka.ChannelBinding(
            topic=topic,
            partitions=None,
            replicas=None,
        )

        schema[handler_name] = SubscriberSpec(
            description=description,
            operation=operation,
            bindings=ChannelBinding(kafka=kafka_binding),
        )

    return schema
|
get_payloads
Source code in faststream/_internal/endpoint/subscriber/specification.py
def get_payloads(self) -> list[tuple["dict[str, Any]", str]]:
    """Collect a ``(payload schema, name)`` pair for every registered call.

    Returns:
        One pair per item in ``self.calls``: the result of
        ``parse_handler_params`` for the call's dependant, and the
        camel-cased call name. When ``self.calls`` is falsy, a single
        placeholder pair that carries only a ``"title"`` key.

    Raises:
        SetupError: If a call's ``dependant`` is ``None``.
    """
    call_name = self.call_name
    collected: list[tuple[dict[str, Any], str]] = []

    for handler in self.calls:
        if handler.dependant is None:
            msg = "You should setup `Handler` at first."
            raise SetupError(msg)

        schema = parse_handler_params(
            handler.dependant,
            prefix=f"{self.config.title_ or call_name}:Message",
        )
        collected.append((schema, to_camelcase(handler.name)))

    if self.calls:
        return collected

    # No calls registered: emit a title-only placeholder payload.
    placeholder = {
        "title": f"{self.config.title_ or call_name}:Message:Payload",
    }
    collected.append((placeholder, to_camelcase(call_name)))
    return collected
|