Skip to content

change_producer

faststream.nats.testing.change_producer #

change_producer(config, producer)
Source code in faststream/nats/testing.py
@contextmanager
def change_producer(
    config: "ConfigComposition[NatsBrokerConfig]",
    producer: "NatsFastProducer",
) -> Generator[None, None, None]:
    old_producer, config.broker_config.producer = (
        config.broker_config.producer,
        producer,
    )
    old_js_producer, config.broker_config.js_producer = (
        config.broker_config.js_producer,
        producer,
    )
    yield
    config.broker_config.producer = old_producer
    config.broker_config.js_producer = old_js_producer