Skip to content

NatsRoute

faststream.nats.broker.router.NatsRoute #

NatsRoute(
    call,
    subject,
    publishers=(),
    queue="",
    pending_msgs_limit=None,
    pending_bytes_limit=None,
    max_msgs=0,
    durable=None,
    config=None,
    ordered_consumer=False,
    idle_heartbeat=None,
    flow_control=None,
    deliver_policy=None,
    headers_only=None,
    pull_sub=None,
    kv_watch=None,
    obj_watch=False,
    inbox_prefix=INBOX_PREFIX,
    ack_first=EMPTY,
    stream=None,
    dependencies=(),
    parser=None,
    decoder=None,
    middlewares=(),
    max_workers=None,
    no_ack=EMPTY,
    ack_policy=EMPTY,
    no_reply=False,
    title=None,
    description=None,
    include_in_schema=True,
)

Bases: SubscriberRoute

Class to store delayed NatsBroker subscriber registration.

Initialized NatsRoute.

PARAMETER DESCRIPTION
call

Message handler function to wrap the same with @broker.subscriber(...) way.

TYPE: Callable[..., SendableMessage] | Callable[..., Awaitable[SendableMessage]]

subject

NATS subject to subscribe.

TYPE: str

publishers

Nats publishers to broadcast the handler result.

TYPE: Iterable[NatsPublisher] DEFAULT: ()

queue

Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS server.

TYPE: str DEFAULT: ''

pending_msgs_limit

Limit of messages, considered by NATS server as possible to be delivered to the client without been answered.

TYPE: int | None DEFAULT: None

pending_bytes_limit

The number of bytes, considered by NATS server as possible to be delivered to the client without been answered.

TYPE: int | None DEFAULT: None

max_msgs

Consuming messages limiter. Automatically disconnect if reached.

TYPE: int DEFAULT: 0

durable

Name of the durable consumer to which the the subscription should be bound.

TYPE: str | None DEFAULT: None

config

Configuration of JetStream consumer to be subscribed with.

TYPE: Optional[ConsumerConfig] DEFAULT: None

ordered_consumer

Enable ordered consumer mode.

TYPE: bool DEFAULT: False

idle_heartbeat

Enable Heartbeats for a consumer to detect failures.

TYPE: float | None DEFAULT: None

flow_control

Enable Flow Control for a consumer.

TYPE: bool | None DEFAULT: None

deliver_policy

Deliver Policy to be used for subscription.

TYPE: Optional[DeliverPolicy] DEFAULT: None

headers_only

Should be message delivered without payload, only headers and metadata.

TYPE: bool | None DEFAULT: None

pull_sub

NATS Pull consumer parameters container. Should be used with stream only.

TYPE: Optional[PullSub] DEFAULT: None

kv_watch

KeyValue watch parameters container.

TYPE: Union[str, KvWatch, None] DEFAULT: None

obj_watch

ObjectStore watch parameters container.

TYPE: Union[bool, ObjWatch] DEFAULT: False

inbox_prefix

Prefix for generating unique inboxes, subjects with that prefix and NUID.

TYPE: bytes DEFAULT: INBOX_PREFIX

ack_first

Whether to ack message at start of consuming or not.

TYPE: bool DEFAULT: EMPTY

stream

Subscribe to NATS Stream with subject filter.

TYPE: Union[str, JStream, None] DEFAULT: None

dependencies

