NatsRouter
faststream.nats.broker.NatsRouter #
NatsRouter(
prefix: str = "",
handlers: Iterable[NatsRoute] = (),
*,
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
routers: Iterable[NatsRegistrator] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
include_in_schema: bool | None = None,
ack_policy: AckPolicy = EMPTY,
)
Bases: NatsRegistrator, BrokerRouter[Msg]
A router that can be included in a NatsBroker.
Initialize the NatsRouter instance.
| PARAMETER | DESCRIPTION |
|---|---|
prefix | String prefix to add to all subscribers subjects. Defaults to "". TYPE: |
handlers | Route object to include. Defaults to (). TYPE: |
dependencies | Dependencies list (`[Dependant(),]`) to apply to all routers' publishers/subscribers. Defaults to (). TYPE: |
middlewares | Router middlewares to apply to all routers' publishers/subscribers. Defaults to (). TYPE: |
routers | Routers to apply to broker. Defaults to (). TYPE: |
parser | Parser to map original IncomingMessage Msg to FastStream one. Defaults to None. TYPE: |
decoder | Function to decode FastStream msg bytes body to python objects. Defaults to None. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema or not. TYPE: |
ack_policy | Default acknowledgement policy for all subscribers in this router. Can be overridden at the subscriber level. Defaults to EMPTY. TYPE: |
Source code in faststream/nats/broker/router.py
config instance-attribute #
add_middleware #
add_middleware(
middleware: BrokerMiddleware[Any, Any],
) -> None
Append BrokerMiddleware to the end of middlewares list.
The added middleware will be used as the innermost one in the stack.
Source code in faststream/_internal/broker/registrator.py
insert_middleware #
insert_middleware(
middleware: BrokerMiddleware[Any, Any],
) -> None
Insert BrokerMiddleware to the start of middlewares list.
The added middleware will be used as the outermost one in the stack.
Source code in faststream/_internal/broker/registrator.py
subscriber #
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: None = None,
config: None = None,
ordered_consumer: Literal[False] = False,
idle_heartbeat: None = None,
flow_control: None = None,
deliver_policy: None = None,
headers_only: None = None,
pull_sub: Literal[False] = False,
kv_watch: None = None,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> CoreSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: None = None,
config: None = None,
ordered_consumer: Literal[False] = False,
idle_heartbeat: None = None,
flow_control: None = None,
deliver_policy: None = None,
headers_only: None = None,
pull_sub: Literal[False] = False,
kv_watch: None = None,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: int = ...,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> ConcurrentCoreSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: str | None = None,
config: Optional[ConsumerConfig] = None,
ordered_consumer: bool = False,
idle_heartbeat: float | None = None,
flow_control: bool | None = None,
deliver_policy: Optional[DeliverPolicy] = None,
headers_only: bool | None = None,
pull_sub: Literal[False] = False,
kv_watch: None = None,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: Union[str, JStream] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> PushStreamSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: str | None = None,
config: Optional[ConsumerConfig] = None,
ordered_consumer: bool = False,
idle_heartbeat: float | None = None,
flow_control: bool | None = None,
deliver_policy: Optional[DeliverPolicy] = None,
headers_only: bool | None = None,
pull_sub: Literal[False] = False,
kv_watch: None = None,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: Union[str, JStream] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: int = ...,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> ConcurrentPushStreamSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: str | None = None,
config: Optional[ConsumerConfig] = None,
ordered_consumer: bool = False,
idle_heartbeat: float | None = None,
flow_control: bool | None = None,
deliver_policy: Optional[DeliverPolicy] = None,
headers_only: bool | None = None,
pull_sub: Literal[True] = ...,
kv_watch: None = None,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: Union[str, JStream] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> PullStreamSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: str | None = None,
config: Optional[ConsumerConfig] = None,
ordered_consumer: bool = False,
idle_heartbeat: float | None = None,
flow_control: bool | None = None,
deliver_policy: Optional[DeliverPolicy] = None,
headers_only: bool | None = None,
pull_sub: Literal[True] = ...,
kv_watch: None = None,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: Union[str, JStream] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: int = ...,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> ConcurrentPullStreamSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: str | None = None,
config: Optional[ConsumerConfig] = None,
ordered_consumer: bool = False,
idle_heartbeat: float | None = None,
flow_control: bool | None = None,
deliver_policy: Optional[DeliverPolicy] = None,
headers_only: bool | None = None,
pull_sub: PullSub = ...,
kv_watch: None = None,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: Union[str, JStream] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> Union[PullStreamSubscriber, BatchPullStreamSubscriber]
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: None = None,
config: None = None,
ordered_consumer: Literal[False] = False,
idle_heartbeat: None = None,
flow_control: None = None,
deliver_policy: None = None,
headers_only: None = None,
pull_sub: Literal[False] = False,
kv_watch: Union[str, KvWatch] = ...,
obj_watch: Literal[False] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> KeyValueWatchSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: None = None,
config: None = None,
ordered_consumer: Literal[False] = False,
idle_heartbeat: None = None,
flow_control: None = None,
deliver_policy: None = None,
headers_only: None = None,
pull_sub: Literal[False] = False,
kv_watch: None = None,
obj_watch: Union[Literal[True], ObjWatch] = ...,
inbox_prefix: bytes = INBOX_PREFIX,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> ObjStoreWatchSubscriber
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: str | None = None,
config: Optional[ConsumerConfig] = None,
ordered_consumer: bool = False,
idle_heartbeat: float | None = None,
flow_control: bool | None = None,
deliver_policy: Optional[DeliverPolicy] = None,
headers_only: bool | None = None,
pull_sub: Union[bool, PullSub] = False,
kv_watch: Union[str, KvWatch, None] = None,
obj_watch: Union[bool, ObjWatch] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: Union[str, JStream, None] = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: int | None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> LogicSubscriber[Any]
subscriber(
subject: str = "",
queue: str = "",
pending_msgs_limit: int | None = None,
pending_bytes_limit: int | None = None,
max_msgs: int = 0,
durable: str | None = None,
config: Optional[ConsumerConfig] = None,
ordered_consumer: bool = False,
idle_heartbeat: float | None = None,
flow_control: bool | None = None,
deliver_policy: Optional[DeliverPolicy] = None,
headers_only: bool | None = None,
pull_sub: Union[bool, PullSub] = False,
kv_watch: Union[str, KvWatch, None] = None,
obj_watch: Union[bool, ObjWatch] = False,
inbox_prefix: bytes = INBOX_PREFIX,
stream: Union[str, JStream, None] = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
persistent: bool = True,
max_workers: int | None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> LogicSubscriber[Any]
Creates NATS subscriber object.
You can use it as a handler decorator @broker.subscriber(...).
| PARAMETER | DESCRIPTION |
|---|---|
subject | NATS subject to subscribe. TYPE: |
queue | Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS server. TYPE: |
pending_msgs_limit | Limit of messages considered by the NATS server as possible to be delivered to the client without being answered. In case of NATS Core, if that limit is exceeded, you will receive a NATS 'Slow Consumer' error. That literally means that your worker can't handle the whole load. In case of NATS JetStream, you will no longer receive messages until some of the delivered messages are acked in any way. TYPE: |
pending_bytes_limit | The number of bytes considered by the NATS server as possible to be delivered to the client without being answered. In case of NATS Core, if that limit is exceeded, you will receive a NATS 'Slow Consumer' error. That literally means that your worker can't handle the whole load. In case of NATS JetStream, you will no longer receive messages until some of the delivered messages are acked in any way. TYPE: |
max_msgs | Consuming messages limiter. Automatically disconnect if reached. TYPE: |
durable | Name of the durable consumer to which the subscription should be bound. TYPE: |
config | Configuration of JetStream consumer to be subscribed with. TYPE: |
ordered_consumer | Enable ordered consumer mode. TYPE: |
idle_heartbeat | Enable Heartbeats for a consumer to detect failures. TYPE: |
flow_control | Enable Flow Control for a consumer. TYPE: |
deliver_policy | Deliver Policy to be used for subscription. TYPE: |
headers_only | Whether the message should be delivered without payload, with only headers and metadata. TYPE: |
pull_sub | NATS Pull consumer parameters container. Should be used with `stream` only. TYPE: |
kv_watch | KeyValue watch parameters container. TYPE: |
obj_watch | ObjectStore watch parameters container. TYPE: |
inbox_prefix | Prefix for generating unique inboxes, subjects with that prefix and NUID. TYPE: |
stream | Subscribe to NATS Stream with `subject` filter. TYPE: |
dependencies | Dependencies list (`[Dependant(),]`) to apply to the subscriber. TYPE: |
parser | Parser to map original nats-py Msg to FastStream one. TYPE: |
decoder | Function to decode FastStream msg bytes body to python objects. TYPE: |
max_workers | Number of workers to process messages concurrently. TYPE: |
ack_policy | Acknowledgement policy for this subscriber, i.e. whether and when to `ack` consumed messages. TYPE: |
no_reply | Whether to disable FastStream RPC and Reply To auto responses or not. TYPE: |
title | AsyncAPI subscriber object title. TYPE: |
description | AsyncAPI subscriber object description. Uses decorated docstring as default. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema or not. TYPE: |
persistent | Whether to make the subscriber persistent or not. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
LogicSubscriber[Any] | LogicSubscriber[Any]: The created subscriber object. |
Source code in faststream/nats/broker/registrator.py
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 | |
publisher #
publisher(
subject: str,
*,
headers: dict[str, str] | None = None,
reply_to: str = "",
stream: Union[str, JStream, None] = None,
timeout: float | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> LogicPublisher
Creates long-living and AsyncAPI-documented publisher object.
You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.
Or you can create a publisher object to call it later - broker.publisher(...).publish(...).
| PARAMETER | DESCRIPTION |
|---|---|
subject | NATS subject to send message. TYPE: |
headers | Message headers to store meta-information. content-type and correlation_id will be set automatically by the framework anyway. Can be overridden by `publish.headers` if specified. TYPE: |
reply_to | NATS subject name to send response. TYPE: |
stream | This option validates that the target `subject` is in the given stream. TYPE: |
timeout | Timeout to send message to NATS. TYPE: |
title | AsyncAPI publisher object title. TYPE: |
description | AsyncAPI publisher object description. TYPE: |
schema | AsyncAPI publishing message type. Should be any python-native object annotation or `pydantic.BaseModel`. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema or not. TYPE: |
persistent | Whether to make the publisher persistent or not. TYPE: |
Source code in faststream/nats/broker/registrator.py
include_router #
include_router(
router: NatsRegistrator,
*,
prefix: str = "",
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
include_in_schema: bool | None = None,
) -> None