Skip to content

PubSub

faststream.redis.schemas.PubSub #

PubSub(channel, pattern=False, polling_interval=1.0)

Bases: NameRequired

A class to represent a Redis PubSub channel.

Source code in faststream/redis/schemas/pub_sub.py
def __init__(
    self,
    channel: str,
    pattern: bool = False,
    polling_interval: float = 1.0,
) -> None:
    """Initialise the channel description from a channel string.

    ``channel`` is handed to ``compile_path`` with ``"*"`` as the
    replacement symbol. That call yields a pair: the first element is
    stored as ``path_regex`` and the second is forwarded to the base
    class initialiser.

    Args:
        channel: Channel string as supplied by the caller.
        pattern: When truthy, ``self.pattern`` is set to ``channel``.
            It is also treated as truthy whenever ``compile_path``
            returns a non-``None`` first element.
        polling_interval: Stored unchanged on the instance.
            NOTE(review): presumably seconds -- confirm against the
            consumer of this attribute.
    """
    compiled_regex, resolved_name = compile_path(
        channel,
        replace_symbol="*",
        patch_regex=lambda raw: raw.replace(r"\*", ".*"),
    )

    super().__init__(resolved_name)

    # A non-None compiled regex forces pattern mode regardless of the
    # ``pattern`` argument.
    use_pattern = pattern or compiled_regex is not None

    self.path_regex = compiled_regex
    self.pattern = channel if use_pattern else None
    self.polling_interval = polling_interval

name instance-attribute #

name = name

path_regex instance-attribute #

path_regex = reg

pattern instance-attribute #

pattern = channel if pattern else None

polling_interval instance-attribute #

polling_interval = polling_interval

validate classmethod #

validate(value: str | Self, **kwargs: Any) -> Self
validate(value: None, **kwargs: Any) -> None
validate(value, **kwargs)

Factory that builds an instance from a string; None and non-string values are returned unchanged.

Source code in faststream/_internal/proto.py
@classmethod
def validate(cls, value: str | Self | None, **kwargs: Any) -> Self | None:
    """Factory to create object."""
    if value is not None and isinstance(value, str):
        value = cls(value, **kwargs)
    return value

add_prefix #

add_prefix(prefix)
Source code in faststream/redis/schemas/pub_sub.py
def add_prefix(self, prefix: str) -> "PubSub":
    """Return a deep copy of this object with ``prefix`` prepended to its name.

    The receiver is not modified; only the copy's ``name`` is changed.

    Args:
        prefix: Text placed directly in front of the existing name.

    Returns:
        The prefixed deep copy.
    """
    prefixed = deepcopy(self)
    current_name = prefixed.name
    prefixed.name = f"{prefix}{current_name}"
    return prefixed