Dependencies list ([Dependant(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original nats-py Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

max_workers

Number of workers to process messages concurrently.

TYPE: int | None DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not.

TYPE: bool DEFAULT: EMPTY

ack_policy

Acknowledgment policy for subscriber.

TYPE: AckPolicy DEFAULT: EMPTY

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description. "Uses decorated docstring as default.

TYPE: str | None DEFAULT: None

include_in_schema

Whetever to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

Source code in faststream/nats/broker/router.py
def __init__(
    self,
    call: Callable[..., "SendableMessage"]
    | Callable[..., Awaitable["SendableMessage"]],
    subject: str,
    publishers: Iterable[NatsPublisher] = (),
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    # Core arguments
    max_msgs: int = 0,
    # JS arguments
    durable: str | None = None,
    config: Optional["api.ConsumerConfig"] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional["api.DeliverPolicy"] = None,
    headers_only: bool | None = None,
    # pull arguments
    pull_sub: Optional["PullSub"] = None,
    kv_watch: Union[str, "KvWatch", None] = None,
    obj_watch: Union[bool, "ObjWatch"] = False,
    inbox_prefix: bytes = api.INBOX_PREFIX,
    ack_first: Annotated[
        bool,
        deprecated(
            "This option is deprecated and will be removed in 0.7.0 release. "
            "Please, use `ack_policy=AckPolicy.ACK_FIRST` instead."
        ),
    ] = EMPTY,
    stream: Union[str, "JStream", None] = None,
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    max_workers: int | None = None,
    no_ack: Annotated[
        bool,
        deprecated(
            "This option was deprecated in 0.6.0 to prior to **ack_policy=AckPolicy.MANUAL**. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> None:
    """Initialized NatsRoute.

    Args:
        call:
            Message handler function to wrap the same with `@broker.subscriber(...)` way.
        subject:
            NATS subject to subscribe.
        publishers:
            Nats publishers to broadcast the handler result.
        queue:
            Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS server.
        pending_msgs_limit:
            Limit of messages, considered by NATS server as possible to be delivered to the client without been answered.
        pending_bytes_limit:
            The number of bytes, considered by NATS server as possible to be delivered to the client without been answered.
        max_msgs:
            Consuming messages limiter. Automatically disconnect if reached.
        durable:
            Name of the durable consumer to which the the subscription should be bound.
        config:
            Configuration of JetStream consumer to be subscribed with.
        ordered_consumer:
            Enable ordered consumer mode.
        idle_heartbeat:
            Enable Heartbeats for a consumer to detect failures.
        flow_control:
            Enable Flow Control for a consumer.
        deliver_policy:
            Deliver Policy to be used for subscription.
        headers_only:
            Should be message delivered without payload, only headers and metadata.
        pull_sub:
            NATS Pull consumer parameters container. Should be used with `stream` only.
        kv_watch:
            KeyValue watch parameters container.
        obj_watch:
            ObjectStore watch parameters container.
        inbox_prefix:
            Prefix for generating unique inboxes, subjects with that prefix and NUID.
        ack_first:
            Whether to `ack` message at start of consuming or not.
        stream:
            Subscribe to NATS Stream with `subject` filter.
        dependencies:
            Dependencies list (`[Dependant(),]`) to apply to the subscriber.
        parser:
            Parser to map original **nats-py** Msg to FastStream one.
        decoder:
            Function to decode FastStream msg bytes body to python objects.
        middlewares:
            Subscriber middlewares to wrap incoming message processing.
        max_workers:
            Number of workers to process messages concurrently.
        no_ack:
            Whether to disable **FastStream** auto acknowledgement logic or not.
        ack_policy:
            Acknowledgment policy for subscriber.
        no_reply:
            Whether to disable **FastStream** RPC and Reply To auto responses or not.
        title:
            AsyncAPI subscriber object title.
        description:
            AsyncAPI subscriber object description.
            "Uses decorated docstring as default.
        include_in_schema:
            Whetever to include operation in AsyncAPI schema or not.
    """
    super().__init__(
        call,
        subject=subject,
        publishers=publishers,
        pending_msgs_limit=pending_msgs_limit,
        pending_bytes_limit=pending_bytes_limit,
        max_msgs=max_msgs,
        durable=durable,
        config=config,
        ordered_consumer=ordered_consumer,
        idle_heartbeat=idle_heartbeat,
        flow_control=flow_control,
        deliver_policy=deliver_policy,
        headers_only=headers_only,
        pull_sub=pull_sub,
        kv_watch=kv_watch,
        obj_watch=obj_watch,
        inbox_prefix=inbox_prefix,
        ack_first=ack_first,
        stream=stream,
        max_workers=max_workers,
        queue=queue,
        dependencies=dependencies,
        parser=parser,
        decoder=decoder,
        middlewares=middlewares,
        ack_policy=ack_policy,
        no_ack=no_ack,
        no_reply=no_reply,
        title=title,
        description=description,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs

call instance-attribute #

call = call

publishers instance-attribute #

publishers = publishers