Skip to content

AdminService

faststream.confluent.helpers.admin.AdminService #

AdminService()
Source code in faststream/confluent/helpers/admin.py
def __init__(self) -> None:
    """Create the service with no admin client attached yet."""
    # Stays None until connect() stores an AdminClient; disconnect() resets it to None.
    self.admin_client: AdminClient | None = None

admin_client instance-attribute #

admin_client = None

client property #

client

connect async #

connect(config)
Source code in faststream/confluent/helpers/admin.py
async def connect(self, config: "ConfluentFastConfig") -> None:
    """Build the admin client from ``config.admin_config`` if none is stored.

    When a client is already stored this is a no-op: the existing client is
    kept and ``config`` is not read.

    Args:
        config: Configuration object whose ``admin_config`` attribute is
            passed to ``AdminClient``.
    """
    if self.admin_client is not None:
        return

    self.admin_client = AdminClient(config.admin_config)

disconnect async #

disconnect()
Source code in faststream/confluent/helpers/admin.py
async def disconnect(self) -> None:
    """Drop the stored reference to the admin client.

    Only the attribute is reset to ``None``; no close or shutdown method is
    called on the client object here.
    """
    self.admin_client = None

create_topics #

create_topics(topics)
Source code in faststream/confluent/helpers/admin.py
def create_topics(self, topics: list[str]) -> list[CreateResult]:
    """Create the given topics and report a per-topic outcome.

    Every topic is requested with one partition and a replication factor of
    one. A topic that already exists is reported as a success.

    Args:
        topics: Names of the topics to create.

    Returns:
        One ``CreateResult`` per entry returned by the admin client, holding
        the topic name and either ``None`` (created, or already present) or
        the exception raised while creating it. An empty ``topics`` list
        yields an empty list without issuing a create request.
    """
    # Resolve the client first so any error raised by the ``client`` property
    # still surfaces, even when there is nothing to create.
    admin = self.client

    if not topics:
        # Nothing to create: return early instead of sending an empty batch.
        # NOTE(review): confluent-kafka appears to reject an empty topic list
        # with ValueError — confirm against the library docs.
        return []

    futures = admin.create_topics(
        [NewTopic(name, num_partitions=1, replication_factor=1) for name in topics],
    )

    results = []
    for topic, future in futures.items():
        error = None
        try:
            future.result()
        except Exception as e:
            # An already-existing topic is the desired end state, not a failure.
            if "TOPIC_ALREADY_EXISTS" not in str(e):
                error = e
        results.append(CreateResult(topic, error))

    return results