Skip to content

StreamSource

nats.js.api.StreamSource dataclass #

StreamSource(
    name: str,
    opt_start_seq: Optional[int] = None,
    opt_start_time: Optional[datetime] = None,
    filter_subject: Optional[str] = None,
    external: Optional[ExternalStream] = None,
    subject_transforms: Optional[
        List[SubjectTransform]
    ] = None,
)

Bases: Base

name instance-attribute #

name: str

opt_start_seq class-attribute instance-attribute #

opt_start_seq: Optional[int] = None

opt_start_time class-attribute instance-attribute #

opt_start_time: Optional[datetime] = None

filter_subject class-attribute instance-attribute #

filter_subject: Optional[str] = None

external class-attribute instance-attribute #

external: Optional[ExternalStream] = None

subject_transforms class-attribute instance-attribute #

subject_transforms: Optional[List[SubjectTransform]] = None

from_response classmethod #

from_response(resp: Dict[str, Any])
Source code in nats/js/api.py
@classmethod
def from_response(cls, resp: Dict[str, Any]):
    cls._convert(resp, "external", ExternalStream)
    cls._convert(resp, "subject_transforms", SubjectTransform)
    cls._convert_utc_iso(resp, "opt_start_time")
    return super().from_response(resp)

as_dict #

as_dict() -> Dict[str, object]
Source code in nats/js/api.py
def as_dict(self) -> Dict[str, object]:
    result = super().as_dict()
    if self.subject_transforms:
        result["subject_transforms"] = [tr.as_dict() for tr in self.subject_transforms]
    if self.opt_start_time is not None:
        result["opt_start_time"] = self._to_utc_iso(self.opt_start_time)
    return result

evolve #

evolve(**params) -> _B

Return a copy of the instance with the passed values replaced.

Source code in nats/js/api.py
def evolve(self: _B, **params) -> _B:
    """Return a copy of the instance with the passed values replaced."""
    return replace(self, **params)