Skip to content

LogicPublisher

faststream.nats.publisher.usecase.LogicPublisher #

LogicPublisher(config, specification)

Bases: PublisherUsecase

A class to represent a NATS publisher.

Initialize NATS publisher object.

Source code in faststream/nats/publisher/usecase.py
def __init__(
    self,
    config: "NatsPublisherConfig",
    specification: "PublisherSpecification[Any, Any]",
) -> None:
    """Initialize NATS publisher object.

    Args:
        config:
            Publisher configuration. Its ``subject``, ``stream``, ``timeout``,
            ``headers`` and ``reply_to`` values are copied onto the instance.
        specification:
            Publisher specification, forwarded to the parent initializer
            together with ``config``.
    """
    super().__init__(config, specification)

    self._subject = config.subject
    self.stream = config.stream

    # Any falsy configured timeout (``None`` or ``0``) falls back to 0.5.
    configured_timeout = config.timeout
    self.timeout = configured_timeout if configured_timeout else 0.5

    # A missing or empty headers value is replaced by a fresh empty dict.
    configured_headers = config.headers
    self.headers = configured_headers if configured_headers else {}

    self.reply_to = config.reply_to

stream instance-attribute #

stream = stream

timeout instance-attribute #

timeout = timeout or 0.5

headers instance-attribute #

headers = headers or {}

reply_to instance-attribute #

reply_to = reply_to

clear_subject property #

clear_subject

Compile test.{name} to test.* subject.

subject property #

subject

specification instance-attribute #

specification = specification

middlewares instance-attribute #

middlewares = middlewares

mock instance-attribute #

mock = MagicMock()

publish async #

publish(
    message: SendableMessage,
    subject: str = "",
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: None = None,
    timeout: float | None = None,
) -> None
publish(
    message: SendableMessage,
    subject: str = "",
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
) -> PubAck
publish(
    message,
    subject="",
    headers=None,
    reply_to="",
    correlation_id=None,
    stream=None,
    timeout=None,
)

Publish message directly.

PARAMETER DESCRIPTION
message

Message body to send. Can be any encodable object (native python types or pydantic.BaseModel).

TYPE: SendableMessage

subject

NATS subject to send message.

TYPE: str DEFAULT: ''

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

NATS subject name to send response.

TYPE: str DEFAULT: ''

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

stream

This option validates that the target subject belongs to the given stream. Can be omitted without any effect if you don't want a PubAck frame.

TYPE: str | None DEFAULT: None

timeout

Timeout to send message to NATS.

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
Optional[PubAck]

None if you publish a regular message.

Optional[PubAck]

faststream.nats.PubAck if you publish a message to a stream.

Source code in faststream/nats/publisher/usecase.py
@override
async def publish(
    self,
    message: "SendableMessage",
    subject: str = "",
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float | None = None,
) -> Optional["PubAck"]:
    """Publish a message directly.

    Every falsy argument falls back to the value stored on the publisher
    (``self.subject``, ``self.reply_to``, ``self.timeout``, the ``name`` of
    ``self.stream``); per-call ``headers`` are merged over ``self.headers``.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send the message to.
        headers:
            Message headers to store meta-information.
            **content-type** and **correlation_id** will be set automatically by the framework anyway.
        reply_to:
            NATS subject name to send the response to.
        correlation_id:
            Manual message **correlation_id** setter; one is generated when omitted.
            **correlation_id** is a useful option to trace messages.
        stream:
            This option validates that the target subject belongs to the given stream.
            Can be omitted without any effect if you don't want a PubAck frame.
        timeout:
            Timeout to send the message to NATS.

    Returns:
        `None` if you publish a regular message.
        `faststream.nats.PubAck` if you publish a message to a stream.
    """
    command = NatsPublishCommand(
        message,
        subject=subject or self.subject,
        headers=self.headers | (headers or {}),
        reply_to=reply_to or self.reply_to,
        correlation_id=correlation_id or gen_cor_id(),
        stream=stream or getattr(self.stream, "name", None),
        timeout=timeout or self.timeout,
        _publish_type=PublishType.PUBLISH,
    )

    # Commands that carry a stream go through the JetStream producer;
    # everything else uses the regular one.
    outer_config = self._outer_config
    producer = outer_config.js_producer if command.stream else outer_config.producer

    result: PubAck | None = await self._basic_publish(
        command,
        producer=producer,
        _extra_middlewares=(),
    )
    return result

request async #

request(
    message,
    subject="",
    headers=None,
    correlation_id=None,
    stream=None,
    timeout=0.5,
)

Make a synchronous request to outer subscriber.

If the outer subscriber listens to the subject via a stream, you should set up the same stream explicitly. Otherwise you will receive a confirmation frame as a response.

PARAMETER DESCRIPTION
message

Message body to send. Can be any encodable object (native python types or pydantic.BaseModel).

TYPE: SendableMessage

subject

NATS subject to send message.

TYPE: str DEFAULT: ''

headers

Message headers to store metainformation. content-type and correlation_id will be set automatically by framework anyway.

TYPE: dict[str, str] | None DEFAULT: None

reply_to

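NATS subject name to send response. (Note: the request signature shown above does not list a reply_to parameter, so this entry appears to be left over from publish.)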

correlation_id

Manual message correlation_id setter. correlation_id is a useful option to trace messages.

TYPE: str | None DEFAULT: None

stream

This allows to make RPC calls over JetStream subjects.

TYPE: str | None DEFAULT: None

timeout

Timeout to send message to NATS.

TYPE: float DEFAULT: 0.5

RETURNS DESCRIPTION
NatsMessage

faststream.nats.message.NatsMessage object as an outer subscriber response.

Source code in faststream/nats/publisher/usecase.py
@override
async def request(
    self,
    message: "SendableMessage",
    subject: str = "",
    headers: dict[str, str] | None = None,
    correlation_id: str | None = None,
    stream: str | None = None,
    timeout: float = 0.5,
) -> "NatsMessage":
    """Make a synchronous request to an outer subscriber.

    If the outer subscriber listens to the subject via a stream, you should set up
    the same **stream** explicitly. Otherwise you will receive a confirmation frame
    as a response.

    Args:
        message:
            Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject:
            NATS subject to send the message to; falls back to ``self.subject``.
        headers:
            Message headers to store meta-information, merged over ``self.headers``.
            **content-type** and **correlation_id** will be set automatically by the framework anyway.
        correlation_id:
            Manual message **correlation_id** setter; one is generated when omitted.
            **correlation_id** is a useful option to trace messages.
        stream:
            This allows making RPC calls over JetStream subjects.
        timeout:
            Timeout to send the message to NATS.

    Returns:
        `faststream.nats.message.NatsMessage` object as an outer subscriber response.
    """
    # NOTE(review): the default ``timeout`` (0.5) is truthy, so ``self.timeout``
    # is only used when a falsy timeout (e.g. ``0``) is passed explicitly.
    command = NatsPublishCommand(
        message=message,
        subject=subject or self.subject,
        headers=self.headers | (headers or {}),
        timeout=timeout or self.timeout,
        correlation_id=correlation_id or gen_cor_id(),
        stream=stream or getattr(self.stream, "name", None),
        _publish_type=PublishType.REQUEST,
    )

    # Commands that carry a stream go through the JetStream producer;
    # everything else uses the regular one.
    producer: ProducerProto[Any] = (
        self._outer_config.js_producer
        if command.stream
        else self._outer_config.producer
    )

    reply: NatsMessage = await self._basic_request(command, producer=producer)
    return reply

start async #

start()
Source code in faststream/_internal/endpoint/publisher/usecase.py
async def start(self) -> None:
    """Start the publisher; this implementation performs no work."""

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def set_test(
    self,
    *,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Turn publisher to testing mode.

    Args:
        mock:
            Object stored on the publisher as ``self.mock``.
        with_fake:
            Flag stored on the publisher as ``self._fake_handler``.
    """
    # Targets are assigned left to right: ``mock`` first, then the flag.
    self.mock, self._fake_handler = mock, with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/_internal/endpoint/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode.

    Clears the ``_fake_handler`` flag and then calls ``reset_mock()`` on the
    object stored in ``self.mock``.
    """
    # The flag is cleared first, so it is already off if ``reset_mock`` raises.
    self._fake_handler = False

    stored_mock = self.mock
    stored_mock.reset_mock()

schema #

schema()
Source code in faststream/_internal/endpoint/publisher/usecase.py
def schema(self) -> dict[str, "PublisherSpec"]:
    """Return the schema produced by this publisher's specification."""
    publisher_schema = self.specification.get_schema()
    return publisher_schema