Skip to content

NatsBrokerConfig

faststream.nats.configs.broker.NatsBrokerConfig dataclass #

NatsBrokerConfig(
    *,
    prefix="",
    include_in_schema=True,
    broker_middlewares=(),
    broker_parser=None,
    broker_decoder=None,
    producer=FakeNatsFastProducer(),
    logger=LoggerState(),
    fd_config=FastDependsConfig(),
    broker_dependencies=(),
    graceful_timeout=None,
    extra_context=dict(),
    js_options=dict(),
    js_producer=FakeNatsFastProducer(),
    connection_state=BrokerState(),
    kv_declarer=KVBucketDeclarer(),
    os_declarer=OSBucketDeclarer(),
)

Bases: BrokerConfig

js_options class-attribute instance-attribute #

js_options = field(default_factory=dict)

producer class-attribute instance-attribute #

producer = field(default_factory=FakeNatsFastProducer)

js_producer class-attribute instance-attribute #

js_producer = field(default_factory=FakeNatsFastProducer)

connection_state class-attribute instance-attribute #

connection_state = field(default_factory=BrokerState)

kv_declarer class-attribute instance-attribute #

kv_declarer = field(default_factory=KVBucketDeclarer)

os_declarer class-attribute instance-attribute #

os_declarer = field(default_factory=OSBucketDeclarer)

prefix class-attribute instance-attribute #

prefix = ''

include_in_schema class-attribute instance-attribute #

include_in_schema = True

broker_middlewares class-attribute instance-attribute #

broker_middlewares = ()

broker_parser class-attribute instance-attribute #

broker_parser = None

broker_decoder class-attribute instance-attribute #

broker_decoder = None

logger class-attribute instance-attribute #

logger = field(default_factory=LoggerState)

fd_config class-attribute instance-attribute #

fd_config = field(default_factory=FastDependsConfig)

broker_dependencies class-attribute instance-attribute #

broker_dependencies = ()

graceful_timeout class-attribute instance-attribute #

graceful_timeout = None

extra_context class-attribute instance-attribute #

extra_context = field(default_factory=dict)

connect #

connect(connection)
Source code in faststream/nats/configs/broker.py
def connect(self, connection: "Client") -> None:
    stream = connection.jetstream(**self.js_options)

    self.producer.connect(connection, serializer=self.fd_config._serializer)

    self.js_producer.connect(stream, serializer=self.fd_config._serializer)
    self.kv_declarer.connect(stream)
    self.os_declarer.connect(stream)

    self.connection_state.connect(connection, stream)

disconnect #

disconnect()
Source code in faststream/nats/configs/broker.py
def disconnect(self) -> None:
    self.producer.disconnect()
    self.js_producer.disconnect()
    self.kv_declarer.disconnect()
    self.os_declarer.disconnect()

    self.connection_state.disconnect()

add_middleware #

add_middleware(middleware)
Source code in faststream/_internal/configs/broker.py
def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
    self.broker_middlewares = (*self.broker_middlewares, middleware)

insert_middleware #

insert_middleware(middleware)
Source code in faststream/_internal/configs/broker.py
def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
    self.broker_middlewares = (middleware, *self.broker_middlewares)