StreamSubscriberSpecification(
_outer_config, specification_config, calls, stream_sub
)
Bases: RedisSubscriberSpecification
Source code in faststream/redis/subscriber/specification.py
def __init__(
    self,
    _outer_config: "RedisBrokerConfig",
    specification_config: "RedisSubscriberSpecificationConfig",
    calls: "CallsCollection[Any]",
    stream_sub: StreamSub,
) -> None:
    """Initialise the specification and keep a reference to its stream.

    Args:
        _outer_config: Passed through to the parent initialiser.
        specification_config: Passed through to the parent initialiser.
        calls: Passed through to the parent initialiser.
        stream_sub: Stored on the instance as ``stream_sub``.
    """
    # The three shared arguments go to the parent positionally, in order.
    super().__init__(_outer_config, specification_config, calls)

    self.stream_sub = stream_sub
stream_sub
instance-attribute
config
instance-attribute
config = specification_config
include_in_schema
property
get_payloads
Source code in faststream/_internal/endpoint/subscriber/specification.py
def get_payloads(self) -> list[tuple["dict[str, Any]", str]]:
    """Collect a ``(payload schema, name)`` pair for every entry in ``self.calls``.

    Each handler's ``dependant`` is passed to ``parse_handler_params`` with a
    prefix of ``"<title>:Message"``, where ``<title>`` is ``self.config.title_``
    when truthy and ``self.call_name`` otherwise. The handler name is run
    through ``to_camelcase``.

    When ``self.calls`` is empty, a single placeholder pair is returned whose
    schema contains only a ``"title"`` key.

    Returns:
        The list of ``(schema, camel-cased name)`` pairs.

    Raises:
        SetupError: If a handler's ``dependant`` is ``None``.
    """
    default_name = self.call_name
    collected: list[tuple[dict[str, Any], str]] = []

    for handler in self.calls:
        dependant = handler.dependant
        if dependant is None:
            raise SetupError("You should setup `Handler` at first.")

        schema = parse_handler_params(
            dependant,
            prefix=f"{self.config.title_ or default_name}:Message",
        )
        collected.append((schema, to_camelcase(handler.name)))

    if not self.calls:
        # No handlers registered: emit one title-only placeholder entry.
        placeholder = {
            "title": f"{self.config.title_ or default_name}:Message:Payload",
        }
        collected.append((placeholder, to_camelcase(default_name)))

    return collected
get_schema
Source code in faststream/redis/subscriber/specification.py
def get_schema(self) -> dict[str, SubscriberSpec]:
    """Build the schema entry for this subscriber.

    Returns:
        A single-item mapping from ``self.name`` to a ``SubscriberSpec``.
        Its operation wraps a ``Message`` titled ``"<name>:Message"`` whose
        payload is ``resolve_payloads(self.get_payloads())``. Its bindings
        wrap ``self.channel_binding`` under the ``redis`` keyword.
    """
    payloads = self.get_payloads()

    schema_key = self.name
    description = self.description

    message = Message(
        title=f"{self.name}:Message",
        payload=resolve_payloads(payloads),
    )
    operation = Operation(message=message, bindings=None)
    channel_bindings = ChannelBinding(redis=self.channel_binding)

    spec = SubscriberSpec(
        description=description,
        operation=operation,
        bindings=channel_bindings,
    )
    return {schema_key: spec}