Skip to content

create_publisher

faststream.nats.publisher.factory.create_publisher #

create_publisher(
    *,
    subject,
    reply_to,
    headers,
    stream,
    timeout,
    broker_config,
    middlewares,
    schema_,
    title_,
    description_,
    include_in_schema,
)
Source code in faststream/nats/publisher/factory.py
def create_publisher(
    *,
    subject: str,
    reply_to: str,
    headers: dict[str, str] | None,
    stream: Optional["JStream"],
    timeout: float | None,
    # Publisher args
    broker_config: "NatsBrokerConfig",
    middlewares: Sequence["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Any | None,
    title_: str | None,
    description_: str | None,
    include_in_schema: bool,
) -> LogicPublisher:
    publisher_config = NatsPublisherConfig(
        subject=subject,
        stream=stream,
        reply_to=reply_to,
        headers=headers,
        timeout=timeout,
        middlewares=middlewares,
        _outer_config=broker_config,
    )

    specification = NatsPublisherSpecification(
        _outer_config=broker_config,
        specification_config=NatsPublisherSpecificationConfig(
            subject=subject,
            schema_=schema_,
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        ),
    )

    return LogicPublisher(publisher_config, specification)