Skip to content

Msgspec Serialization#

msgspec is a fast serialization and validation library, with builtin support for JSON, MessagePack, YAML, and TOML. It features:

  • 🚀 High performance encoders/decoders for common protocols. The JSON and MessagePack implementations regularly benchmark as the fastest options for Python.

  • 🎉 Support for a wide variety of Python types. Additional types may be supported through extending.

  • 🔍 Zero-cost schema validation using familiar Python type annotations. In benchmarks msgspec decodes and validates JSON faster than orjson can decode it alone.

  • A speedy Struct type for representing structured data. If you already use dataclasses or attrs, Structs should feel familiar. However, they're 5-60x faster for common operations.

FastStream supports msgspec as an alternative backend for serialization, which can be used instead of Pydantic.

To use it, you need to pass a MsgSpecSerializer object to the broker:

from fast_depends.msgspec import MsgSpecSerializer
from faststream.kafka import KafkaBroker

broker = KafkaBroker(serializer=MsgSpecSerializer())
from fast_depends.msgspec import MsgSpecSerializer
from faststream.confluent import KafkaBroker

broker = KafkaBroker(serializer=MsgSpecSerializer())
from fast_depends.msgspec import MsgSpecSerializer
from faststream.rabbit import RabbitBroker

broker = RabbitBroker(serializer=MsgSpecSerializer())
from fast_depends.msgspec import MsgSpecSerializer
from faststream.nats import NatsBroker

broker = NatsBroker(serializer=MsgSpecSerializer())
from fast_depends.msgspec import MsgSpecSerializer
from faststream.redis import RedisBroker

broker = RedisBroker(serializer=MsgSpecSerializer())

msgspec.field#

Msgspec field allows you to override the field name for encoding/decoding and provide default values.

Just use msgspec.field as a function default argument:

from msgspec import field
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker(
    "localhost:9092",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    name: str,
    user_id: int = field(name="userId"),
):
    assert name == "John"
    assert user_id == 1
from msgspec import field
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker(
    "localhost:9092",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


@broker.subscriber("test-channel", auto_offset_reset="earliest")
async def handle(
    name: str,
    user_id: int = field(name="userId"),
):
    assert name == "John"
    assert user_id == 1
from msgspec import field
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


@broker.subscriber("test-queue")
async def handle(
    name: str,
    user_id: int = field(name="userId"),
):
    assert name == "John"
    assert user_id == 1
from msgspec import field
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker(
    "nats://localhost:4222",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


@broker.subscriber("test-subject")
async def handle(
    name: str,
    user_id: int = field(name="userId"),
):
    assert name == "John"
    assert user_id == 1
from msgspec import field
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker(
    "redis://localhost:6379",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


@broker.subscriber("test-channel")
async def handle(
    name: str,
    user_id: int = field(name="userId"),
):
    assert name == "John"
    assert user_id == 1

msgspec.Struct#

To make your message schema reusable between different subscribers and publishers, you can declare it as a msgspec.Struct and use it as a single message annotation:

from msgspec import field, Struct
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker(
    "localhost:9092",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


class UserInfo(Struct):
    name: str
    user_id: int = field(name="userId")


@broker.subscriber("test-channel")
async def handle(
    user: UserInfo,
):
    assert user.name == "John"
    assert user.user_id == 1
from msgspec import field, Struct
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker(
    "localhost:9092",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


class UserInfo(Struct):
    name: str
    user_id: int = field(name="userId")


@broker.subscriber("test-channel")
async def handle(
    user: UserInfo,
):
    assert user.name == "John"
    assert user.user_id == 1
from msgspec import field, Struct
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


class UserInfo(Struct):
    name: str
    user_id: int = field(name="userId")


@broker.subscriber("test-queue")
async def handle(
    user: UserInfo,
):
    assert user.name == "John"
    assert user.user_id == 1
from msgspec import field, Struct
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker(
    "nats://localhost:4222",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


class UserInfo(Struct):
    name: str
    user_id: int = field(name="userId")


@broker.subscriber("test-subject")
async def handle(
    user: UserInfo,
):
    assert user.name == "John"
    assert user.user_id == 1
from msgspec import field, Struct
from fast_depends.msgspec import MsgSpecSerializer

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker(
    "redis://localhost:6379",
    serializer=MsgSpecSerializer(),
)
app = FastStream(broker)


class UserInfo(Struct):
    name: str
    user_id: int = field(name="userId")


@broker.subscriber("test-channel")
async def handle(
    user: UserInfo,
):
    assert user.name == "John"
    assert user.user_id == 1