Skip to content

DefaultPublisher

faststream.confluent.publisher.usecase.DefaultPublisher #

DefaultPublisher(config, specifcication)

Bases: LogicPublisher

Source code in faststream/confluent/publisher/usecase.py
def __init__(
    self,
    config: "KafkaPublisherConfig",
    specifcication: "PublisherSpecification[Any, Any]",
) -> None:
    super().__init__(config, specifcication)

    self.key = config.key

key instance-attribute #

key = key

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

partition instance-attribute #

partition = partition

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers or {}

topic property #

topic

publish async #

publish(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: Literal[True] = ...,
) -> Future[Message | None]
publish(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: Literal[False] = False,
) -> Message | None
publish(
    message: SendableMessage,
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: bool = False,
) -> Future[Message | None] | Message | None
publish(
    message,
    topic="",
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    reply_to="",
    no_confirm=False,
)
Source code in faststream/confluent/publisher/usecase.py
@override
async def publish(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    reply_to: str = "",
    no_confirm: bool = False,
) -> asyncio.Future[Message | None] | Message | None:
    cmd = KafkaPublishCommand(
        message,
        topic=topic or self.topic,
        key=key or self.key,
        partition=partition or self.partition,
        reply_to=reply_to or self.reply_to,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        timestamp_ms=timestamp_ms,
        no_confirm=no_confirm,
        _publish_type=PublishType.PUBLISH,
    )
    msg: asyncio.Future[Message | None] | Message | None = await self._basic_publish(
        cmd,
        producer=self._outer_config.producer,
        _extra_middlewares=(),
    )
    return msg

request async #

request(
    message,
    topic="",
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    timeout=0.5,
)
Source code in faststream/confluent/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    key: bytes | str | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> "KafkaMessage":
    return await super().request(
        message,
        topic=topic,
        key=key or self.key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
        timeout=timeout,
    )

start async #

start()
Source code in faststream/_internal/endpoint/publisher/usecase.py
async def start(self) -> None:
    pass

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Turn publisher to testing mode."""
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode."""
    self._fake_handler = False
    self.mock.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    return self.specification.get_schema()

flush async #

flush()
Source code in faststream/confluent/publisher/usecase.py
async def flush(self) -> None:
    producer = cast("AsyncConfluentFastProducer", self._outer_config.producer)
    await producer.flush()