Dealing with messages encoded by FastStream#

To provide features such as observability, FastStream needs to attach extra data to each message. Redis, in turn, accepts any type of data as a message payload. FastStream therefore defines its own binary message format, which can carry any user data together with any additional information.

Migration plan#

Currently, FastStream uses JSON as the default format for encoding and parsing messages internally. However, in future updates, JSON will be deprecated and eventually removed, as per the following plan:

  • In versions 0.5.x - JSON will continue to be the default message format
  • In versions 0.6.x - the new binary format will become the default, with JSON being deprecated
  • In versions 0.7.x and beyond - JSON will no longer be supported and will be removed completely

Message structure#

A message encoded by FastStream in the binary format has the following structure:

# Format metadata
[Identification header: 8 bytes]
[Format version: 16 bit big-endian int]
[Headers offset (points to the number of headers): 32 bit big-endian uint]
[Data offset (indicates position at which the data starts): 32 bit big-endian uint]
[Number of headers: 16 bit big-endian uint]
# headers
[Length of key: 16 bit big-endian uint]
[Key: UTF-8 string]
[Length of value: 16 bit big-endian uint]
[Value: UTF-8 string]
# and so on until the number of headers is reached ...

# The data
[Data: untouched user data until the end of the message]

Note

The [...] blocks are arranged in a sequence, with no symbols or data separating them.
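
To make the layout concrete, here is a minimal Python sketch that assembles a message block by block using only the standard `struct` module. The name `build_message`, the all-zero `MAGIC` value, and the version number are placeholders chosen for illustration; the real identification header is defined in FastStream's source and is not reproduced here. The sketch also assumes the headers section starts immediately after the fixed metadata.

```python
import struct

# Placeholder only: the real 8-byte identification header is defined by FastStream.
MAGIC = bytes(8)
FORMAT_VERSION = 1  # assumed value, for illustration

# Identification header (8) + version (2) + headers offset (4) + data offset (4)
METADATA_SIZE = 8 + 2 + 4 + 4


def build_message(data: bytes, headers: dict[str, str]) -> bytes:
    """Assemble a message following the documented block order."""
    # Headers section: the count, then (key length, key, value length, value) entries.
    header_block = struct.pack(">H", len(headers))
    for key, value in headers.items():
        raw_key = key.encode("utf-8")
        raw_value = value.encode("utf-8")
        header_block += struct.pack(">H", len(raw_key)) + raw_key
        header_block += struct.pack(">H", len(raw_value)) + raw_value

    # Assumption: headers are placed right after the fixed metadata.
    headers_offset = METADATA_SIZE
    data_offset = headers_offset + len(header_block)

    # ">" means big-endian with no padding between fields.
    metadata = MAGIC + struct.pack(">HII", FORMAT_VERSION, headers_offset, data_offset)
    return metadata + header_block + data


message = build_message(b"hello", {"correlation_id": "abc"})
print(message[METADATA_SIZE:])  # the headers section followed by the user data
```

Because both offsets are stored explicitly, a reader can jump straight to the headers or to the data without scanning the bytes in between.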

Switching between formats#

On the publisher's side#

from faststream import FastStream
from faststream.redis import RedisBroker
from faststream.redis.parser import BinaryMessageFormatV1

# JSONMessageFormat can be used instead, but it will be deprecated in future updates
broker = RedisBroker(message_format=BinaryMessageFormatV1)

app = FastStream(broker)

@broker.publisher("queue")
async def message_publisher():
    return "message"

On the subscriber's side#

from faststream import FastStream
from faststream.redis import RedisBroker
from faststream.redis.parser import BinaryMessageFormatV1

broker = RedisBroker()

app = FastStream(broker)

# JSONMessageFormat can be used instead, but it will be deprecated in future updates
@broker.subscriber("queue", message_format=BinaryMessageFormatV1)
async def message_handler(msg):
    print(msg)

Parsing in FastStream application#

This message format is primarily internal, but there are some cases where you may need to correctly parse it. For example, in the on_receive method of the middleware:

from faststream import BaseMiddleware
from faststream.redis.parser import BinaryMessageFormatV1


class MyMiddleware(BaseMiddleware):
    async def on_receive(self) -> None:
        data, headers = BinaryMessageFormatV1.parse(self.msg["data"])
        data *= 2
        self.msg["data"] = BinaryMessageFormatV1.encode(
            message=data,
            reply_to=None,
            correlation_id=headers["correlation_id"],
            headers=headers,
        )

Parsing without FastStream#

You can always find FastStream's implementation on GitHub.
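
For a Python process that does not depend on FastStream, a parser can be sketched with the standard `struct` module alone. It follows the layout from the Message structure section. `parse_message` is a hypothetical name, and the sketch does not validate the identification header because its value is not given here.

```python
import struct

METADATA_SIZE = 18  # identification header (8) + version (2) + two offsets (4 + 4)


def parse_message(raw: bytes) -> tuple[bytes, dict[str, str]]:
    """Split a binary message into (data, headers) per the documented layout."""
    if len(raw) < METADATA_SIZE:
        raise ValueError("message is shorter than the fixed metadata block")

    # Skip the 8-byte identification header, then read the version and both offsets.
    _version, headers_offset, data_offset = struct.unpack_from(">HII", raw, 8)
    if not (METADATA_SIZE <= headers_offset <= data_offset <= len(raw)):
        raise ValueError("offsets point outside the message")

    def read_string(pos: int) -> tuple[str, int]:
        # Each string is a 16-bit big-endian length followed by UTF-8 bytes.
        (length,) = struct.unpack_from(">H", raw, pos)
        start, end = pos + 2, pos + 2 + length
        if end > len(raw):
            raise ValueError("header string runs past the end of the message")
        return raw[start:end].decode("utf-8"), end

    (count,) = struct.unpack_from(">H", raw, headers_offset)
    pos = headers_offset + 2
    headers: dict[str, str] = {}
    for _ in range(count):
        key, pos = read_string(pos)
        value, pos = read_string(pos)
        headers[key] = value

    # Everything from the data offset to the end is untouched user data.
    return raw[data_offset:], headers


# Hand-built sample with one header ("id" -> "42") and a placeholder identification header.
sample_headers = struct.pack(">H", 1) + struct.pack(">H", 2) + b"id" + struct.pack(">H", 2) + b"42"
sample = bytes(8) + struct.pack(">HII", 1, 18, 18 + len(sample_headers)) + sample_headers + b"payload"
data, headers = parse_message(sample)
print(data, headers)  # prints: b'payload' {'id': '42'}
```

The data is returned as raw bytes rather than decoded text, since the format leaves user data untouched and it is not necessarily UTF-8.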

Parsing in Go#

Go example parser:

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "os"
)

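func main() {
    // GetMessage is a placeholder: supply your own function that returns
    // the raw message bytes (for example, read from Redis or from a file)
    raw, err := GetMessage()
    if err != nil {
        fmt.Println("Error reading message:", err)
        os.Exit(1)
    }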

    // Parse fixed metadata
    idHeader := raw[0:8]
    version := binary.BigEndian.Uint16(raw[8:10])
    headersOffset := binary.BigEndian.Uint32(raw[10:14])
    dataOffset := binary.BigEndian.Uint32(raw[14:18])

    // Parse headers section
    buf := bytes.NewBuffer(raw[headersOffset:])
    headerCount := int(binary.BigEndian.Uint16(buf.Next(2)))

    headers := make(map[string]string)
    for i := 0; i < headerCount; i++ {
        keyLen := int(binary.BigEndian.Uint16(buf.Next(2)))
        key := string(buf.Next(keyLen))
        valLen := int(binary.BigEndian.Uint16(buf.Next(2)))
        value := string(buf.Next(valLen))
        headers[key] = value
    }

    // Parse data section (assumed to be UTF-8 text)
    data := string(raw[dataOffset:])

    // Print results
    fmt.Printf("ID Header: % x\n", idHeader)
    fmt.Printf("Version: %d\n", version)
    fmt.Printf("Headers: %v\n", headers)
    fmt.Printf("Data: %s\n", data)
}

Parsing in Java#

Java example parser:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

class MessageParser {

    public static void main(String[] args) throws Exception {
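        // getMessage() is a placeholder: supply your own method that returns
        // the raw message bytes (for example, read from Redis or from a file)
        byte[] fileData = getMessage();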

        // Parse using fixed offsets
        ByteBuffer buffer = ByteBuffer.wrap(fileData).order(ByteOrder.BIG_ENDIAN);

        // Parse header
        byte[] idHeader = new byte[8];
        buffer.get(idHeader);
        int version = buffer.getShort() & 0xFFFF;
        int headersOffset = buffer.getInt();
        int dataOffset = buffer.getInt();

        // Jump to headers section
        buffer.position(headersOffset);
        int headerCount = buffer.getShort() & 0xFFFF;

        // Parse key-value pairs
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < headerCount; i++) {
            String key = readString(buffer);
            String value = readString(buffer);
            headers.put(key, value);
        }

        // Extract and convert text data
        byte[] dataBytes = new byte[fileData.length - dataOffset];
        System.arraycopy(fileData, dataOffset, dataBytes, 0, dataBytes.length);
        String data = new String(dataBytes, StandardCharsets.UTF_8);

        // Print results
        System.out.println("ID Header: " + bytesToHex(idHeader));
        System.out.println("Version: " + version);
        System.out.println("Headers: " + headers);
        System.out.println("Data: " + data);
    }

    private static String readString(ByteBuffer buffer) {
        int length = buffer.getShort() & 0xFFFF;
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    private static String bytesToHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02X ", b));
        }
        return hex.toString().trim();
    }
}