Skip to content

SubjectsCollection

faststream.nats.schemas.js_stream.SubjectsCollection #

SubjectsCollection(initlist=None)

Bases: UserList[str]

Source code in faststream/nats/schemas/js_stream.py
def __init__(self, initlist: Iterable[str] | None = None, /) -> None:
    super().__init__(())
    self.extend(initlist or ())

extend #

extend(subjects)
Source code in faststream/nats/schemas/js_stream.py
def extend(self, subjects: Iterable[str], /) -> None:
    for subj in subjects:
        self.append(subj)

append #

append(subject)
Source code in faststream/nats/schemas/js_stream.py
def append(self, subject: str, /) -> None:
    _, subject = compile_nats_wildcard(subject)

    new_subjects = []
    for old_subject in self.data:
        if is_subject_match_wildcard(subject, old_subject):
            return

        if not is_subject_match_wildcard(old_subject, subject):
            new_subjects.append(old_subject)

    new_subjects.append(subject)
    self.data = new_subjects