Skip to content

NatsBroker

faststream.nats.NatsBroker #

NatsBroker(
    servers=("nats://localhost:4222",),
    *,
    error_cb=None,
    disconnected_cb=None,
    closed_cb=None,
    discovered_server_cb=None,
    reconnected_cb=None,
    name=SERVICE_NAME,
    pedantic=False,
    verbose=False,
    allow_reconnect=True,
    connect_timeout=DEFAULT_CONNECT_TIMEOUT,
    reconnect_time_wait=DEFAULT_RECONNECT_TIME_WAIT,
    max_reconnect_attempts=DEFAULT_MAX_RECONNECT_ATTEMPTS,
    ping_interval=DEFAULT_PING_INTERVAL,
    max_outstanding_pings=DEFAULT_MAX_OUTSTANDING_PINGS,
    dont_randomize=False,
    flusher_queue_size=DEFAULT_MAX_FLUSHER_QUEUE_SIZE,
    no_echo=False,
    tls_hostname=None,
    token=None,
    drain_timeout=DEFAULT_DRAIN_TIMEOUT,
    signature_cb=None,
    user_jwt_cb=None,
    user_credentials=None,
    nkeys_seed=None,
    nkeys_seed_str=None,
    inbox_prefix=DEFAULT_INBOX_PREFIX,
    pending_size=DEFAULT_PENDING_SIZE,
    flush_timeout=None,
    js_options=None,
    graceful_timeout=None,
    decoder=None,
    parser=None,
    dependencies=(),
    middlewares=(),
    routers=(),
    security=None,
    specification_url=None,
    protocol="nats",
    protocol_version="custom",
    description=None,
    tags=(),
    logger=EMPTY,
    log_level=INFO,
    apply_types=True,
    serializer=EMPTY,
    provider=None,
    context=None,
)

Bases: NatsRegistrator, BrokerUsecase[Msg, Client]

A class to represent a NATS broker.

Initialize the NatsBroker object.

PARAMETER DESCRIPTION
servers

NATS cluster addresses to connect.

TYPE: str | Iterable[str] DEFAULT: ('nats://localhost:4222',)

error_cb

Callback to report errors.

TYPE: Optional[ErrorCallback] DEFAULT: None

disconnected_cb

Callback to report disconnection from NATS.

TYPE: Optional[Callback] DEFAULT: None

closed_cb

Callback to report when client stops reconnection to NATS.

TYPE: Optional[Callback] DEFAULT: None

discovered_server_cb

A callback to report when a new server joins the cluster.

TYPE: Optional[Callback] DEFAULT: None

reconnected_cb

Callback to report success reconnection.

TYPE: Optional[Callback] DEFAULT: None

name

Label the connection with name (shown in NATS monitoring).

TYPE: str | None DEFAULT: SERVICE_NAME

pedantic

Turn on NATS server pedantic mode that performs extra checks on the protocol. https://docs.nats.io/using-nats/developer/connecting/misc#turn-on-pedantic-mode

TYPE: bool DEFAULT: False

verbose

Verbose mode produce more feedback about code execution.

TYPE: bool DEFAULT: False

allow_reconnect

Whether recover connection automatically or not.

TYPE: bool DEFAULT: True

connect_timeout

Timeout in seconds to establish connection with NATS server.

TYPE: int DEFAULT: DEFAULT_CONNECT_TIMEOUT

reconnect_time_wait

Time in seconds to wait for reestablish connection to NATS server

TYPE: int DEFAULT: DEFAULT_RECONNECT_TIME_WAIT

max_reconnect_attempts

Maximum attempts number to reconnect to NATS server.

TYPE: int DEFAULT: DEFAULT_MAX_RECONNECT_ATTEMPTS

ping_interval

Interval in seconds to ping.

TYPE: int DEFAULT: DEFAULT_PING_INTERVAL

max_outstanding_pings

Maximum number of failed pings

TYPE: int DEFAULT: DEFAULT_MAX_OUTSTANDING_PINGS

dont_randomize

Boolean indicating should client randomly shuffle servers list for reconnection randomness.

TYPE: bool DEFAULT: False

flusher_queue_size

Max count of commands awaiting to be flushed to the socket

TYPE: int DEFAULT: DEFAULT_MAX_FLUSHER_QUEUE_SIZE

no_echo

Boolean indicating should commands be echoed.

TYPE: bool DEFAULT: False

tls_hostname

Hostname for TLS.

TYPE: str | None DEFAULT: None

token

Auth token for NATS auth.

TYPE: str | None DEFAULT: None

drain_timeout

Timeout in seconds to drain subscriptions.

TYPE: int DEFAULT: DEFAULT_DRAIN_TIMEOUT

signature_cb

A callback used to sign a nonce from the server while authenticating with nkeys. The user should sign the nonce and return the base64 encoded signature.

TYPE: Optional[SignatureCallback] DEFAULT: None

user_jwt_cb

A callback used to fetch and return the account signed JWT for this user.

TYPE: Optional[JWTCallback] DEFAULT: None

user_credentials

A user credentials file or tuple of files.

TYPE: Optional[Credentials] DEFAULT: None

nkeys_seed

Path-like object containing nkeys seed that will be used.

TYPE: str | None DEFAULT: None

nkeys_seed_str

Nkeys seed to be used.

TYPE: str | None DEFAULT: None

