Skip to content

NatsPublisherSpecification

faststream.nats.publisher.specification.NatsPublisherSpecification #

NatsPublisherSpecification(
    _outer_config, specification_config
)

Bases: PublisherSpecification[NatsBrokerConfig, NatsPublisherSpecificationConfig]

Source code in faststream/_internal/endpoint/publisher/specification.py
def __init__(
    self,
    _outer_config: "T_BrokerConfig",
    specification_config: "T_SpecificationConfig",
) -> None:
    """Keep both configuration objects and start with no registered calls.

    Args:
        _outer_config: Broker-level configuration, stored as
            ``self._outer_config``.
        specification_config: Publisher specification settings, stored as
            ``self.config``.
    """
    # Callables are collected later; every instance gets its own fresh list.
    self.calls: list[AnyCallable] = []

    self._outer_config = _outer_config
    self.config = specification_config

subject property #

subject

name property #

name

config instance-attribute #

config = specification_config

calls instance-attribute #

calls = []

include_in_schema property #

include_in_schema

get_schema #

get_schema()
Source code in faststream/nats/publisher/specification.py
def get_schema(self) -> dict[str, PublisherSpec]:
    """Build the specification entry for this publisher.

    Returns:
        A single-item mapping from ``self.name`` to a ``PublisherSpec``
        holding the configured description, one operation whose message
        payload is resolved from ``self.get_payloads()``, and a NATS channel
        binding for ``self.subject``.
    """
    payloads = self.get_payloads()
    channel_name = self.name
    description = self.config.description_

    message = Message(
        title=f"{channel_name}:Message",
        payload=resolve_payloads(payloads, "Publisher"),
    )
    operation = Operation(message=message, bindings=None)

    # The binding is built without a queue (``queue=None``).
    nats_binding = nats.ChannelBinding(subject=self.subject, queue=None)
    channel_binding = ChannelBinding(nats=nats_binding)

    spec = PublisherSpec(
        description=description,
        operation=operation,
        bindings=channel_binding,
    )
    return {channel_name: spec}

add_call #

add_call(call)
Source code in faststream/_internal/endpoint/publisher/specification.py
def add_call(self, call: "AnyCallable") -> None:
    """Register ``call`` by appending it to the end of ``self.calls``.

    Args:
        call: The callable to record; it is stored as-is.
    """
    registered = self.calls
    registered.append(call)

get_payloads #

get_payloads()
Source code in faststream/_internal/endpoint/publisher/specification.py
def get_payloads(self) -> list[tuple[dict[str, Any], str]]:
    """Collect ``(schema, label)`` pairs for the messages this publisher emits.

    When ``self.config.schema_`` is truthy it is the only source: its schema
    is paired with an empty label. Otherwise each callable in ``self.calls``
    is inspected, and every one that yields a usable response type adds a
    schema labelled with the camel-cased name of the unwrapped callable.

    Returns:
        The collected pairs; the list may be empty.
    """
    collected: list[tuple[dict[str, Any], str]] = []

    # An explicitly configured schema takes precedence over registered calls.
    if self.config.schema_:
        wrapper = create_model(
            "",
            __config__=get_config_base(),
            response__=(self.config.schema_, ...),
        )
        schema = get_model_schema(
            call=wrapper,
            prefix=f"{self.name}:Message",
        )
        if schema:  # pragma: no branch
            collected.append((schema, ""))
        return collected

    fd_config = self._outer_config.fd_config

    for handler in self.calls:
        model = build_call_model(
            handler,
            dependency_provider=fd_config.provider,
            serializer_cls=fd_config._serializer,
        )

        # Without a serializer there is no response type to describe.
        if not model.serializer:
            continue

        first_option = next(iter(model.serializer.response_option.values()))
        return_type = first_option.field_type

        # Skip callables whose response type is absent or unannotated.
        if return_type is None or return_type is Parameter.empty:
            continue

        wrapper = create_model(
            "",
            __config__=get_config_base(),
            response__=(return_type, ...),
        )
        schema = get_model_schema(
            wrapper,
            prefix=f"{self.name}:Message",
        )

        if schema:
            label = to_camelcase(unwrap(handler).__name__)
            collected.append((schema, label))

    return collected