Bases: NameRequired
A class to represent a Redis PubSub channel.
Source code in faststream/redis/schemas/pub_sub.py
| def __init__(
self,
channel: str,
pattern: bool = False,
polling_interval: float = 1.0,
) -> None:
reg, path = compile_path(
channel,
replace_symbol="*",
patch_regex=lambda x: x.replace(r"\*", ".*"),
)
if reg is not None:
pattern = True
super().__init__(path)
self.path_regex = reg
self.pattern = channel if pattern else None
self.polling_interval = polling_interval
|
path_regex
instance-attribute
pattern
instance-attribute
pattern = channel if pattern else None
polling_interval
instance-attribute
polling_interval = polling_interval
add_prefix
Source code in faststream/redis/schemas/pub_sub.py
| def add_prefix(self, prefix: str) -> "PubSub":
new_ch = deepcopy(self)
new_ch.name = f"{prefix}{new_ch.name}"
return new_ch
|
validate
classmethod
validate(value: str | Self, **kwargs: Any) -> Self
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)
Factory to create object.
Source code in faststream/_internal/proto.py
| @classmethod
def validate(cls, value: str | Self | None, **kwargs: Any) -> Self | None:
"""Factory to create object."""
if value is not None and isinstance(value, str):
value = cls(value, **kwargs)
return value
|