def create_topics(self, topics: list[str]) -> list[CreateResult]:
create_result = self.client.create_topics(
[NewTopic(topic, num_partitions=1, replication_factor=1) for topic in topics],
)
final_results = []
for topic, f in create_result.items():
try:
f.result()
except Exception as e:
if "TOPIC_ALREADY_EXISTS" not in str(e):
result = CreateResult(topic, e)
else:
result = CreateResult(topic, None)
else:
result = CreateResult(topic, None)
final_results.append(result)
return final_results