inbox_prefix

Prefix for generating unique inboxes, subjects with that prefix and NUID.ß

TYPE: str | bytes DEFAULT: DEFAULT_INBOX_PREFIX

pending_size

Max size of the pending buffer for publishing commands.

TYPE: int DEFAULT: DEFAULT_PENDING_SIZE

flush_timeout

Max duration to wait for a forced flush to occur

TYPE: float | None DEFAULT: None

js_options

JetStream initialization options.

TYPE: Union[JsInitOptions, dict[str, Any], None] DEFAULT: None

graceful_timeout

Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.

TYPE: float | None DEFAULT: None

decoder

Custom decoder object

TYPE: Optional[CustomCallable] DEFAULT: None

parser

Custom parser object.

TYPE: Optional[CustomCallable] DEFAULT: None

dependencies

Dependencies to apply to all broker subscribers.

TYPE: Iterable[Dependant] DEFAULT: ()

middlewares

"Middlewares to apply to all broker publishers/subscribers.

TYPE: Sequence[BrokerMiddleware[Any, Any]] DEFAULT: ()

routers

"Routers to apply to broker.

TYPE: Iterable[NatsRegistrator] DEFAULT: ()

security

Security options to connect broker and generate AsyncAPI server security information.

TYPE: Optional[BaseSecurity] DEFAULT: None

specification_url

AsyncAPI hardcoded server addresses. Use servers if not specified.

TYPE: str | Iterable[str] | None DEFAULT: None

protocol

AsyncAPI server protocol.

TYPE: str | None DEFAULT: 'nats'

protocol_version

AsyncAPI server protocol version.

TYPE: str | None DEFAULT: 'custom'

description

AsyncAPI server description.

TYPE: str | None DEFAULT: None

tags

AsyncAPI server tags.

TYPE: Iterable[Union[Tag, TagDict]] DEFAULT: ()

logger

User specified logger to pass into Context and log service messages.

TYPE: Optional[LoggerProto] DEFAULT: EMPTY

log_level

Service messages log level.

TYPE: int DEFAULT: INFO

apply_types

Whether to use FastDepends or not.

TYPE: bool DEFAULT: True

serializer

FastDepends-compatible serializer to validate incoming messages.

TYPE: Optional[SerializerProto] DEFAULT: EMPTY

provider

Provider for FastDepends.

TYPE: Optional[Provider] DEFAULT: None

context

Context for FastDepends.

TYPE: Optional[ContextRepo] DEFAULT: None

