Annotation Serialization#
Basic usage#
As you already know, FastStream serializes your incoming message body according to the function type annotations using Pydantic.
So, there are some valid use cases:
As with other Python primitive types as well (float
, bool
, datetime
, etc)
Note
If the incoming message cannot be serialized by the described schema, FastStream raises a pydantic.ValidationError
with a correct log message.
Also, thanks to Pydantic (again), FastStream is able to serialize (and validate) more complex types like pydantic.HttpUrl
, pydantic.PositiveInt
, etc.
JSON Basic Serialization#
But how can we serialize more complex message, like { "name": "John", "user_id": 1 }
?
For sure, we can serialize it as a simple dict
But it doesn't looks like a correct message validation, does it?
For this reason, FastStream supports per-argument message serialization: you can declare multiple arguments with various types and your message will unpack to them:
Tip
By default FastStream uses json.loads()
to decode and json.dumps()
to encode your messages. But if you prefer orjson or ujson, just install it and framework will use it automatically.
Serialization details#
Simple message#
If you expect to consume simple message like b"1"
or b"any_string"
, using the single argument as a function annotation.
In this case your argument name has no matter cuz it is a total message body.
See the examples below:
JSON-like message#
If you expect to consume a message with a specific structure like JSON, multiple arguments is a shortcut for JSONs.
In this case your message will be unpacked and serialized by various fields
See the examples below:
To consume single JSON, you should create a single-field pydantic model and use it for annotation.
Partial body consuming#
If you don't need to use all the fields, you can simply specify the fields you want to use, and the other will be ignored. See the example below: