Skip to content

NatsPublisher

faststream.nats.broker.router.NatsPublisher #

NatsPublisher(
    subject="",
    *,
    headers=None,
    reply_to="",
    stream=None,
    timeout=None,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
)

Bases: ArgsContainer

Delayed NatsPublisher registration object.

Just a copy of KafkaRegistrator.publisher(...) arguments.

Initialized the NatsPublisher object.

PARAMETER DESCRIPTION
subject

NATS subject to send message.

TYPE: str DEFAULT: ''

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway. Can be overridden by publish.headers if specified.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

NATS subject name to send response.

TYPE: str DEFAULT: ''

stream

This option validates that the target subject is in presented stream. Can be omitted without any effect.

TYPE: Union[str, JStream, None] DEFAULT: None

timeout

Timeout to send message to NATS.

TYPE: float | None DEFAULT: None

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whetever to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

Source code in faststream/nats/broker/router.py
def __init__(
    self,
    subject: str = "",
    *,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    # JS
    stream: Union[str, "JStream", None] = None,
    timeout: float | None = None,
    # basic args
    middlewares: Sequence["PublisherMiddleware"] = (),
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> None:
    """Initialized the NatsPublisher object.

    Args:
        subject:
            NATS subject to send message.
        headers:
            Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by framework anyway. Can be overridden by `publish.headers` if specified.
        reply_to:
            NATS subject name to send response.
        stream:
            This option validates that the target `subject` is in presented stream.
            Can be omitted without any effect.
        timeout:
            Timeout to send message to NATS.
        middlewares:
            Publisher middlewares to wrap outgoing messages.
        title:
            AsyncAPI publisher object title.
        description:
            AsyncAPI publisher object description.
        schema:
            AsyncAPI publishing message type.
            Should be any python-native object annotation or `pydantic.BaseModel`.
        include_in_schema:
            Whetever to include operation in AsyncAPI schema or not.
    """
    super().__init__(
        subject=subject,
        headers=headers,
        reply_to=reply_to,
        stream=stream,
        timeout=timeout,
        middlewares=middlewares,
        title=title,
        description=description,
        schema=schema,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs