A class to register stream-subjects pairs in Broker/Router.
Source code in faststream/nats/helpers/stream_builder.py
| def __init__(self) -> None:
# stores stream: SubjectsCollection pairs
# where SubjectsCollection contains subjects
# made by current builder only
self.objects: dict[str, tuple[JStream, SubjectsCollection]] = {}
|
objects
instance-attribute
create
Get an object.
Source code in faststream/nats/helpers/stream_builder.py
| def create(
self,
name: Union[str, "JStream", None],
) -> Optional["JStream"]:
"""Get an object."""
if (stream := JStream.validate(name)) and (stream.name not in self.objects):
self.objects[stream.name] = (stream, stream.subjects.copy())
return stream
|
get
get(stream, default=None)
Source code in faststream/nats/helpers/stream_builder.py
| def get(
self,
stream: Union["JStream", str, None],
default: tuple["JStream", "SubjectsCollection"] | None = None,
) -> tuple["JStream", "SubjectsCollection"] | None:
if stream := JStream.validate(stream):
return self.objects.get(stream.name, default)
return default
|
add_subject
add_subject(stream, subject)
Source code in faststream/nats/helpers/stream_builder.py
| def add_subject(
self,
stream: Union["JStream", str, None],
subject: str,
) -> None:
if (stream := JStream.validate(stream)) and subject:
stream, subjects = self.objects.get(
stream.name,
(stream, stream.subjects.copy()),
)
subjects.append(subject)
self.objects[stream.name] = (stream, subjects)
|