Skip to content

Consuming Acknowledgements#

As you may know, Nats employs a rather extensive Acknowledgement policy.

In most cases, FastStream automatically acknowledges (acks) messages on your behalf. When your function executes correctly, including sending all responses, a message will be acknowledged (and rejected in case of an exception).

However, there are situations where you might want to use different acknowledgement logic.

Manual Acknowledgement#

If you want to acknowledge a message manually, you can get access directly to the message object via the Context and call the method.

from faststream.nats.annotations import NatsMessage

@broker.subscriber("test")
async def base_handler(body: str, msg: NatsMessage):
    await msg.ack()
    # or
    await msg.nack()
    # or
    await msg.reject()

FastStream will see that the message was already acknowledged and will do nothing at the end of the process.

Interrupt Process#

If you want to interrupt message processing at any call stack, you can raise faststream.exceptions.AckMessage

from faststream import FastStream
from faststream.exceptions import AckMessage
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)


@broker.subscriber("test-subject", stream="test-stream")
async def handle(body):
    smth_processing(body)


def smth_processing(body):
    if True:
        raise AckMessage()


@app.after_startup
async def test_publishing():
    await broker.publish("Hello!", "test-subject")

This way, FastStream interrupts the current message processing and acknowledges it immediately. Also, you can raise NackMessage and RejectMessage too.

Tip

If you want to disable FastStream Acknowledgement logic at all, you can use @broker.subscriber(..., no_ack=True) option. This way you should always process a message (ack/nack/terminate/etc) by yourself.