KvWatch(
bucket: str,
headers_only: bool = False,
include_history: bool = False,
ignore_deletes: bool = False,
meta_only: bool = False,
inactive_threshold: float | None = None,
timeout: float | None = 5.0,
declare: bool = True,
)
Bases: NameRequired
A class to represent a NATS kv watch subscription.
| PARAMETER | DESCRIPTION |
bucket | TYPE: str |
headers_only | Whether to receive only headers (default is False). TYPE: bool DEFAULT: False |
include_history | Whether to include history (default is False). TYPE: bool DEFAULT: False |
ignore_deletes | Whether to ignore deletes (default is False). TYPE: bool DEFAULT: False |
meta_only | Whether to receive only metadata (default is False). TYPE: bool DEFAULT: False |
inactive_threshold ( | obj:float, optional): Inactivity threshold (default is None). |
timeout ( | obj:float, optional): Timeout in seconds (default is 5.0). |
declare | Whether to create bucket automatically or just connect to it (default is True). TYPE: bool DEFAULT: True |
Source code in faststream/nats/schemas/kv_watch.py
| def __init__(
self,
bucket: str,
headers_only: bool = False,
include_history: bool = False,
ignore_deletes: bool = False,
meta_only: bool = False,
inactive_threshold: float | None = None,
timeout: float | None = 5.0,
# custom
declare: bool = True,
) -> None:
super().__init__(bucket)
self.headers_only = headers_only
self.include_history = include_history
self.ignore_deletes = ignore_deletes
self.meta_only = meta_only
self.inactive_threshold = inactive_threshold
self.timeout = timeout
self.declare = declare
|
headers_only = headers_only
include_history instance-attribute
include_history = include_history
ignore_deletes instance-attribute
ignore_deletes = ignore_deletes
inactive_threshold instance-attribute
inactive_threshold = inactive_threshold
timeout instance-attribute
declare instance-attribute
validate classmethod
validate(value: str | Self, **kwargs: Any) -> Self
validate(value: None, **kwargs: Any) -> None
validate(
value: str | Self | None, **kwargs: Any
) -> Self | None
Factory to create object.
Source code in faststream/_internal/proto.py
| @classmethod
def validate(cls, value: str | Self | None, **kwargs: Any) -> Self | None:
"""Factory to create object."""
if value is not None and isinstance(value, str):
value = cls(value, **kwargs)
return value
|