Skip to content

OSBucketDeclarer

faststream.nats.helpers.obj_storage_declarer.OSBucketDeclarer #

OSBucketDeclarer()
Source code in faststream/nats/helpers/obj_storage_declarer.py
def __init__(self) -> None:
    self.buckets = {}

    self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState()

buckets instance-attribute #

buckets = {}

connect #

connect(connection)
Source code in faststream/nats/helpers/obj_storage_declarer.py
def connect(self, connection: "JetStreamContext") -> None:
    self.__state = ConnectedState(connection)

disconnect #

disconnect()
Source code in faststream/nats/helpers/obj_storage_declarer.py
def disconnect(self) -> None:
    self.__state = EmptyConnectionState()

create_object_store async #

create_object_store(
    bucket,
    *,
    description=None,
    ttl=None,
    max_bytes=None,
    storage=None,
    replicas=1,
    placement=None,
    declare=True,
)
Source code in faststream/nats/helpers/obj_storage_declarer.py
async def create_object_store(
    self,
    bucket: str,
    *,
    description: str | None = None,
    ttl: float | None = None,
    max_bytes: int | None = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    # custom
    declare: bool = True,
) -> "ObjectStore":
    if (object_store := self.buckets.get(bucket)) is None:
        if declare:
            object_store = await self.__state.connection.create_object_store(
                bucket=bucket,
                config=ObjectStoreConfig(
                    bucket=bucket,
                    description=description,
                    ttl=ttl,
                    max_bytes=max_bytes,
                    storage=storage,
                    replicas=replicas,
                    placement=placement,
                ),
            )
        else:
            object_store = await self.__state.connection.object_store(bucket)

        self.buckets[bucket] = object_store

    return object_store