NatsPublisher(
subject: str = "",
*,
headers: dict[str, str] | None = None,
reply_to: str = "",
stream: Union[str, JStream, None] = None,
timeout: float | None = None,
middlewares: Sequence[PublisherMiddleware] = (),
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
)
Bases: ArgsContainer
Delayed NatsPublisher registration object.
Just a copy of KafkaRegistrator.publisher(...) arguments.
Initialized the NatsPublisher object.
| PARAMETER | DESCRIPTION |
subject | NATS subject to send message. TYPE: str DEFAULT: '' |
headers | Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway. Can be overridden by publish.headers if specified. TYPE: dict[str, str] | None DEFAULT: None |
reply_to | NATS subject name to send response. TYPE: str DEFAULT: '' |
stream | This option validates that the target subject is in presented stream. Can be omitted without any effect. TYPE: Union[str, JStream, None] DEFAULT: None |
timeout | Timeout to send message to NATS. TYPE: float | None DEFAULT: None |
middlewares | Publisher middlewares to wrap outgoing messages. TYPE: Sequence[PublisherMiddleware] DEFAULT: () |
title | AsyncAPI publisher object title. TYPE: str | None DEFAULT: None |
description | AsyncAPI publisher object description. TYPE: str | None DEFAULT: None |
schema | AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel. TYPE: Any | None DEFAULT: None |
include_in_schema | Whetever to include operation in AsyncAPI schema or not. TYPE: bool DEFAULT: True |
Source code in faststream/nats/broker/router.py
| def __init__(
self,
subject: str = "",
*,
headers: dict[str, str] | None = None,
reply_to: str = "",
# JS
stream: Union[str, "JStream", None] = None,
timeout: float | None = None,
# basic args
middlewares: Sequence["PublisherMiddleware"] = (),
# AsyncAPI information
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> None:
"""Initialized the NatsPublisher object.
Args:
subject:
NATS subject to send message.
headers:
Message headers to store metainformation.
**content-type** and **correlation_id** will be set automatically by framework anyway. Can be overridden by `publish.headers` if specified.
reply_to:
NATS subject name to send response.
stream:
This option validates that the target `subject` is in presented stream.
Can be omitted without any effect.
timeout:
Timeout to send message to NATS.
middlewares:
Publisher middlewares to wrap outgoing messages.
title:
AsyncAPI publisher object title.
description:
AsyncAPI publisher object description.
schema:
AsyncAPI publishing message type.
Should be any python-native object annotation or `pydantic.BaseModel`.
include_in_schema:
Whetever to include operation in AsyncAPI schema or not.
"""
super().__init__(
subject=subject,
headers=headers,
reply_to=reply_to,
stream=stream,
timeout=timeout,
middlewares=middlewares,
title=title,
description=description,
schema=schema,
include_in_schema=include_in_schema,
)
|
args instance-attribute
args: Iterable[Any] = args
kwargs instance-attribute
kwargs: dict[str, Any] = kwargs