Skip to content

StreamPublisherSpecification

faststream.redis.publisher.specification.StreamPublisherSpecification #

StreamPublisherSpecification(
    _outer_config, specification_config, stream_sub
)

Bases: RedisPublisherSpecification

Source code in faststream/redis/publisher/specification.py
def __init__(
    self,
    _outer_config: RedisBrokerConfig,
    specification_config: RedisPublisherSpecificationConfig,
    stream_sub: StreamSub,
) -> None:
    super().__init__(_outer_config, specification_config)
    self.stream_sub = stream_sub

stream_sub instance-attribute #

stream_sub = stream_sub

name property #

name

stream_name property #

stream_name

channel_binding property #

channel_binding

config instance-attribute #

config = specification_config

calls instance-attribute #

calls = []

include_in_schema property #

include_in_schema

add_call #

add_call(call)
Source code in faststream/_internal/endpoint/publisher/specification.py
def add_call(self, call: "AnyCallable") -> None:
    self.calls.append(call)

get_payloads #

get_payloads()
Source code in faststream/_internal/endpoint/publisher/specification.py
def get_payloads(self) -> list[tuple[dict[str, Any], str]]:
    payloads: list[tuple[dict[str, Any], str]] = []

    if self.config.schema_:
        body = get_model_schema(
            call=create_model(
                "",
                __config__=get_config_base(),
                response__=(self.config.schema_, ...),
            ),
            prefix=f"{self.name}:Message",
        )

        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        di_state = self._outer_config.fd_config

        for call in self.calls:
            call_model = build_call_model(
                call,
                dependency_provider=di_state.provider,
                serializer_cls=di_state._serializer,
            )

            if call_model.serializer:
                response_type = next(
                    iter(call_model.serializer.response_option.values()),
                ).field_type
            else:
                response_type = None

            if response_type is not None and response_type is not Parameter.empty:
                body = get_model_schema(
                    create_model(
                        "",
                        __config__=get_config_base(),
                        response__=(response_type, ...),
                    ),
                    prefix=f"{self.name}:Message",
                )

                if body:
                    payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads

get_schema #

get_schema()
Source code in faststream/redis/publisher/specification.py
def get_schema(self) -> dict[str, PublisherSpec]:
    payloads = self.get_payloads()

    return {
        self.name: PublisherSpec(
            description=self.config.description_,
            operation=Operation(
                message=Message(
                    title=f"{self.name}:Message",
                    payload=resolve_payloads(payloads, "Publisher"),
                ),
                bindings=None,
            ),
            bindings=ChannelBinding(
                redis=self.channel_binding,
            ),
        ),
    }