Skip to content

StreamBuilder

faststream.nats.helpers.StreamBuilder #

StreamBuilder()

A class to register stream-subjects pairs in Broker/Router.

Source code in faststream/nats/helpers/stream_builder.py
def __init__(self) -> None:
    """Start the builder with an empty stream registry.

    ``objects`` maps a stream name to a ``(stream, subjects)`` pair, where
    the subjects collection contains subjects made by the current builder
    only.
    """
    self.objects: dict[str, tuple[JStream, SubjectsCollection]] = {}

objects instance-attribute #

create #

create(
    name: Union[str, JStream, None],
) -> Optional[JStream]

Validate the given stream, register it if its name is not registered yet, and return the validated stream.

Source code in faststream/nats/helpers/stream_builder.py
def create(
    self,
    name: Union[str, "JStream", None],
) -> Optional["JStream"]:
    """Validate ``name`` and register the resulting stream if it is new.

    Args:
        name: Value passed to ``JStream.validate``.

    Returns:
        Whatever ``JStream.validate`` produced. A truthy result whose
        ``name`` is not yet a key of ``self.objects`` is stored there
        together with a copy of its ``subjects``; an already registered
        name is left untouched.
    """
    validated = JStream.validate(name)
    is_new = validated and validated.name not in self.objects
    if is_new:
        # Keep a copy so later additions do not mutate the stream's own subjects.
        self.objects[validated.name] = (validated, validated.subjects.copy())
    return validated

get #

get(
    stream: Union[JStream, str, None],
    default: tuple[JStream, SubjectsCollection]
    | None = None,
) -> tuple[JStream, SubjectsCollection] | None
Source code in faststream/nats/helpers/stream_builder.py
def get(
    self,
    stream: Union["JStream", str, None],
    default: tuple["JStream", "SubjectsCollection"] | None = None,
) -> tuple["JStream", "SubjectsCollection"] | None:
    """Look up the registered ``(stream, subjects)`` pair for ``stream``.

    Args:
        stream: Value passed to ``JStream.validate``.
        default: Returned when nothing is found.

    Returns:
        The pair stored in ``self.objects`` under the validated stream's
        name, or ``default`` when validation yields a falsy value or the
        name is not registered.
    """
    validated = JStream.validate(stream)
    if not validated:
        return default
    return self.objects.get(validated.name, default)

add_subject #

add_subject(
    stream: Union[JStream, str, None], subject: str
) -> None
Source code in faststream/nats/helpers/stream_builder.py
def add_subject(
    self,
    stream: Union["JStream", str, None],
    subject: str,
) -> None:
    """Append ``subject`` to the subjects tracked for ``stream``.

    Nothing happens when ``JStream.validate`` yields a falsy value or when
    ``subject`` is falsy. A stream that is not registered yet is registered
    with a copy of its own ``subjects`` before the new subject is appended.

    Args:
        stream: Value passed to ``JStream.validate``.
        subject: Subject to append to the stream's tracked collection.
    """
    validated = JStream.validate(stream)
    if not validated or not subject:
        return

    key = validated.name
    # Fallback entry used only when the stream has not been registered before.
    fallback = (validated, validated.subjects.copy())
    registered, tracked = self.objects.get(key, fallback)
    tracked.append(subject)
    self.objects[registered.name] = (registered, tracked)