Skip to content

NatsRouter

faststream.nats.NatsRouter #

NatsRouter(
    prefix="",
    handlers=(),
    *,
    dependencies=(),
    middlewares=(),
    routers=(),
    parser=None,
    decoder=None,
    include_in_schema=None,
)

Bases: NatsRegistrator, BrokerRouter[Msg]

Router that can be included into a NatsBroker.

Initialize the NatsRouter instance.

PARAMETER DESCRIPTION
prefix

String prefix to add to all subscribers subjects. Defaults to "".

TYPE: str DEFAULT: ''

handlers

Route object to include. Defaults to ().

TYPE: Iterable[NatsRoute] DEFAULT: ()

dependencies

Dependencies list ([Dependant(),]) to apply to all routers' publishers/subscribers. Defaults to ().

TYPE: Iterable[Dependant] DEFAULT: ()

middlewares

Router middlewares to apply to all routers' publishers/subscribers. Defaults to ().

TYPE: Sequence[BrokerMiddleware[Any, Any]] DEFAULT: ()

routers

Routers to apply to broker. Defaults to ().

TYPE: Iterable[NatsRegistrator] DEFAULT: ()

parser

Parser to map original IncomingMessage Msg to FastStream one. Defaults to None.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects. Defaults to None.

TYPE: Optional[CustomCallable] DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not. Defaults to None.

TYPE: bool | None DEFAULT: None

Source code in faststream/nats/broker/router.py
def __init__(
    self,
    prefix: str = "",
    handlers: Iterable[NatsRoute] = (),
    *,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[NatsRegistrator] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    include_in_schema: bool | None = None,
) -> None:
    """Set up a NatsRouter.

    All router-wide options are bundled into a single `NatsBrokerConfig`,
    which is passed to the parent initializer together with `handlers`
    and `routers`.

    Args:
        prefix: String prefix to add to the subjects of all subscribers.
        handlers: Route objects to include.
        dependencies: Dependencies list (`[Dependant(),]`) applied to all
            of the router's publishers/subscribers.
        middlewares: Router middlewares applied to all of the router's
            publishers/subscribers.
        routers: Routers to apply to the broker.
        parser: Parser to map the original **IncomingMessage** Msg to the
            FastStream one.
        decoder: Function to decode the FastStream msg bytes body to
            python objects.
        include_in_schema: Whether to include operations in the AsyncAPI
            schema or not.
    """
    # Collect the router-level settings first so the parent call stays short.
    router_config = NatsBrokerConfig(
        broker_middlewares=middlewares,
        broker_dependencies=dependencies,
        broker_parser=parser,
        broker_decoder=decoder,
        include_in_schema=include_in_schema,
        prefix=prefix,
    )

    super().__init__(handlers=handlers, config=router_config, routers=routers)

config instance-attribute #

config = ConfigComposition(config)

routers instance-attribute #

routers = []

subscribers property #

subscribers

publishers property #

publishers

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The added middleware will be used as the innermost one in the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Register `middleware` at the end of the middlewares list.

    A middleware added this way is used as the innermost one in the stack.
    The call is delegated to this object's `config`.

    Args:
        middleware: The BrokerMiddleware to append.
    """
    target_config = self.config
    target_config.add_middleware(middleware)

insert_middleware #

insert_middleware(middleware)

Insert BrokerMiddleware to the start of middlewares list.

The inserted middleware will be used as the outermost one in the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Register `middleware` at the start of the middlewares list.

    A middleware inserted this way is used as the outermost one in the stack.
    The call is delegated to this object's `config`.

    Args:
        middleware: The BrokerMiddleware to insert.
    """
    target_config = self.config
    target_config.insert_middleware(middleware)

subscriber #

subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> CoreSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentCoreSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> PushStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentPushStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[True] = ...,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> PullStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[True] = ...,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentPullStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: PullSub = ...,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> Union[PullStreamSubscriber, BatchPullStreamSubscriber]
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: Union[str, KvWatch] = ...,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> KeyValueWatchSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Union[Literal[True], ObjWatch] = ...,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ObjStoreWatchSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Union[bool, PullSub] = False,
    kv_watch: Union[str, KvWatch, None] = None,
    obj_watch: Union[bool, ObjWatch] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream, None] = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> LogicSubscriber[Any]