Source code in faststream/nats/broker/broker.py
def __init__(
    self,
    servers: str | Iterable[str] = ("nats://localhost:4222",),
    *,
    error_cb: Optional["ErrorCallback"] = None,
    disconnected_cb: Optional["Callback"] = None,
    closed_cb: Optional["Callback"] = None,
    discovered_server_cb: Optional["Callback"] = None,
    reconnected_cb: Optional["Callback"] = None,
    name: str | None = SERVICE_NAME,
    pedantic: bool = False,
    verbose: bool = False,
    allow_reconnect: bool = True,
    connect_timeout: int = DEFAULT_CONNECT_TIMEOUT,
    reconnect_time_wait: int = DEFAULT_RECONNECT_TIME_WAIT,
    max_reconnect_attempts: int = DEFAULT_MAX_RECONNECT_ATTEMPTS,
    ping_interval: int = DEFAULT_PING_INTERVAL,
    max_outstanding_pings: int = DEFAULT_MAX_OUTSTANDING_PINGS,
    dont_randomize: bool = False,
    flusher_queue_size: int = DEFAULT_MAX_FLUSHER_QUEUE_SIZE,
    no_echo: bool = False,
    tls_hostname: str | None = None,
    token: str | None = None,
    drain_timeout: int = DEFAULT_DRAIN_TIMEOUT,
    signature_cb: Optional["SignatureCallback"] = None,
    user_jwt_cb: Optional["JWTCallback"] = None,
    user_credentials: Optional["Credentials"] = None,
    nkeys_seed: str | None = None,
    nkeys_seed_str: str | None = None,
    inbox_prefix: str | bytes = DEFAULT_INBOX_PREFIX,
    pending_size: int = DEFAULT_PENDING_SIZE,
    flush_timeout: float | None = None,
    js_options: Union["JsInitOptions", dict[str, Any], None] = None,
    graceful_timeout: float | None = None,
    decoder: Optional["CustomCallable"] = None,
    parser: Optional["CustomCallable"] = None,
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    routers: Iterable[NatsRegistrator] = (),
    security: Optional["BaseSecurity"] = None,
    specification_url: str | Iterable[str] | None = None,
    protocol: str | None = "nats",
    protocol_version: str | None = "custom",
    description: str | None = None,
    tags: Iterable[Union["Tag", "TagDict"]] = (),
    logger: Optional["LoggerProto"] = EMPTY,
    log_level: int = logging.INFO,
    apply_types: bool = True,
    serializer: Optional["SerializerProto"] = EMPTY,
    provider: Optional["Provider"] = None,
    context: Optional["ContextRepo"] = None,
) -> None:
    """Initialize the NatsBroker object.

    Args:
        servers:
            NATS cluster addresses to connect.
        error_cb:
            Callback to report errors.
        disconnected_cb:
            Callback to report disconnection from NATS.
        closed_cb:
            Callback to report when client stops reconnection to NATS.
        discovered_server_cb:
            A callback to report when a new server joins the cluster.
        reconnected_cb:
            Callback to report success reconnection.
        name:
            Label the connection with name (shown in NATS monitoring).
        pedantic:
            Turn on NATS server pedantic mode that performs extra checks on the protocol.
            https://docs.nats.io/using-nats/developer/connecting/misc#turn-on-pedantic-mode
        verbose:
            Verbose mode produce more feedback about code execution.
        allow_reconnect:
            Whether recover connection automatically or not.
        connect_timeout:
            Timeout in seconds to establish connection with NATS server.
        reconnect_time_wait:
            Time in seconds to wait for reestablish connection to NATS server
        max_reconnect_attempts:
            Maximum attempts number to reconnect to NATS server.
        ping_interval:
            Interval in seconds to ping.
        max_outstanding_pings:
            Maximum number of failed pings
        dont_randomize:
            Boolean indicating should client randomly shuffle servers list for reconnection randomness.
        flusher_queue_size:
            Max count of commands awaiting to be flushed to the socket
        no_echo:
            Boolean indicating should commands be echoed.
        tls_hostname:
            Hostname for TLS.
        token:
            Auth token for NATS auth.
        drain_timeout:
            Timeout in seconds to drain subscriptions.
        signature_cb:
            A callback used to sign a nonce from the server while authenticating with nkeys.
            The user should sign the nonce and return the base64 encoded signature.
        user_jwt_cb:
            A callback used to fetch and return the account signed JWT for this user.
        user_credentials:
            A user credentials file or tuple of files.
        nkeys_seed:
            Path-like object containing nkeys seed that will be used.
        nkeys_seed_str:
            Nkeys seed to be used.
        inbox_prefix:
            Prefix for generating unique inboxes, subjects with that prefix and NUID.ß
        pending_size:
            Max size of the pending buffer for publishing commands.
        flush_timeout:
            Max duration to wait for a forced flush to occur
        js_options:
            JetStream initialization options.
        graceful_timeout:
            Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down.
        decoder:
            Custom decoder object
        parser:
            Custom parser object.
        dependencies:
            Dependencies to apply to all broker subscribers.
        middlewares:
            "Middlewares to apply to all broker publishers/subscribers.
        routers:
            "Routers to apply to broker.
        security:
            Security options to connect broker and generate AsyncAPI server security information.
        specification_url:
            AsyncAPI hardcoded server addresses. Use `servers` if not specified.
        protocol:
            AsyncAPI server protocol.
        protocol_version:
            AsyncAPI server protocol version.
        description:
            AsyncAPI server description.
        tags:
            AsyncAPI server tags.
        logger:
            User specified logger to pass into Context and log service messages.
        log_level:
            Service messages log level.
        apply_types:
            Whether to use FastDepends or not.
        serializer:
            FastDepends-compatible serializer to validate incoming messages.
        provider:
            Provider for FastDepends.
        context:
            Context for FastDepends.
    """
    secure_kwargs = parse_security(security)

    servers = [servers] if isinstance(servers, str) else list(servers)

    if specification_url is not None:
        if isinstance(specification_url, str):
            specification_url = [specification_url]
        else:
            specification_url = list(specification_url)
    else:
        specification_url = servers

    js_producer = NatsJSFastProducer(
        parser=parser,
        decoder=decoder,
    )

    producer = NatsFastProducerImpl(
        parser=parser,
        decoder=decoder,
    )

    super().__init__(
        # NATS options
        servers=servers,
        name=name,
        verbose=verbose,
        allow_reconnect=allow_reconnect,
        reconnect_time_wait=reconnect_time_wait,
        max_reconnect_attempts=max_reconnect_attempts,
        no_echo=no_echo,
        pedantic=pedantic,
        inbox_prefix=inbox_prefix,
        pending_size=pending_size,
        connect_timeout=connect_timeout,
        drain_timeout=drain_timeout,
        flush_timeout=flush_timeout,
        ping_interval=ping_interval,
        max_outstanding_pings=max_outstanding_pings,
        dont_randomize=dont_randomize,
        flusher_queue_size=flusher_queue_size,
        # security
        tls_hostname=tls_hostname,
        token=token,
        user_credentials=user_credentials,
        nkeys_seed=nkeys_seed,
        nkeys_seed_str=nkeys_seed_str,
        **secure_kwargs,
        # callbacks
        error_cb=self._log_connection_broken(error_cb),
        reconnected_cb=self._log_reconnected(reconnected_cb),
        disconnected_cb=disconnected_cb,
        closed_cb=closed_cb,
        discovered_server_cb=discovered_server_cb,
        signature_cb=signature_cb,
        user_jwt_cb=user_jwt_cb,
        # Basic args
        routers=routers,
        config=NatsBrokerConfig(
            producer=producer,
            js_producer=js_producer,
            js_options=js_options or {},
            # both args
            broker_middlewares=middlewares,
            broker_parser=parser,
            broker_decoder=decoder,
            logger=make_nats_logger_state(
                logger=logger,
                log_level=log_level,
            ),
            fd_config=FastDependsConfig(
                use_fastdepends=apply_types,
                serializer=serializer,
                provider=provider or dependency_provider,
                context=context or ContextRepo(),
            ),
            # subscriber args
            broker_dependencies=dependencies,
            graceful_timeout=graceful_timeout,
            extra_context={
                "broker": self,
            },
        ),
        specification=BrokerSpec(
            description=description,
            url=specification_url,
            protocol=protocol,
            protocol_version=protocol_version,
            security=security,
            tags=tags,
        ),
    )

