Skip to content

Annotation Serialization#

Basic usage#

As you already know, FastStream serializes your incoming message body according to the function type annotations using Pydantic.

So, there are some valid use cases:

@broker.subscriber("test")
async def handle(
    msg: str,
):
    ...

@broker.subscriber("test")
async def handle(
    msg: bytes,
):
    ...

@broker.subscriber("test")
async def handle(
    msg: int,
):
    ...

As with other Python primitive types as well (float, bool, datetime, etc)

Note

If the incoming message cannot be serialized by the described schema, FastStream raises a pydantic.ValidationError with a correct log message.

Also, thanks to Pydantic (again), FastStream is able to serialize (and validate) more complex types like pydantic.HttpUrl, pydantic.PositiveInt, etc.

JSON Basic Serialization#

But how can we serialize more complex message, like { "name": "John", "user_id": 1 } ?

For sure, we can serialize it as a simple dict

1
2
3
4
5
6
7
from typing import Dict, Any

@broker.subscriber("test")
async def handle(
    msg: dict[str, Any],
):
    ...

But it doesn't looks like a correct message validation, does it?

For this reason, FastStream supports per-argument message serialization: you can declare multiple arguments with various types and your message will unpack to them:

1
2
3
4
5
6
7
@broker.subscriber("test-topic")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
1
2
3
4
5
6
7
@broker.subscriber("test-topic")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
1
2
3
4
5
6
7
@broker.subscriber("test-queue")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
1
2
3
4
5
6
7
@broker.subscriber("test-subject")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1
1
2
3
4
5
6
7
@broker.subscriber("test-channel")
async def handle(
    name: str,
    user_id: int,
):
    assert name == "John"
    assert user_id == 1

Tip

By default FastStream uses json.loads() to decode and json.dumps() to encode your messages. But if you prefer orjson just install it and framework will use it automatically.

Serialization details#

Simple message#

If you expect to consume simple message like b"1" or b"any_string", using the single argument as a function annotation.

In this case your argument name has no matter cuz it is a total message body.

See the examples below:

async def handler(body: int): ...
    # waits any int-serializable simple message like b"1"
async def handler(body: str): ...
    # waits any str-serializable simple message like b"any_string"

JSON-like message#

If you expect to consume a message with a specific structure like JSON, multiple arguments is a shortcut for JSONs.

In this case your message will be unpacked and serialized by various fields

See the examples below:

async def handler(name: str, id: int): ...
    # waits for { "name": "John", "id": 1, ... }

To consume single JSON, you should create a single-field pydantic model and use it for annotation.

1
2
3
4
5
class User(BaseModel):
    name: str

async def handler(body: User): ...
    # waits for { "name": "John" }

Partial body consuming#

If you don't need to use all the fields, you can simply specify the fields you want to use, and the other will be ignored. See the example below:

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)

@broker.subscriber("test")
async def handle(name: str, age: int):
    print(f"{name=}, {age=}")

@app.after_startup
async def t():
    await broker.publish({
        "name": "John",
        "age": 25,
        "useless": {
            "nested": "useless"
        }
    }, topic="test")