subscriber(
    subject="",
    queue="",
    pending_msgs_limit=None,
    pending_bytes_limit=None,
    max_msgs=0,
    durable=None,
    config=None,
    ordered_consumer=False,
    idle_heartbeat=None,
    flow_control=None,
    deliver_policy=None,
    headers_only=None,
    pull_sub=False,
    kv_watch=None,
    obj_watch=False,
    inbox_prefix=INBOX_PREFIX,
    stream=None,
    dependencies=(),
    parser=None,
    decoder=None,
    persistent=True,
    ack_first=EMPTY,
    middlewares=(),
    no_ack=EMPTY,
    max_workers=None,
    ack_policy=EMPTY,
    no_reply=False,
    title=None,
    description=None,
    include_in_schema=True,
)

Creates NATS subscriber object.

You can use it as a handler decorator @broker.subscriber(...).

PARAMETER DESCRIPTION
subject

NATS subject to subscribe.

TYPE: str DEFAULT: ''

queue

Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS server.

TYPE: str DEFAULT: ''

pending_msgs_limit

Limit on the number of messages the NATS server may deliver to the client without being answered. In NATS Core, if that limit is exceeded, you will receive a NATS 'Slow Consumer' error, which literally means that your worker can't handle the whole load. In NATS JetStream, you will no longer receive messages until some of the delivered messages are acked in any way.

TYPE: int | None DEFAULT: None

pending_bytes_limit

Limit on the number of bytes the NATS server may deliver to the client without being answered. In NATS Core, if that limit is exceeded, you will receive a NATS 'Slow Consumer' error, which literally means that your worker can't handle the whole load. In NATS JetStream, you will no longer receive messages until some of the delivered messages are acked in any way.

TYPE: int | None DEFAULT: None

max_msgs

Consuming messages limiter. Automatically disconnect if reached.

TYPE: int DEFAULT: 0

durable

Name of the durable consumer to which the subscription should be bound.

TYPE: str | None DEFAULT: None

config

Configuration of JetStream consumer to be subscribed with.

TYPE: Optional[ConsumerConfig] DEFAULT: None

ordered_consumer

Enable ordered consumer mode.

TYPE: bool DEFAULT: False

idle_heartbeat

Enable Heartbeats for a consumer to detect failures.

TYPE: float | None DEFAULT: None

flow_control

Enable Flow Control for a consumer.

TYPE: bool | None DEFAULT: None

deliver_policy

Deliver Policy to be used for subscription.

TYPE: Optional[DeliverPolicy] DEFAULT: None

headers_only

Whether the message should be delivered without payload, with only headers and metadata.

TYPE: bool | None DEFAULT: None

pull_sub

NATS Pull consumer parameters container. Should be used with stream only.

TYPE: Union[bool, PullSub] DEFAULT: False

kv_watch

KeyValue watch parameters container.

TYPE: Union[str, KvWatch, None] DEFAULT: None

obj_watch

ObjectStore watch parameters container.

TYPE: Union[bool, ObjWatch] DEFAULT: False

inbox_prefix

Prefix for generating unique inboxes, subjects with that prefix and NUID.

TYPE: bytes DEFAULT: INBOX_PREFIX

ack_first

Whether to ack message at start of consuming or not.

TYPE: bool DEFAULT: EMPTY

stream

Subscribe to NATS Stream with subject filter.

TYPE: Union[str, JStream, None] DEFAULT: None

dependencies