middlewares property #

middlewares

context property #

context

config instance-attribute #

config = ConfigComposition(config)

routers instance-attribute #

routers = []

subscribers property #

subscribers

publishers property #

publishers

specification instance-attribute #

specification = specification

running instance-attribute #

running = False

provider property #

provider

url instance-attribute #

url

connection property #

connection

publish_batch async #

publish_batch(*messages, queue)
Source code in faststream/_internal/broker/pub_base.py
async def publish_batch(
    self,
    *messages: "SendableMessage",
    queue: str,
) -> Any:
    msg = f"{self.__class__.__name__} doesn't support publishing in batches."
    raise FeatureNotSupportedException(msg)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of the stack.

Source code in faststream/_internal/broker/registrator.py
def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of the stack.
    """
    self.config.add_middleware(middleware)

insert_middleware #

insert_middleware(middleware)

Insert BrokerMiddleware to the start of middlewares list.

Current middleware will be used as a most outer of the stack.

Source code in faststream/_internal/broker/registrator.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
    """Insert BrokerMiddleware to the start of middlewares list.

    Current middleware will be used as a most outer of the stack.
    """
    self.config.insert_middleware(middleware)

subscriber #

subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> CoreSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentCoreSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> PushStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentPushStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[True] = ...,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> PullStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Literal[True] = ...,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int = ...,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ConcurrentPullStreamSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: PullSub = ...,
    kv_watch: None = None,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream] = ...,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> Union[PullStreamSubscriber, BatchPullStreamSubscriber]
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: Union[str, KvWatch] = ...,
    obj_watch: Literal[False] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> KeyValueWatchSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: None = None,
    config: None = None,
    ordered_consumer: Literal[False] = False,
    idle_heartbeat: None = None,
    flow_control: None = None,
    deliver_policy: None = None,
    headers_only: None = None,
    pull_sub: Literal[False] = False,
    kv_watch: None = None,
    obj_watch: Union[Literal[True], ObjWatch] = ...,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: None = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> ObjStoreWatchSubscriber
subscriber(
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: Optional[ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional[DeliverPolicy] = None,
    headers_only: bool | None = None,
    pull_sub: Union[bool, PullSub] = False,
    kv_watch: Union[str, KvWatch, None] = None,
    obj_watch: Union[bool, ObjWatch] = False,
    inbox_prefix: bytes = INBOX_PREFIX,
    stream: Union[str, JStream, None] = None,
    dependencies: Iterable[Dependant] = (),
    parser: Optional[CustomCallable] = None,
    decoder: Optional[CustomCallable] = None,
    persistent: bool = True,
    ack_first: bool = EMPTY,
    middlewares: Sequence[SubscriberMiddleware[Any]] = (),
    no_ack: bool = EMPTY,
    max_workers: int | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> LogicSubscriber[Any]
subscriber(
    subject="",
    queue="",
    pending_msgs_limit=None,
    pending_bytes_limit=None,
    max_msgs=0,
    durable=None,
    config=None,
    ordered_consumer=False,
    idle_heartbeat=None,
    flow_control=None,
    deliver_policy=None,
    headers_only=None,
    pull_sub=False,
    kv_watch=None,
    obj_watch=False,
    inbox_prefix=INBOX_PREFIX,
    stream=None,
    dependencies=(),
    parser=None,
    decoder=None,
    persistent=True,
    ack_first=EMPTY,
    middlewares=(),
    no_ack=EMPTY,
    max_workers=None,
    ack_policy=EMPTY,
    no_reply=False,
    title=None,
    description=None,
    include_in_schema=True,
)

Creates NATS subscriber object.

You can use it as a handler decorator @broker.subscriber(...).

PARAMETER DESCRIPTION
subject

NATS subject to subscribe.

TYPE: str DEFAULT: ''

queue

Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS server.

TYPE: str DEFAULT: ''

pending_msgs_limit

Limit of messages, considered by NATS server as possible to be delivered to the client without been answered. In case of NATS Core, if that limits exceeds, you will receive NATS 'Slow Consumer' error. That's literally means that your worker can't handle the whole load. In case of NATS JetStream, you will no longer receive messages until some of delivered messages will be acked in any way.

TYPE: int | None DEFAULT: None

pending_bytes_limit

The number of bytes, considered by NATS server as possible to be delivered to the client without been answered. In case of NATS Core, if that limit exceeds, you will receive NATS 'Slow Consumer' error. That's literally means that your worker can't handle the whole load. In case of NATS JetStream, you will no longer receive messages until some of delivered messages will be acked in any way.

TYPE: int | None DEFAULT: None

max_msgs

Consuming messages limiter. Automatically disconnect if reached.

TYPE: int DEFAULT: 0

durable

Name of the durable consumer to which the the subscription should be bound.

TYPE: str | None DEFAULT: None

config

Configuration of JetStream consumer to be subscribed with.

TYPE: Optional[ConsumerConfig] DEFAULT: None

ordered_consumer

Enable ordered consumer mode.

TYPE: bool DEFAULT: False

idle_heartbeat

Enable Heartbeats for a consumer to detect failures.

TYPE: float | None DEFAULT: None

flow_control

Enable Flow Control for a consumer.

TYPE: bool | None DEFAULT: None

deliver_policy

Deliver Policy to be used for subscription.

TYPE: Optional[DeliverPolicy] DEFAULT: None

headers_only

Should be message delivered without payload, only headers and metadata.

TYPE: bool | None DEFAULT: None

pull_sub

NATS Pull consumer parameters container. Should be used with stream only.

TYPE: Union[bool, PullSub] DEFAULT: False

kv_watch

KeyValue watch parameters container.

TYPE: Union[str, KvWatch, None] DEFAULT: None

obj_watch

ObjectStore watch parameters container.

TYPE: Union[bool, ObjWatch] DEFAULT: False

inbox_prefix

Prefix for generating unique inboxes, subjects with that prefix and NUID.

TYPE: bytes DEFAULT: INBOX_PREFIX

ack_first

Whether to ack message at start of consuming or not.

TYPE: bool DEFAULT: EMPTY

stream

Subscribe to NATS Stream with subject filter.

TYPE: Union[str, JStream, None] DEFAULT: None

dependencies

Dependencies list ([Dependant(),]) to apply to the subscriber.

TYPE: Iterable[Dependant] DEFAULT: ()

parser

Parser to map original nats-py Msg to FastStream one.

TYPE: Optional[CustomCallable] DEFAULT: None

decoder

Function to decode FastStream msg bytes body to python objects.

TYPE: Optional[CustomCallable] DEFAULT: None

middlewares

Subscriber middlewares to wrap incoming message processing.

TYPE: Sequence[SubscriberMiddleware[Any]] DEFAULT: ()

max_workers

Number of workers to process messages concurrently.

TYPE: int | None DEFAULT: None

no_ack

Whether to disable FastStream auto acknowledgement logic or not.

TYPE: bool DEFAULT: EMPTY

ack_policy

Whether to ack message at start of consuming or not.

TYPE: AckPolicy DEFAULT: EMPTY

no_reply

Whether to disable FastStream RPC and Reply To auto responses or not.

TYPE: bool DEFAULT: False

title

AsyncAPI subscriber object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI subscriber object description. Uses decorated docstring as default.

TYPE: str | None DEFAULT: None

include_in_schema

Whetever to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the subscriber persistent or not.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
LogicSubscriber[Any]

LogicSubscriber[Any]: The created subscriber object.

Source code in faststream/nats/broker/registrator.py
@override
def subscriber(
    self,
    subject: str = "",
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    # Core arguments
    max_msgs: int = 0,
    # JS arguments
    durable: str | None = None,
    config: Optional["api.ConsumerConfig"] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool | None = None,
    deliver_policy: Optional["api.DeliverPolicy"] = None,
    headers_only: bool | None = None,
    # pull arguments
    pull_sub: Union[bool, "PullSub"] = False,
    kv_watch: Union[str, "KvWatch", None] = None,
    obj_watch: Union[bool, "ObjWatch"] = False,
    inbox_prefix: bytes = api.INBOX_PREFIX,
    # custom
    stream: Union[str, "JStream", None] = None,
    # broker arguments
    dependencies: Iterable["Dependant"] = (),
    parser: Optional["CustomCallable"] = None,
    decoder: Optional["CustomCallable"] = None,
    persistent: bool = True,
    ack_first: Annotated[
        bool,
        deprecated(
            "This option is deprecated and will be removed in 0.7.0 release. "
            "Please, use `ack_policy=AckPolicy.ACK_FIRST` instead."
        ),
    ] = EMPTY,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[Any]"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    no_ack: Annotated[
        bool,
        deprecated(
            "This option was deprecated in 0.6.0 to prior to **ack_policy=AckPolicy.MANUAL**. "
            "Scheduled to remove in 0.7.0",
        ),
    ] = EMPTY,
    max_workers: int | None = None,
    ack_policy: AckPolicy = EMPTY,
    no_reply: bool = False,
    # AsyncAPI information
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
) -> "LogicSubscriber[Any]":
    """Creates NATS subscriber object.

    You can use it as a handler decorator `@broker.subscriber(...)`.

    Args:
        subject: NATS subject to subscribe.
        queue: Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS
            server.
        pending_msgs_limit: Limit of messages, considered by NATS server as possible to be delivered to the
            client without been answered. In case of NATS Core, if that limits exceeds, you will receive NATS
            'Slow Consumer' error. That's literally means that your worker can't handle the whole load. In case of
            NATS JetStream, you will no longer receive messages until some of delivered messages will be acked in
            any way.
        pending_bytes_limit: The number of bytes, considered by NATS server as possible to be delivered to the
            client without been answered. In case of NATS Core, if that limit exceeds, you will receive NATS 'Slow
            Consumer' error. That's literally means that your worker can't handle the whole load. In case of NATS
            JetStream, you will no longer receive messages until some of delivered messages will be acked in any
            way.
        max_msgs: Consuming messages limiter. Automatically disconnect if reached.
        durable: Name of the durable consumer to which the the subscription should be bound.
        config: Configuration of JetStream consumer to be subscribed with.
        ordered_consumer: Enable ordered consumer mode.
        idle_heartbeat: Enable Heartbeats for a consumer to detect failures.
        flow_control: Enable Flow Control for a consumer.
        deliver_policy: Deliver Policy to be used for subscription.
        headers_only: Should be message delivered without payload, only headers and metadata.
        pull_sub: NATS Pull consumer parameters container. Should be used with `stream` only.
        kv_watch: KeyValue watch parameters container.
        obj_watch: ObjectStore watch parameters container.
        inbox_prefix: Prefix for generating unique inboxes, subjects with that prefix and NUID.
        ack_first: Whether to `ack` message at start of consuming or not.
        stream: Subscribe to NATS Stream with `subject` filter.
        dependencies: Dependencies list (`[Dependant(),]`) to apply to the subscriber.
        parser: Parser to map original **nats-py** Msg to FastStream one.
        decoder: Function to decode FastStream msg bytes body to python objects.
        middlewares: Subscriber middlewares to wrap incoming message processing.
        max_workers: Number of workers to process messages concurrently.
        no_ack: Whether to disable **FastStream** auto acknowledgement logic or not.
        ack_policy: Whether to `ack` message at start of consuming or not.
        no_reply: Whether to disable **FastStream** RPC and Reply To auto responses or not.
        title: AsyncAPI subscriber object title.
        description: AsyncAPI subscriber object description. Uses decorated docstring as default.
        include_in_schema: Whetever to include operation in AsyncAPI schema or not.
        persistent: Whether to make the subscriber persistent or not.

    Returns:
        LogicSubscriber[Any]: The created subscriber object.
    """
    stream = self._stream_builder.create(stream)

    subscriber = create_subscriber(
        subject=subject,
        queue=queue,
        stream=stream,
        pull_sub=PullSub.validate(pull_sub),
        kv_watch=KvWatch.validate(kv_watch),
        obj_watch=ObjWatch.validate(obj_watch),
        max_workers=max_workers or 1,
        # extra args
        pending_msgs_limit=pending_msgs_limit,
        pending_bytes_limit=pending_bytes_limit,
        max_msgs=max_msgs,
        durable=durable,
        config=config,
        ordered_consumer=ordered_consumer,
        idle_heartbeat=idle_heartbeat,
        flow_control=flow_control,
        deliver_policy=deliver_policy,
        headers_only=headers_only,
        inbox_prefix=inbox_prefix,
        ack_first=ack_first,
        # subscriber args
        ack_policy=ack_policy,
        no_ack=no_ack,
        no_reply=no_reply,
        broker_config=cast("NatsBrokerConfig", self.config),
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=include_in_schema,
    )

    super().subscriber(subscriber, persistent=persistent)

    self._stream_builder.add_subject(stream, subscriber.subject)

    return subscriber.add_call(
        parser_=parser,
        decoder_=decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(
    subject,
    *,
    headers=None,
    reply_to="",
    stream=None,
    timeout=None,
    persistent=True,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

PARAMETER DESCRIPTION
subject

NATS subject to send message.

TYPE: str

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway. Can be overridden by publish.headers if specified.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

NATS subject name to send response.

TYPE: str DEFAULT: ''

stream

This option validates that the target subject is in presented stream. Can be omitted without any effect.

TYPE: Union[str, JStream, None] DEFAULT: None

timeout

Timeout to send message to NATS.

TYPE: float | None DEFAULT: None

middlewares

Publisher middlewares to wrap outgoing messages.

TYPE: Sequence[PublisherMiddleware] DEFAULT: ()

title

AsyncAPI publisher object title.

TYPE: str | None DEFAULT: None

description

AsyncAPI publisher object description.

TYPE: str | None DEFAULT: None

schema

AsyncAPI publishing message type. Should be any python-native object annotation or pydantic.BaseModel.

TYPE: Any | None DEFAULT: None

include_in_schema

Whetever to include operation in AsyncAPI schema or not.

TYPE: bool DEFAULT: True

persistent

Whether to make the publisher persistent or not.

TYPE: bool DEFAULT: True

Source code in faststream/nats/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    subject: str,
    *,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    stream: Union[str, "JStream", None] = None,
    timeout: float | None = None,
    persistent: bool = True,
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        deprecated(
            "This option was deprecated in 0.6.0. Use router-level middlewares instead."
            "Scheduled to remove in 0.7.0",
        ),
    ] = (),
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> "LogicPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.

    Args:
        subject: NATS subject to send message.
        headers: Message headers to store metainformation.
            content-type and correlation_id will be set automatically by framework anyway.
            Can be overridden by `publish.headers` if specified.
        reply_to: NATS subject name to send response.
        stream: This option validates that the target `subject` is in presented stream.
            Can be omitted without any effect.
        timeout: Timeout to send message to NATS.
        middlewares: Publisher middlewares to wrap outgoing messages.
        title: AsyncAPI publisher object title.
        description: AsyncAPI publisher object description.
        schema: AsyncAPI publishing message type.
            Should be any python-native object annotation or `pydantic.BaseModel`.
        include_in_schema: Whetever to include operation in AsyncAPI schema or not.
        persistent: Whether to make the publisher persistent or not.
    """
    stream = self._stream_builder.create(stream)

    publisher = create_publisher(
        subject=subject,
        headers=headers,
        # Core
        reply_to=reply_to,
        # JS
        timeout=timeout,
        stream=stream,
        # Specific
        broker_config=cast("NatsBrokerConfig", self.config),
        middlewares=middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=include_in_schema,
    )

    super().publisher(publisher, persistent=persistent)

    self._stream_builder.add_subject(stream, publisher.subject)

    return publisher

