Bases: TestBroker[KafkaBroker]
A class to test Kafka brokers.
Source code in faststream/_internal/testing/broker.py
| def __init__(
self,
broker: Broker,
with_real: bool = False,
connect_only: bool | None = None,
) -> None:
self.with_real = with_real
self.broker = broker
if connect_only is None:
try:
connect_only = is_contains_context_name(
self.__class__.__name__,
TestApp.__name__,
)
except Exception: # pragma: no cover
warnings.warn(
(
"\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing."
"\n`connect_only` is set to `False` by default."
),
category=RuntimeWarning,
stacklevel=1,
)
connect_only = False
self.connect_only = connect_only
self._fake_subscribers: list[SubscriberUsecase[Any]] = []
|
with_real
instance-attribute
broker
instance-attribute
connect_only
instance-attribute
connect_only = connect_only
create_publisher_fake_subscriber
staticmethod
create_publisher_fake_subscriber(broker, publisher)
Source code in faststream/confluent/testing.py
| @staticmethod
def create_publisher_fake_subscriber(
broker: KafkaBroker,
publisher: "LogicPublisher",
) -> tuple["LogicSubscriber[Any]", bool]:
sub: LogicSubscriber[Any] | None = None
for handler in broker.subscribers:
handler = cast("LogicSubscriber[Any]", handler)
if _is_handler_matches(
handler,
topic=publisher.topic,
partition=publisher.partition,
):
sub = handler
break
if sub is None:
is_real = False
topic_name = publisher.topic
if publisher.partition:
tp = TopicPartition(
topic=topic_name,
partition=publisher.partition,
)
sub = broker.subscriber(
partitions=[tp],
batch=isinstance(publisher, BatchPublisher),
auto_offset_reset="earliest",
persistent=False,
)
else:
sub = broker.subscriber(
topic_name,
batch=isinstance(publisher, BatchPublisher),
auto_offset_reset="earliest",
persistent=False,
)
else:
is_real = True
return sub, is_real
|