Skip to content

LogicPublisher

faststream.kafka.publisher.usecase.LogicPublisher #

LogicPublisher(config, specification)

Bases: PublisherUsecase

A class to publish messages to a Kafka topic.

Source code in faststream/kafka/publisher/usecase.py
def __init__(
    self,
    config: "KafkaPublisherConfig",
    specification: "PublisherSpecification[Any, Any]",
) -> None:
    super().__init__(config, specification)

    self._topic = config.topic
    self.partition = config.partition
    self.reply_to = config.reply_to
    self.headers = config.headers or {}

partition instance-attribute #

partition = partition

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers or {}

topic property #

topic

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

request async #

request(
    message,
    topic="",
    *,
    key=None,
    partition=None,
    timestamp_ms=None,
    headers=None,
    correlation_id=None,
    timeout=0.5,
)

Send a request message to Kafka topic.

PARAMETER DESCRIPTION
message

Message body to send.

TYPE: SendableMessage

topic

Topic where the message will be published.

TYPE: str DEFAULT: ''

key

A key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer's partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer.

TYPE: bytes | Any | None DEFAULT: None

partition

Specify a partition. If not set, the partition will be selected using the configured partitioner.

TYPE: int | None DEFAULT: None

timestamp_ms

Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.

TYPE: int | None DEFAULT: None

headers

Message headers to store metainformation.

TYPE: dict[str, str] | None DEFAULT: None

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

timeout

Timeout to send RPC request.

TYPE: float DEFAULT: 0.5

RETURNS DESCRIPTION
KafkaMessage

The response message.

TYPE: KafkaMessage

Source code in faststream/kafka/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage",
    topic: str = "",
    *,
    key: bytes | Any | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    timeout: float = 0.5,
) -> "KafkaMessage":
    """Send a request message to Kafka topic.

    Args:
        message: Message body to send.
        topic: Topic where the message will be published.
        key: A key to associate with the message. Can be used to
            determine which partition to send the message to. If partition
            is `None` (and producer's partitioner config is left as default),
            then messages with the same key will be delivered to the same
            partition (but if key is `None`, partition is chosen randomly).
            Must be type `bytes`, or be serializable to bytes via configured
            `key_serializer`.
        partition: Specify a partition. If not set, the partition will be
            selected using the configured `partitioner`.
        timestamp_ms: Epoch milliseconds (from Jan 1 1970 UTC) to use as
            the message timestamp. Defaults to current time.
        headers: Message headers to store metainformation.
        correlation_id: Manual message **correlation_id** setter.
            **correlation_id** is a useful option to trace messages.
        timeout: Timeout to send RPC request.

    Returns:
        KafkaMessage: The response message.
    """
    cmd = KafkaPublishCommand(
        message,
        topic=topic or self.topic,
        key=key,
        partition=partition or self.partition,
        headers=self.headers | (headers or {}),
        correlation_id=correlation_id or gen_cor_id(),
        timestamp_ms=timestamp_ms,
        timeout=timeout,
        _publish_type=PublishType.REQUEST,
    )

    msg: KafkaMessage = await self._basic_request(
        cmd,
        producer=self._outer_config.producer,
    )
    return msg

flush async #

flush()
Source code in faststream/kafka/publisher/usecase.py
async def flush(self) -> None:
    producer = cast("AioKafkaFastProducer", self._outer_config.producer)
    await producer.flush()

publish abstractmethod async #

publish(message, /, *, correlation_id=None)

Public method to publish a message.

Should be called by user only broker.publisher(...).publish(...).

Source code in faststream/_internal/endpoint/publisher/proto.py
@abstractmethod
async def publish(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: str | None = None,
) -> Any | None:
    """Public method to publish a message.

    Should be called by user only `broker.publisher(...).publish(...)`.
    """
    ...

start async #

start()
Source code in faststream/_internal/endpoint/publisher/usecase.py
async def start(self) -> None:
    pass

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Turn publisher to testing mode."""
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode."""
    self._fake_handler = False
    self.mock.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    return self.specification.get_schema()