Skip to content

create_subscriber

faststream.nats.subscriber.create_subscriber

create_subscriber(
    *,
    subject,
    queue,
    pending_msgs_limit,
    pending_bytes_limit,
    max_msgs,
    durable,
    config,
    ordered_consumer,
    idle_heartbeat,
    flow_control,
    deliver_policy,
    headers_only,
    pull_sub,
    kv_watch,
    obj_watch,
    inbox_prefix,
    ack_first,
    max_workers,
    stream,
    ack_policy,
    no_ack,
    no_reply,
    broker_config,
    title_,
    description_,
    include_in_schema,
)
Source code in faststream/nats/subscriber/factory.py
def create_subscriber(
    *,
    subject: str,
    queue: str,
    pending_msgs_limit: int | None,
    pending_bytes_limit: int | None,
    # Core args
    max_msgs: int,
    # JS args
    durable: str | None,
    config: Optional["api.ConsumerConfig"],
    ordered_consumer: bool,
    idle_heartbeat: float | None,
    flow_control: bool | None,
    deliver_policy: Optional["api.DeliverPolicy"],
    headers_only: bool | None,
    # pull args
    pull_sub: Optional["PullSub"],
    kv_watch: Optional["KvWatch"],
    obj_watch: Optional["ObjWatch"],
    inbox_prefix: bytes,
    # custom args
    ack_first: bool,
    max_workers: int,
    stream: Optional["JStream"],
    # Subscriber args
    ack_policy: "AckPolicy",
    no_ack: bool,
    no_reply: bool,
    broker_config: "NatsBrokerConfig",
    # Specification information
    title_: str | None,
    description_: str | None,
    include_in_schema: bool,
) -> "LogicSubscriber[Any]":
    """Build the NATS subscriber variant that matches the given options.

    The arguments are first checked by ``_validate_input_for_misconfigure``.
    Then a consumer config and the subscribe options are prepared and exactly
    one subscriber class is instantiated, checked in this order:

    1. ``obj_watch`` is set          -> ``ObjStoreWatchSubscriber``
    2. ``kv_watch`` is set           -> ``KeyValueWatchSubscriber``
    3. ``stream`` is ``None``        -> ``ConcurrentCoreSubscriber`` when
       ``max_workers > 1``, otherwise ``CoreSubscriber``
    4. ``max_workers > 1``           -> ``ConcurrentPullStreamSubscriber`` when
       ``pull_sub`` is set, otherwise ``ConcurrentPushStreamSubscriber``
    5. ``pull_sub`` is set           -> ``BatchPullStreamSubscriber`` when
       ``pull_sub.batch`` is truthy, otherwise ``PullStreamSubscriber``
    6. anything else                 -> ``PushStreamSubscriber``

    Args:
        subject: Stored in both the subscriber config and the specification
            config.
        queue: Forwarded to every subscriber except the watch and batch-pull
            ones; an empty value is recorded as ``None`` in the specification.
        pending_msgs_limit: Put into the subscribe options; a falsy value
            (``None`` or ``0``) is replaced by the JS or core module default.
        pending_bytes_limit: Same handling as ``pending_msgs_limit``.
        max_msgs: Only put into the options of a core (non-stream) subscriber.
        durable: Fallback for ``config.durable_name``; also put into the
            stream subscribe options.
        config: Optional consumer config. When given it is filled **in place**;
            otherwise a new ``ConsumerConfig(filter_subjects=[])`` is used.
        ordered_consumer: Put into the stream push options.
        idle_heartbeat: Fallback for ``config.idle_heartbeat``; also put into
            the stream push options.
        flow_control: Put into the stream push options.
        deliver_policy: Replaces ``config.deliver_policy`` when that is
            ``DeliverPolicy.ALL``; also put into the stream push options.
        headers_only: Fallback for ``config.headers_only``; also put into the
            stream push options.
        pull_sub: When set, a pull-style stream subscriber is chosen.
        kv_watch: When set, a ``KeyValueWatchSubscriber`` is returned.
        obj_watch: When set, an ``ObjStoreWatchSubscriber`` is returned.
        inbox_prefix: Put into the stream pull options.
        ack_first: Together with ``ack_policy is AckPolicy.ACK_FIRST`` decides
            the ``manual_ack`` flag of a stream push subscriber.
        max_workers: A value above ``1`` selects a concurrent subscriber.
        stream: Stream the subscriber is bound to; ``None`` means a core
            subscriber.
        ack_policy: Stored in the subscriber config (after the possible
            ``ACK_FIRST`` -> ``MANUAL`` switch described below).
        no_ack: Stored in the subscriber config.
        no_reply: Stored in the subscriber config.
        broker_config: Outer config shared by the subscriber config and both
            specification objects.
        title_: Stored in the specification config.
        description_: Stored in the specification config.
        include_in_schema: Stored in the specification config.

    Returns:
        The instantiated subscriber.
    """
    _validate_input_for_misconfigure(
        subject=subject,
        queue=queue,
        pending_msgs_limit=pending_msgs_limit,
        pending_bytes_limit=pending_bytes_limit,
        max_msgs=max_msgs,
        durable=durable,
        config=config,
        ordered_consumer=ordered_consumer,
        idle_heartbeat=idle_heartbeat,
        flow_control=flow_control,
        deliver_policy=deliver_policy,
        headers_only=headers_only,
        pull_sub=pull_sub,
        ack_policy=ack_policy,
        no_ack=no_ack,
        kv_watch=kv_watch,
        obj_watch=obj_watch,
        ack_first=ack_first,
        max_workers=max_workers,
        stream=stream,
    )

    # --- consumer config -------------------------------------------------
    # Values already present on a caller-supplied config win; the standalone
    # arguments only fill the gaps. Note that the passed object is mutated.
    if not config:
        config = ConsumerConfig(filter_subjects=[])

    if config.durable_name is None:
        config.durable_name = durable

    if config.idle_heartbeat is None:
        config.idle_heartbeat = idle_heartbeat

    if config.headers_only is None:
        config.headers_only = headers_only

    if config.deliver_policy is DeliverPolicy.ALL:
        config.deliver_policy = deliver_policy or DeliverPolicy.ALL

    # --- subscribe options -----------------------------------------------
    # `or` is used for the pending limits, so any falsy value (None or 0)
    # falls back to the corresponding default constant.
    extra_options: dict[str, Any]

    if stream:
        # Options shared by both JS subscriber flavours.
        extra_options = {
            "pending_msgs_limit": (
                pending_msgs_limit or DEFAULT_JS_SUB_PENDING_MSGS_LIMIT
            ),
            "pending_bytes_limit": (
                pending_bytes_limit or DEFAULT_JS_SUB_PENDING_BYTES_LIMIT
            ),
            "durable": durable,
            "stream": stream.name,
        }

        if pull_sub is None:
            # JS Push Subscriber
            ack_first_mode = ack_first or ack_policy is AckPolicy.ACK_FIRST
            if ack_first_mode:
                # The subscription itself runs with manual_ack=False, so the
                # policy stored in the subscriber config becomes MANUAL.
                ack_policy = AckPolicy.MANUAL

            extra_options.update(
                ordered_consumer=ordered_consumer,
                idle_heartbeat=idle_heartbeat,
                flow_control=flow_control,
                deliver_policy=deliver_policy,
                headers_only=headers_only,
                manual_ack=not ack_first_mode,
            )

        else:
            # JS Pull Subscriber
            extra_options["inbox_prefix"] = inbox_prefix

    else:
        # Core Subscriber
        extra_options = {
            "pending_msgs_limit": (
                pending_msgs_limit or DEFAULT_SUB_PENDING_MSGS_LIMIT
            ),
            "pending_bytes_limit": (
                pending_bytes_limit or DEFAULT_SUB_PENDING_BYTES_LIMIT
            ),
            "max_msgs": max_msgs,
        }

    # --- shared building blocks ------------------------------------------
    subscriber_config = NatsSubscriberConfig(
        subject=subject,
        sub_config=config,
        extra_options=extra_options,
        no_reply=no_reply,
        _outer_config=broker_config,
        _ack_first=ack_first,
        _ack_policy=ack_policy,
        _no_ack=no_ack,
    )

    calls = CallsCollection[Any]()

    specification_config = NatsSubscriberSpecificationConfig(
        subject=subject,
        queue=queue or None,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    specification = NatsSubscriberSpecification(
        _outer_config=broker_config,
        calls=calls,
        specification_config=specification_config,
    )

    # Alternative specification object handed to the watch subscribers.
    not_include_spec = NotIncludeSpecifation(
        _outer_config=broker_config,
        calls=calls,
        specification_config=specification_config,
    )

    subscriber_options: SharedOptions = {
        "config": subscriber_config,
        "specification": specification,
        "calls": calls,
    }

    # --- pick the subscriber class ---------------------------------------
    # Watch subscribers take precedence over everything else.
    if obj_watch is not None:
        watch_options = subscriber_options | {"specification": not_include_spec}
        return ObjStoreWatchSubscriber(**watch_options, obj_watch=obj_watch)

    if kv_watch is not None:
        watch_options = subscriber_options | {"specification": not_include_spec}
        return KeyValueWatchSubscriber(**watch_options, kv_watch=kv_watch)

    # Core (non-stream) subscribers.
    if stream is None:
        if max_workers <= 1:
            return CoreSubscriber(
                **subscriber_options,
                queue=queue,
            )

        return ConcurrentCoreSubscriber(
            **subscriber_options,
            max_workers=max_workers,
            queue=queue,
        )

    # Concurrent stream subscribers; checked before `pull_sub.batch`, so a
    # batch pull config combined with max_workers > 1 ends up here.
    if max_workers > 1:
        if pull_sub is None:
            return ConcurrentPushStreamSubscriber(
                **subscriber_options,
                max_workers=max_workers,
                queue=queue,
                stream=stream,
            )

        return ConcurrentPullStreamSubscriber(
            **subscriber_options,
            max_workers=max_workers,
            queue=queue,
            stream=stream,
            pull_sub=pull_sub,
        )

    # Single-worker stream subscribers.
    if pull_sub is None:
        return PushStreamSubscriber(
            **subscriber_options,
            queue=queue,
            stream=stream,
        )

    if pull_sub.batch:
        return BatchPullStreamSubscriber(
            **subscriber_options,
            pull_sub=pull_sub,
            stream=stream,
        )

    return PullStreamSubscriber(
        **subscriber_options,
        queue=queue,
        pull_sub=pull_sub,
        stream=stream,
    )