Skip to content

KVBucketDeclarer

faststream.nats.helpers.bucket_declarer.KVBucketDeclarer #

KVBucketDeclarer()
Source code in faststream/nats/helpers/bucket_declarer.py
def __init__(self) -> None:
    self.buckets = {}

    self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState()

buckets instance-attribute #

buckets = {}

connect #

connect(connection)
Source code in faststream/nats/helpers/bucket_declarer.py
def connect(self, connection: "JetStreamContext") -> None:
    self.__state = ConnectedState(connection)

disconnect #

disconnect()
Source code in faststream/nats/helpers/bucket_declarer.py
def disconnect(self) -> None:
    self.__state = EmptyConnectionState()

create_key_value async #

create_key_value(
    bucket,
    *,
    description=None,
    max_value_size=None,
    history=1,
    ttl=None,
    max_bytes=None,
    storage=None,
    replicas=1,
    placement=None,
    republish=None,
    direct=None,
    declare=True,
)
Source code in faststream/nats/helpers/bucket_declarer.py
async def create_key_value(
    self,
    bucket: str,
    *,
    description: str | None = None,
    max_value_size: int | None = None,
    history: int = 1,
    ttl: float | None = None,  # in seconds
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    republish: Optional["RePublish"] = None,
    direct: bool | None = None,
    # custom
    declare: bool = True,
) -> "KeyValue":
    if (key_value := self.buckets.get(bucket)) is None:
        if declare:
            key_value = await self.__state.connection.create_key_value(
                config=KeyValueConfig(
                    bucket=bucket,
                    description=description,
                    max_value_size=max_value_size,
                    history=history,
                    ttl=ttl,
                    max_bytes=max_bytes,
                    storage=storage,
                    replicas=replicas,
                    placement=placement,
                    republish=republish,
                    direct=direct,
                ),
            )
        else:
            key_value = await self.__state.connection.key_value(bucket)

        self.buckets[bucket] = key_value

    return key_value