include_router #

include_router(
    router,
    *,
    prefix="",
    dependencies=(),
    middlewares=(),
    include_in_schema=None,
)
Source code in faststream/nats/broker/registrator.py
@override
def include_router(  # type: ignore[override]
    self,
    router: "NatsRegistrator",
    *,
    prefix: str = "",
    dependencies: Iterable["Dependant"] = (),
    middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (),
    include_in_schema: bool | None = None,
) -> None:
    if not isinstance(router, NatsRegistrator):
        msg = (
            f"Router must be an instance of NatsRegistrator, "
            f"got {type(router).__name__} instead"
        )
        raise SetupError(msg)

    for stream, subjects in router._stream_builder.objects.values():
        for subj in subjects:
            router_subject = f"{self.config.prefix}{prefix}{subj}"
            self._stream_builder.add_subject(stream, router_subject)

    super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/_internal/broker/registrator.py
def include_routers(
    self,
    *routers: "Registrator[MsgType, Any]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

connect async #

connect()

Connect to a remote server.

Source code in faststream/_internal/broker/broker.py
async def connect(self) -> ConnectionType:
    """Connect to a remote server."""
    if self._connection is None:
        self._connection = await self._connect()
        self._setup_logger()

    return self._connection

stop async #

stop(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/nats/broker/broker.py
async def stop(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await super().stop(exc_type, exc_val, exc_tb)

    if self._connection is not None:
        await self._connection.drain()
        self._connection = None

    self.config.disconnect()

close async #

close(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/nats/broker/broker.py
@deprecated(
    "Deprecated in **FastStream 0.5.44**. "
    "Please, use `stop` method instead. "
    "Method `close` will be removed in **FastStream 0.7.0**.",
    category=DeprecationWarning,
    stacklevel=1,
)
async def close(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    await self.stop(exc_type, exc_val, exc_tb)

start async #

start()

Connect broker to NATS cluster and startup all subscribers.

Source code in faststream/nats/broker/broker.py
async def start(self) -> None:
    """Connect broker to NATS cluster and startup all subscribers."""
    await self.connect()

    stream_context = self.config.connection_state.stream

    for stream, subjects in filter(
        lambda x: x[0].declare,
        self._stream_builder.objects.values(),
    ):
        try:
            await stream_context.add_stream(
                config=stream.config,
                subjects=list(subjects),
            )

        except BadRequestError as e:  # noqa: PERF203
            self._setup_logger()

            log_context = LogicSubscriber.build_log_context(
                message=None,
                subject="",
                queue="",
                stream=stream.name,
            )

            logger_state = self.config.logger

            if (
                e.description
                == "stream name already in use with a different configuration"
            ):
                old_config = (await stream_context.stream_info(stream.name)).config

                logger_state.log(str(e), logging.WARNING, log_context)

                for subject in old_config.subjects or ():
                    subjects.append(subject)

                stream.config.subjects = list(subjects)
                await stream_context.update_stream(config=stream.config)

            else:  # pragma: no cover
                logger_state.log(
                    str(e),
                    logging.ERROR,
                    log_context,
                    exc_info=e,
                )

        finally:
            # prevent from double declaration
            stream.declare = False

    await super().start()

publish async #

publish(
    message: SendableMessage,
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: None = None,
    timeout: float | None = None,
) -> None
publish(
    message: SendableMessage,
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
) -> PubAck
publish(
    message,
    subject,
    headers=None,
    reply_to="",
    correlation_id=None,
    stream=None,
    timeout=None,
)

Publish message directly.

This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks applications or to publish messages from time to time.

Please, use @broker.publisher(...) or broker.publisher(...).publish(...) instead in a regular way.

PARAMETER DESCRIPTION
message

Message body to send. Can be any encodable object (native python types or pydantic.BaseModel).

TYPE: SendableMessage

subject

NATS subject to send message.

TYPE: str

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

NATS subject name to send response.

TYPE: str DEFAULT: ''

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

stream

This option validates that the target subject is in presented stream. Can be omitted without any effect if you doesn't want PubAck frame.

TYPE: str | None DEFAULT: None

timeout

Timeout to send message to NATS.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Optional[PubAck]

None if you publishes a regular message.

Optional[PubAck]

faststream.nats.PubAck if you publishes a message to stream.

Source code in faststream/nats/broker/broker.py
@override
async def publish(
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
) -> Optional["PubAck"]:
    """Publish message directly.

    This method allows you to publish message in not AsyncAPI-documented way. You can use it in another frameworks
    applications or to publish messages from time to time.

    Please, use `@broker.publisher(...)` or `broker.publisher(...).publish(...)` instead in a regular way.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send message.
        headers:
            Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by framework anyway.
        reply_to:
            NATS subject name to send response.
        correlation_id:
            Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages.
        stream:
            This option validates that the target subject is in presented stream.
            Can be omitted without any effect if you doesn't want PubAck frame.
        timeout:
            Timeout to send message to NATS.

    Returns:
        `None` if you publishes a regular message.
        `faststream.nats.PubAck` if you publishes a message to stream.
    """
    cmd = NatsPublishCommand(
        message=message,
        correlation_id=correlation_id or gen_cor_id(),
        subject=subject,
        headers=headers,
        reply_to=reply_to,
        stream=stream,
        timeout=timeout or 0.5,
        _publish_type=PublishType.PUBLISH,
    )

    result: PubAck | None
    if stream:
        result = await super()._basic_publish(cmd, producer=self.config.js_producer)
    else:
        result = await super()._basic_publish(cmd, producer=self.config.producer)
    return result

request async #

request(
    message,
    subject,
    headers=None,
    correlation_id=None,
    stream=None,
    timeout=0.5,
)

Make a synchronous request to outer subscriber.

If out subscriber listens subject by stream, you should setup the same stream explicitly. Another way you will reseave confirmation frame as a response.

PARAMETER DESCRIPTION
message

Message body to send. Can be any encodable object (native python types or pydantic.BaseModel).

TYPE: SendableMessage

subject

NATS subject to send message.

TYPE: str

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

NATS subject name to send response.

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

stream

JetStream name. This option is required if your target subscriber listens for events using JetStream.

TYPE: str | None DEFAULT: None

timeout

Timeout to send message to NATS.

TYPE: float DEFAULT: 0.5

RETURNS DESCRIPTION
NatsMessage

faststream.nats.message.NatsMessage object as an outer subscriber response.

Source code in faststream/nats/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    subject: str,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float = 0.5,
) -> "NatsMessage":
    """Make a synchronous request to outer subscriber.

    If out subscriber listens subject by stream, you should setup the same **stream** explicitly.
    Another way you will reseave confirmation frame as a response.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send message.
        headers:
            Message headers to store metainformation.
            **content-type** and **correlation_id** will be set automatically by framework anyway.
        reply_to:
            NATS subject name to send response.
        correlation_id:
            Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages.
        stream:
            JetStream name. This option is required if your target subscriber listens for events using JetStream.
        timeout:
            Timeout to send message to NATS.

    Returns:
        `faststream.nats.message.NatsMessage` object as an outer subscriber response.
    """
    cmd = NatsPublishCommand(
        message=message,
        correlation_id=correlation_id or gen_cor_id(),
        subject=subject,
        headers=headers,
        timeout=timeout,
        stream=stream,
        _publish_type=PublishType.REQUEST,
    )

    producer = self.config.js_producer if stream is not None else self.config.producer

    msg: NatsMessage = await super()._basic_request(cmd, producer=producer)
    return msg

key_value async #

key_value(
    bucket,
    *,
    description=None,
    max_value_size=None,
    history=1,
    ttl=None,
    max_bytes=None,
    storage=None,
    replicas=1,
    placement=None,
    republish=None,
    direct=None,
    declare=True,
)
Source code in faststream/nats/broker/broker.py
async def key_value(
    self,
    bucket: str,
    *,
    description: str | None = None,
    max_value_size: int | None = None,
    history: int = 1,
    ttl: float | None = None,  # in seconds
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    republish: Optional["RePublish"] = None,
    direct: bool | None = None,
    # custom
    declare: bool = True,
) -> "KeyValue":
    kv_declarer = cast("KVBucketDeclarer", self.config.kv_declarer)
    return await kv_declarer.create_key_value(
        bucket=bucket,
        description=description,
        max_value_size=max_value_size,
        history=history,
        ttl=ttl,
        max_bytes=max_bytes,
        storage=storage,
        replicas=replicas,
        placement=placement,
        republish=republish,
        direct=direct,
        declare=declare,
    )

object_storage async #

object_storage(
    bucket,
    *,
    description=None,
    ttl=None,
    max_bytes=None,
    storage=None,
    replicas=1,
    placement=None,
    declare=True,
)
Source code in faststream/nats/broker/broker.py
async def object_storage(
    self,
    bucket: str,
    *,
    description: str | None = None,
    ttl: float | None = None,
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    declare: bool = True,
) -> "ObjectStore":
    os_declarer = cast("OSBucketDeclarer", self.config.os_declarer)
    return await os_declarer.create_object_store(
        bucket=bucket,
        description=description,
        ttl=ttl,
        max_bytes=max_bytes,
        storage=storage,
        replicas=replicas,
        placement=placement,
        declare=declare,
    )

new_inbox async #

new_inbox()

Return a unique inbox that can be used for NATS requests or subscriptions.

The inbox prefix can be customised by passing inbox_prefix when creating your NatsBroker.

This method calls nats.aio.client.Client.new_inbox [1] under the hood.

[1] https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.new_inbox

Source code in faststream/nats/broker/broker.py
async def new_inbox(self) -> str:
    """Return a unique inbox that can be used for NATS requests or subscriptions.

    The inbox prefix can be customised by passing `inbox_prefix` when creating your `NatsBroker`.

    This method calls `nats.aio.client.Client.new_inbox` [1] under the hood.

    [1] https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.new_inbox
    """
    return self.connection.new_inbox()

ping async #

ping(timeout)
Source code in faststream/nats/broker/broker.py
@override
async def ping(self, timeout: float | None) -> bool:
    sleep_time = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as cancel_scope:
        if self._connection is None:
            return False

        while True:
            if cancel_scope.cancel_called:
                return False

            if self._connection.is_connected:
                return True

            await anyio.sleep(sleep_time)

    return False