Dependencies list ([Dependant(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original nats-py Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

max_workers

Number of workers to process messages concurrently.

TYPE: int | None DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not.

TYPE: bool DEFAULT: EMPTY

ack_policy

Acknowledgement policy to use for consumed messages. Replaces the deprecated ack_first (AckPolicy.ACK_FIRST) and no_ack (AckPolicy.MANUAL) options.

TYPE: AckPolicy DEFAULT: EMPTY

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: str | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
LogicSubscriber[Any]

LogicSubscriber[Any]: The created subscriber object.

Source code in faststream/nats/broker/registrator.py
@override
def subscriber(
    self,
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    # Core arguments
    max_msgs: int = 0,
    # JS arguments
    durable: str | None = None,
    config: Optional["api.ConsumerConfig"] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional["api.DeliverPolicy"] = None,
    headers_only: bool | None = None,
    # pull arguments
    pull_sub: Union[bool, "PullSub"] = False,
    kv_watch: Union[str, "KvWatch", None] = None,
    obj_watch: Union[bool, "ObjWatch"] = False,
    inbox_prefix: bytes = api.INBOX_PREFIX,
    # custom
    stream: Union[str, "JStream", None] = None,
    # broker arguments
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    persistent: bool = True,
    ack_first: Annotated[
        bool,
        deprecated(
            "This option is deprecated and will be removed in 0.7.0 release. "
            "Please, use `ack_policy=AckPolicy.ACK_FIRST` instead."
        ),
    ] = EMPTY,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            # The trailing space keeps the two adjacent literals from running
            # together ("...instead.Scheduled...") once concatenated.
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    no_ack: Annotated[
        bool,
        deprecated(
            "This option was deprecated in 0.6.0 to prior to **ack_policy=AckPolicy.MANUAL**. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    max_workers: int | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> "LogicSubscriber[Any]":
    """Creates NATS subscriber object.

    You can use it as a handler decorator `@broker.subscriber(...)`.

    Args:
        subject: NATS subject to subscribe.
        queue: Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS
            server.
        pending_msgs_limit: Limit on the number of messages the NATS server may deliver to the client without
            being answered. In NATS Core, if that limit is exceeded, you will receive a NATS 'Slow Consumer'
            error, which literally means that your worker can't handle the whole load. In NATS JetStream, you
            will no longer receive messages until some of the delivered messages are acked in any way.
        pending_bytes_limit: Limit on the number of bytes the NATS server may deliver to the client without
            being answered. In NATS Core, if that limit is exceeded, you will receive a NATS 'Slow Consumer'
            error, which literally means that your worker can't handle the whole load. In NATS JetStream, you
            will no longer receive messages until some of the delivered messages are acked in any way.
        max_msgs: Consuming messages limiter. Automatically disconnect if reached.
        durable: Name of the durable consumer to which the subscription should be bound.
        config: Configuration of JetStream consumer to be subscribed with.
        ordered_consumer: Enable ordered consumer mode.
        idle_heartbeat: Enable Heartbeats for a consumer to detect failures.
        flow_control: Enable Flow Control for a consumer.
        deliver_policy: Deliver Policy to be used for subscription.
        headers_only: Whether the message should be delivered without payload, with only headers and metadata.
        pull_sub: NATS Pull consumer parameters container. Should be used with `stream` only.
        kv_watch: KeyValue watch parameters container.
        obj_watch: ObjectStore watch parameters container.
        inbox_prefix: Prefix for generating unique inboxes, subjects with that prefix and NUID.
        stream: Subscribe to NATS Stream with `subject` filter.
        dependencies: Dependencies list (`[Dependant(),]`) to apply to the subscriber.
        parser: Parser to map original **nats-py** Msg to FastStream one.
        decoder: Function to decode FastStream msg bytes body to python objects.
        persistent: Whether to make the subscriber persistent or not.
        ack_first: Deprecated, use `ack_policy=AckPolicy.ACK_FIRST` instead. Whether to `ack` message at start
            of consuming or not.
        middlewares: Deprecated, use router-level middlewares instead. Subscriber middlewares to wrap incoming
            message processing.
        no_ack: Deprecated in favor of `ack_policy=AckPolicy.MANUAL`. Whether to disable **FastStream** auto
            acknowledgement logic or not.
        max_workers: Number of workers to process messages concurrently. `None` (or `0`) is treated as `1`.
        ack_policy: Acknowledgement policy to use for consumed messages. Replaces the deprecated `ack_first`
            and `no_ack` options.
        no_reply: Whether to disable **FastStream** RPC and Reply To auto responses or not.
        title: AsyncAPI subscriber object title.
        description: AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.

    Returns:
        LogicSubscriber[Any]: The created subscriber object.
    """
    # Pass the user-supplied value through the stream builder; the result is
    # what gets handed to the subscriber factory and `add_subject` below.
    stream = self._stream_builder.create(stream)

    subscriber = create_subscriber(
        subject=subject,
        queue=queue,
        stream=stream,
        pull_sub=PullSub.validate(pull_sub),
        kv_watch=KvWatch.validate(kv_watch),
        obj_watch=ObjWatch.validate(obj_watch),
        # Any falsy value (None or 0) falls back to a single worker.
        max_workers=max_workers or 1,
        # extra args
        pending_msgs_limit=pending_msgs_limit,
        pending_bytes_limit=pending_bytes_limit,
        max_msgs=max_msgs,
        durable=durable,
        config=config,
        ordered_consumer=ordered_consumer,
        idle_heartbeat=idle_heartbeat,
        flow_control=flow_control,
        deliver_policy=deliver_policy,
        headers_only=headers_only,
        inbox_prefix=inbox_prefix,
        ack_first=ack_first,
        # subscriber args
        ack_policy=ack_policy,
        no_ack=no_ack,
        no_reply=no_reply,
        broker_config=cast("NatsBrokerConfig", self.config),
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    # Hand the new subscriber to the parent registrator.
    super().subscriber(subscriber, persistent=persistent)

    # Record the subscriber's subject for its stream in the stream builder.
    self._stream_builder.add_subject(stream, subscriber.subject)

    return subscriber.add_call(
        parser_=parser,
        decoder_=decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    subject,
    *,
    headers=None,
    reply_to="",
    stream=None,
    timeout=None,
    persistent=True,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it later - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
subject

NATS subject to send message.

TYPE: str

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway. Can be overridden by publish.headers if specified.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

NATS subject name to send response.

TYPE: str DEFAULT: ''

stream

This option validates that the target subject belongs to the given stream. Can be omitted without any effect.

TYPE: Union[str, JStream, None] DEFAULT: None

timeout

Timeout to send message to NATS.

TYPE: float | None DEFAULT: None

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whether to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/nats/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    subject: str,
    *,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    stream: Union[str, "JStream", None] = None,
    timeout: float | None = None,
    persistent: bool = True,
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            # The trailing space keeps the two adjacent literals from running
            # together ("...instead.Scheduled...") once concatenated.
            "This option was deprecated in 0.6.0. Use router-level middlewares instead. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "LogicPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it later - `broker.publisher(...).publish(...)`.

    Args:
        subject: NATS subject to send message.
        headers: Message headers to store metainformation.
            content-type and correlation_id will be set automatically by framework anyway.
            Can be overridden by `publish.headers` if specified.
        reply_to: NATS subject name to send response.
        stream: This option validates that the target `subject` belongs to the given stream.
            Can be omitted without any effect.
        timeout: Timeout to send message to NATS.
        persistent: Whether to make the publisher persistent or not.
        middlewares: Deprecated, use router-level middlewares instead.
            Publisher middlewares to wrap outgoing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type.
            Should be any python-native object annotation or `pydantic.BaseModel`.
        include_in_schema: Whether to include operation in AsyncAPI schema or not.

    Returns:
        LogicPublisher: The created publisher object.
    """
    # Pass the user-supplied value through the stream builder; the result is
    # what gets handed to the publisher factory and `add_subject` below.
    stream = self._stream_builder.create(stream)

    publisher = create_publisher(
        subject=subject,
        headers=headers,
        # Core
        reply_to=reply_to,
        # JS
        timeout=timeout,
        stream=stream,
        # Specific
        broker_config=cast("NatsBrokerConfig", self.config),
        middlewares=middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )

    # Hand the new publisher to the parent registrator.
    super().publisher(publisher, persistent=persistent)

    # Record the publisher's subject for its stream in the stream builder.
    self._stream_builder.add_subject(stream, publisher.subject)

    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)
Source code in faststream/nats/broker/registrator.py
@override
def include_router(  # type: ignore[override]
    self,
    router: "NatsRegistrator",
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    """Include another NATS registrator into this one.

    Every subject tracked by the included router's stream builder is
    re-registered in this object's stream builder, prefixed with this
    object's config prefix followed by `prefix`. The remaining work is
    delegated to the parent `include_router`.

    Args:
        router: The registrator to include; must be a `NatsRegistrator`.
        prefix: Extra prefix forwarded to the parent and used when
            re-registering the router's subjects.
        dependencies: Dependencies forwarded to the parent implementation.
        middlewares: Middlewares forwarded to the parent implementation.
        include_in_schema: Schema flag forwarded to the parent implementation.

    Raises:
        SetupError: If `router` is not a `NatsRegistrator` instance.
    """
    if not isinstance(router, NatsRegistrator):
        raise SetupError(
            f"Router must be an instance of NatsRegistrator, "
            f"got {type(router).__name__} instead"
        )

    # Mirror the included router's stream subjects under the combined prefix.
    for stream, stream_subjects in router._stream_builder.objects.values():
        for subject in stream_subjects:
            self._stream_builder.add_subject(
                stream,
                f"{self.config.prefix}{prefix}{subject}",
            )

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Include each of the given routers into this object.

    Routers are passed one by one, in the order given, to `include_router`
    with no additional options.

    Args:
        *routers: Routers to include.
    """
    for router in routers:
        self.include_router(router)