
Consuming Acknowledgements

As you may know, a Kafka consumer should commit a topic offset when consuming a message.

The default behaviour in FastStream is the AckPolicy.ACK_FIRST policy, which relies on the Kafka client's enable.auto.commit setting to commit (ack) topic offsets automatically. This is the at-most-once consuming strategy.

However, if you want the at-least-once strategy, you should commit the offset only AFTER the message has been processed correctly. To accomplish that, set a consumer group and use the AckPolicy.ACK policy:

@broker.subscriber(
    "test", group_id="group", ack_policy=AckPolicy.ACK
)
async def base_handler(body: str):
    ...

This way, the message is acknowledged after the handler finishes. If the handler raises an exception, the message is still acknowledged.
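The difference between committing before and after processing shows up when a consumer process dies in the middle of a message. The following is a minimal sketch in plain Python, with no Kafka or FastStream involved. The function `run_consumer` and its parameters are hypothetical names invented for this illustration, and the model commits per message, which simplifies the periodic commits a real client with enable.auto.commit performs.

```python
def run_consumer(log, start, commit_first, crash_at=None):
    """Toy model of one consumer run over a single partition.

    Returns (handled_messages, committed_offset). The committed offset is
    where a restarted consumer in the same group would resume.
    """
    committed = start
    handled = []
    for offset in range(start, len(log)):
        if commit_first:
            committed = offset + 1  # commit before handling (at most once)
        if offset == crash_at:
            return handled, committed  # process dies while handling this message
        handled.append(log[offset])
        if not commit_first:
            committed = offset + 1  # commit after handling (at least once)
    return handled, committed


log = ["a", "b", "c"]

# Commit first: the crash on "b" happens after its offset was committed,
# so the restarted consumer skips it.
first_run, resume_at = run_consumer(log, 0, commit_first=True, crash_at=1)
lost_run, _ = run_consumer(log, resume_at, commit_first=True)

# Commit after: the offset of "b" was never committed, so it is read again.
second_run, resume_at_2 = run_consumer(log, 0, commit_first=False, crash_at=1)
retry_run, _ = run_consumer(log, resume_at_2, commit_first=False)
```

In this toy run, `lost_run` contains only "c" (message "b" is never handled), while `retry_run` contains both "b" and "c".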

If you want to retry on error, use the AckPolicy.NACK_ON_ERROR policy. In this case the offset is not committed, and the consumer seeks back to read the same message again:

@broker.subscriber(
    "test", group_id="group", ack_policy=AckPolicy.NACK_ON_ERROR
)
async def base_handler(body: str):
    ...
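To make the seek-and-retry idea concrete, here is a small sketch in plain Python that mimics a consumer re-reading the same offset after a handler error. It is a toy model, not FastStream's implementation: `consume_with_seek`, `flaky_handler`, and the `max_attempts` cap are hypothetical and exist only so the example terminates. The text above does not describe any built-in retry limit.

```python
def consume_with_seek(log, handler, max_attempts=3):
    """Toy consumer: on handler error, stay on the same offset and retry."""
    position = 0    # next offset to read
    committed = 0   # last committed offset
    attempts = {}   # offset -> number of times the message was delivered
    while position < len(log):
        attempts[position] = attempts.get(position, 0) + 1
        try:
            handler(log[position])
        except Exception:
            if attempts[position] >= max_attempts:
                raise  # hypothetical cap so the sketch cannot loop forever
            continue  # "seek": position is unchanged, the message is read again
        position += 1
        committed = position  # commit only after successful handling
    return committed, attempts


# Hypothetical handler that fails once for message "b", then succeeds.
remaining_failures = {"b": 1}


def flaky_handler(message):
    if remaining_failures.get(message, 0) > 0:
        remaining_failures[message] -= 1
        raise ValueError(f"temporary failure on {message!r}")


committed, attempts = consume_with_seek(["a", "b", "c"], flaky_handler)
```

Here the message at offset 1 is delivered twice, and all three offsets end up committed. Because a message can be delivered more than once under this policy, handlers generally need to tolerate repeated delivery.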

However, there are situations where you might want to use different acknowledgement logic.

Manual Acknowledgement

If you want to acknowledge a message manually, you can get direct access to the message object via the Context and acknowledge the message by calling the ack method:

from faststream import AckPolicy
from faststream.kafka.annotations import KafkaMessage


@broker.subscriber(
    "test", group_id="group", ack_policy=AckPolicy.MANUAL
)
async def base_handler(body: str, msg: KafkaMessage):
    await msg.ack()
    # or
    await msg.nack()
    # or any custom logic

Tip

You can use the nack method to prevent the offset from being committed, so that the message can be consumed by another consumer within the same group.

FastStream will see that the message was already acknowledged and will do nothing at the end of the process.

Summary of FastStream AckPolicy Behavior

| AckPolicy | On success | On error | Description |
|---|---|---|---|
| MANUAL | Do nothing | Do nothing | Consumer never commits the offset, full manual control |
| ACK_FIRST | Do nothing | Do nothing | Offset is committed by the Kafka client via the enable.auto.commit setting |
| ACK | Commit offset | Commit offset | |
| REJECT_ON_ERROR | Commit offset | Commit offset | Same as ACK, because Kafka has no native support for rejecting messages |
| NACK_ON_ERROR | Commit offset | Seek offset | Seek offset to read the message again |
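The summary above can also be read as a simple lookup from policy and handler outcome to the resulting action. The sketch below encodes it as a plain Python dictionary. The names `POLICY_BEHAVIOR` and `action_after_handler` are made up for this illustration and are not part of the FastStream API.

```python
# (action on success, action on error) for each policy, copied from the summary.
POLICY_BEHAVIOR = {
    "MANUAL": ("do nothing", "do nothing"),
    "ACK_FIRST": ("do nothing", "do nothing"),
    "ACK": ("commit offset", "commit offset"),
    "REJECT_ON_ERROR": ("commit offset", "commit offset"),
    "NACK_ON_ERROR": ("commit offset", "seek offset"),
}


def action_after_handler(policy: str, failed: bool) -> str:
    """Return what happens once the handler has finished or raised."""
    on_success, on_error = POLICY_BEHAVIOR[policy]
    return on_error if failed else on_success


print(action_after_handler("NACK_ON_ERROR", failed=True))  # seek offset
print(action_after_handler("ACK", failed=True))            # commit offset
```

Only NACK_ON_ERROR behaves differently on success and on error. For ACK_FIRST, "do nothing" means FastStream takes no action itself, and committing is left to the Kafka client.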

Interrupt Process

If you wish to interrupt the processing of a message at any call stack level and acknowledge the message, you can do so by raising faststream.exceptions.AckMessage.

from faststream import FastStream, AckPolicy
from faststream.exceptions import AckMessage
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber(
    "test-topic", group_id="test-group", ack_policy=AckPolicy.REJECT_ON_ERROR,
)
async def handle(body):
    smth_processing(body)


def smth_processing(body):
    if True:
        raise AckMessage()


@app.after_startup
async def test_publishing():
    await broker.publish("Hello!", "test-topic")

This way, FastStream interrupts the current message processing and acknowledges it immediately. Similarly, you can raise NackMessage to prevent the message from being committed.
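The reason this works from any call stack level is ordinary exception propagation: the exception travels up through the handler until the code that invoked the handler catches it. The sketch below imitates that mechanism in plain Python. `ToyAckMessage`, `ToyNackMessage`, `check_payload`, and `dispatch` are stand-ins invented for this example. They are not FastStream classes and do not reflect its internals.

```python
class ToyAckMessage(Exception):
    """Stand-in for an 'acknowledge now' signal."""


class ToyNackMessage(Exception):
    """Stand-in for a 'do not commit' signal."""


def check_payload(body):
    # A helper several calls deep can still end processing early.
    if body == "skip":
        raise ToyAckMessage()
    if body == "later":
        raise ToyNackMessage()


def handler(body):
    check_payload(body)
    return body.upper()  # only reached when no signal was raised


def dispatch(handler, body):
    """Toy wrapper: translate signal exceptions into an acknowledgement action."""
    try:
        handler(body)
    except ToyAckMessage:
        return "ack"
    except ToyNackMessage:
        return "nack"
    return "policy default"  # normal completion: the configured policy decides


print(dispatch(handler, "skip"))   # ack
print(dispatch(handler, "later"))  # nack
print(dispatch(handler, "hello"))  # policy default
```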

Tip

If you want to disable FastStream's acknowledgement logic entirely, you can use the @broker.subscriber(..., no_ack=True) option. In that case you must always handle the message (ack/nack/terminate/etc.) yourself.