Msgspec Serialization
msgspec is a fast serialization and validation library, with builtin
support for JSON, MessagePack, YAML, and TOML. It features:
🚀 High performance encoders/decoders for common protocols. The JSON and
MessagePack implementations regularly benchmark as the fastest options for Python.
🎉 Support for a wide variety of Python types. Additional types may
be supported through extensions.
🔍 Zero-cost schema validation using familiar Python type annotations.
In benchmarks msgspec decodes and validates JSON faster than orjson can decode it alone.
✨ A speedy Struct type for representing structured data. If you already
use dataclasses or attrs, Structs should feel familiar. However,
they're 5-60x faster for common operations.
FastStream supports msgspec as an alternative backend for serialization, which can be used instead of Pydantic.
To use it, you need to pass a MsgSpecSerializer object to the broker:
msgspec.field
msgspec.field allows you to override the field name used for encoding/decoding and to provide default values.
Just use msgspec.field as a function default argument:
AIOKafka Confluent RabbitMQ NATS Redis
# Kafka example (faststream.kafka): message fields are bound directly to
# the handler's arguments.
from msgspec import field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.kafka import KafkaBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = KafkaBroker("localhost:9092", serializer=MsgSpecSerializer())
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    name: str,
    # msgspec.field as the default value: the field name used for
    # encoding/decoding is overridden to "userId".
    user_id: int = field(name="userId"),
):
    # The example expects a payload such as {"name": "John", "userId": 1}.
    assert name == "John"
    assert user_id == 1
# Confluent example (faststream.confluent): message fields are bound
# directly to the handler's arguments.
from msgspec import field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.confluent import KafkaBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = KafkaBroker("localhost:9092", serializer=MsgSpecSerializer())
app = FastStream(broker)


@broker.subscriber("test-channel", auto_offset_reset="earliest")
async def handle(
    name: str,
    # msgspec.field as the default value: the field name used for
    # encoding/decoding is overridden to "userId".
    user_id: int = field(name="userId"),
):
    # The example expects a payload such as {"name": "John", "userId": 1}.
    assert name == "John"
    assert user_id == 1
# RabbitMQ example (faststream.rabbit): message fields are bound directly
# to the handler's arguments.
from msgspec import field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.rabbit import RabbitBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/", serializer=MsgSpecSerializer()
)
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    name: str,
    # msgspec.field as the default value: the field name used for
    # encoding/decoding is overridden to "userId".
    user_id: int = field(name="userId"),
):
    # The example expects a payload such as {"name": "John", "userId": 1}.
    assert name == "John"
    assert user_id == 1
# NATS example (faststream.nats): message fields are bound directly to the
# handler's arguments.
from msgspec import field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.nats import NatsBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = NatsBroker("nats://localhost:4222", serializer=MsgSpecSerializer())
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    name: str,
    # msgspec.field as the default value: the field name used for
    # encoding/decoding is overridden to "userId".
    user_id: int = field(name="userId"),
):
    # The example expects a payload such as {"name": "John", "userId": 1}.
    assert name == "John"
    assert user_id == 1
# Redis example (faststream.redis): message fields are bound directly to
# the handler's arguments.
from msgspec import field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.redis import RedisBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = RedisBroker("redis://localhost:6379", serializer=MsgSpecSerializer())
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    name: str,
    # msgspec.field as the default value: the field name used for
    # encoding/decoding is overridden to "userId".
    user_id: int = field(name="userId"),
):
    # The example expects a payload such as {"name": "John", "userId": 1}.
    assert name == "John"
    assert user_id == 1
msgspec.Struct
To make your message schema reusable between different subscribers and publishers, you can declare it as a msgspec.Struct and use it as a single message annotation:
AIOKafka Confluent RabbitMQ NATS Redis
# Kafka example (faststream.kafka): the whole message is described by one
# reusable msgspec.Struct.
from msgspec import Struct, field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.kafka import KafkaBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = KafkaBroker("localhost:9092", serializer=MsgSpecSerializer())
app = FastStream(broker)


# Message schema that can be shared between subscribers and publishers.
class UserInfo(Struct):
    name: str
    # Encoded/decoded under the "userId" name rather than "user_id".
    user_id: int = field(name="userId")


@broker.subscriber("test-channel")
async def handle(user: UserInfo):
    # The Struct is the handler's single message annotation; the example
    # expects a payload such as {"name": "John", "userId": 1}.
    assert user.name == "John"
    assert user.user_id == 1
# Confluent example (faststream.confluent): the whole message is described
# by one reusable msgspec.Struct.
from msgspec import Struct, field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.confluent import KafkaBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = KafkaBroker("localhost:9092", serializer=MsgSpecSerializer())
app = FastStream(broker)


# Message schema that can be shared between subscribers and publishers.
class UserInfo(Struct):
    name: str
    # Encoded/decoded under the "userId" name rather than "user_id".
    user_id: int = field(name="userId")


# NOTE(review): the Confluent msgspec.field example subscribes with
# auto_offset_reset="earliest"; this one does not. Confirm the difference
# is intentional.
@broker.subscriber("test-channel")
async def handle(user: UserInfo):
    # The Struct is the handler's single message annotation; the example
    # expects a payload such as {"name": "John", "userId": 1}.
    assert user.name == "John"
    assert user.user_id == 1
# RabbitMQ example (faststream.rabbit): the whole message is described by
# one reusable msgspec.Struct.
from msgspec import Struct, field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.rabbit import RabbitBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/", serializer=MsgSpecSerializer()
)
app = FastStream(broker)


# Message schema that can be shared between subscribers and publishers.
class UserInfo(Struct):
    name: str
    # Encoded/decoded under the "userId" name rather than "user_id".
    user_id: int = field(name="userId")


@broker.subscriber("test-queue")
async def handle(user: UserInfo):
    # The Struct is the handler's single message annotation; the example
    # expects a payload such as {"name": "John", "userId": 1}.
    assert user.name == "John"
    assert user.user_id == 1
# NATS example (faststream.nats): the whole message is described by one
# reusable msgspec.Struct.
from msgspec import Struct, field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.nats import NatsBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = NatsBroker("nats://localhost:4222", serializer=MsgSpecSerializer())
app = FastStream(broker)


# Message schema that can be shared between subscribers and publishers.
class UserInfo(Struct):
    name: str
    # Encoded/decoded under the "userId" name rather than "user_id".
    user_id: int = field(name="userId")


@broker.subscriber("test-subject")
async def handle(user: UserInfo):
    # The Struct is the handler's single message annotation; the example
    # expects a payload such as {"name": "John", "userId": 1}.
    assert user.name == "John"
    assert user.user_id == 1
# Redis example (faststream.redis): the whole message is described by one
# reusable msgspec.Struct.
from msgspec import Struct, field

from fast_depends.msgspec import MsgSpecSerializer
from faststream import FastStream
from faststream.redis import RedisBroker

# Passing a MsgSpecSerializer makes the broker serialize with msgspec
# instead of Pydantic.
broker = RedisBroker("redis://localhost:6379", serializer=MsgSpecSerializer())
app = FastStream(broker)


# Message schema that can be shared between subscribers and publishers.
class UserInfo(Struct):
    name: str
    # Encoded/decoded under the "userId" name rather than "user_id".
    user_id: int = field(name="userId")


@broker.subscriber("test-channel")
async def handle(user: UserInfo):
    # The Struct is the handler's single message annotation; the example
    # expects a payload such as {"name": "John", "userId": 1}.
    assert user.name == "John"
    assert user.user_id == 1