Skip to content

StreamBuilder

faststream.nats.helpers.StreamBuilder #

StreamBuilder()

A class to register stream-subjects pairs in Broker/Router.

Source code in faststream/nats/helpers/stream_builder.py
def __init__(self) -> None:
    """Create a builder whose stream registry starts out empty."""
    # Registry of (stream, SubjectsCollection) pairs under a ``str`` key
    # (the other methods of this class use the stream's name as that key).
    # The SubjectsCollection holds only the subjects made by the current
    # builder.
    self.objects: dict[str, tuple[JStream, SubjectsCollection]] = {}

objects instance-attribute #

objects = {}

create #

create(name)

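Validate `name` as a stream, register it together with a copy of its subjects if its name is not registered yet, and return the validated stream.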

Source code in faststream/nats/helpers/stream_builder.py
def create(
    self,
    name: Union[str, "JStream", None],
) -> Optional["JStream"]:
    """Validate ``name`` as a stream and register it if it is not known yet.

    ``name`` is passed through ``JStream.validate``. When the result is
    truthy and its name is not already a key of ``self.objects``, the pair
    ``(stream, copy of stream.subjects)`` is stored under that name.
    An existing entry is left untouched.

    Returns:
        Whatever ``JStream.validate`` returned for ``name``.
    """
    validated = JStream.validate(name)
    if not validated:
        # Nothing to register; hand the falsy validation result back as is.
        return validated

    if validated.name not in self.objects:
        # Keep a private copy so later additions do not touch the stream's
        # own subjects collection.
        own_subjects = validated.subjects.copy()
        self.objects[validated.name] = (validated, own_subjects)

    return validated

get #

get(stream, default=None)
Source code in faststream/nats/helpers/stream_builder.py
def get(
    self,
    stream: Union["JStream", str, None],
    default: tuple["JStream", "SubjectsCollection"] | None = None,
) -> tuple["JStream", "SubjectsCollection"] | None:
    """Look up the registered ``(stream, subjects)`` pair for ``stream``.

    ``stream`` is passed through ``JStream.validate``; the lookup uses the
    validated stream's name as the key of ``self.objects``.

    Returns:
        The stored pair, or ``default`` when validation yields a falsy
        value or no entry exists under that name.
    """
    validated = JStream.validate(stream)
    if not validated:
        return default
    return self.objects.get(validated.name, default)

add_subject #

add_subject(stream, subject)
Source code in faststream/nats/helpers/stream_builder.py
def add_subject(
    self,
    stream: Union["JStream", str, None],
    subject: str,
) -> None:
    """Append ``subject`` to the builder's subjects for ``stream``.

    ``stream`` is passed through ``JStream.validate``. Nothing happens when
    the validated stream or ``subject`` is falsy. Otherwise the entry stored
    under the stream's name is used; if there is none, a new pair made of
    the validated stream and a copy of its subjects is used instead. The
    subject is appended to that collection and the pair is written back to
    ``self.objects``.
    """
    validated = JStream.validate(stream)
    if not validated or not subject:
        return

    # Fallback pair for a stream this builder has not seen before.
    fallback = (validated, validated.subjects.copy())
    registered, collected = self.objects.get(validated.name, fallback)

    collected.append(subject)
    self.objects[registered.name] = (registered, collected)