Skip to content

create_subscriber

faststream.redis.subscriber.factory.create_subscriber

create_subscriber(
    *,
    channel,
    list,
    stream,
    ack_policy,
    no_ack,
    config,
    no_reply=False,
    message_format,
    title_=None,
    description_=None,
    include_in_schema=True,
    max_workers=1,
)
Source code in `faststream/redis/subscriber/factory.py`:
def create_subscriber(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    # Subscriber args
    ack_policy: "AckPolicy",
    no_ack: bool,
    config: "RedisBrokerConfig",
    no_reply: bool = False,
    message_format: type["MessageFormat"] | None,
    # AsyncAPI args
    title_: str | None = None,
    description_: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = 1,
) -> SubscriberType:
    """Build the Redis subscriber that matches the requested source.

    The arguments are first checked by ``_validate_input_for_misconfigure``.
    The sources are then probed in a fixed order -- channel, stream, list --
    and the first one that is set decides which subscriber class is returned.

    Args:
        channel: Pub/Sub source, passed through ``PubSub.validate``.
        list: List source, passed through ``ListSub.validate``.
        stream: Stream source, passed through ``StreamSub.validate``.
        ack_policy: Acknowledgement policy stored in the subscriber config.
            For channel subscribers it is overwritten with
            ``AckPolicy.MANUAL``.
        no_ack: Only forwarded to the misconfiguration check.
        config: Broker-level configuration shared with the subscriber and
            its specification.
        no_reply: Stored in the subscriber config as-is.
        message_format: Message format class stored in the subscriber config.
        title_: Title for the subscriber specification.
        description_: Description for the subscriber specification.
        include_in_schema: Schema-inclusion flag for the specification.
        max_workers: When greater than 1, the concurrent subscriber variant
            is used. Batch stream/list sources return the batch subscriber
            without consulting this value.

    Returns:
        A channel, stream or list subscriber (plain, concurrent or batch).

    Raises:
        SetupError: With ``INCORRECT_SETUP_MSG`` if no source is set.
    """
    _validate_input_for_misconfigure(
        channel=channel,
        list=list,
        stream=stream,
        ack_policy=ack_policy,
        no_ack=no_ack,
        max_workers=max_workers,
        message_format=message_format,
    )

    # Normalise the three possible sources (same order as the config fields).
    validated_channel = PubSub.validate(channel)
    validated_list = ListSub.validate(list)
    validated_stream = StreamSub.validate(stream)

    sub_cfg = RedisSubscriberConfig(
        channel_sub=validated_channel,
        list_sub=validated_list,
        stream_sub=validated_stream,
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
        _message_format=message_format,
    )
    spec_cfg = RedisSubscriberSpecificationConfig(
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )
    handlers = CallsCollection[Any]()

    # --- Pub/Sub channel -------------------------------------------------
    if channel_sub := sub_cfg.channel_sub:
        channel_spec = ChannelSubscriberSpecification(
            config,
            spec_cfg,
            handlers,
            channel=channel_sub,
        )

        # Channel subscribers always run with the manual ack policy,
        # whatever the caller asked for.
        sub_cfg._ack_policy = AckPolicy.MANUAL

        if max_workers > 1:
            return ChannelConcurrentSubscriber(
                sub_cfg,
                channel_spec,
                handlers,
                max_workers=max_workers,
            )
        else:
            return ChannelSubscriber(sub_cfg, channel_spec, handlers)

    # --- Stream ----------------------------------------------------------
    if stream_sub := sub_cfg.stream_sub:
        stream_spec = StreamSubscriberSpecification(
            config,
            spec_cfg,
            handlers,
            stream_sub=stream_sub,
        )

        if stream_sub.batch:
            # TODO: raise warning if max_workers in `_validate_input_for_misconfigure`
            return StreamBatchSubscriber(sub_cfg, stream_spec, handlers)
        elif max_workers > 1:
            return StreamConcurrentSubscriber(
                sub_cfg,
                stream_spec,
                handlers,
                max_workers=max_workers,
            )
        else:
            return StreamSubscriber(sub_cfg, stream_spec, handlers)

    # --- List ------------------------------------------------------------
    if list_sub := sub_cfg.list_sub:
        list_spec = ListSubscriberSpecification(
            config,
            spec_cfg,
            handlers,
            list_sub=list_sub,
        )

        if list_sub.batch:
            # TODO: raise warning if max_workers in `_validate_input_for_misconfigure`
            return ListBatchSubscriber(sub_cfg, list_spec, handlers)
        elif max_workers > 1:
            return ListConcurrentSubscriber(
                sub_cfg,
                list_spec,
                handlers,
                max_workers=max_workers,
            )
        else:
            return ListSubscriber(sub_cfg, list_spec, handlers)

    # None of channel / stream / list produced a usable source.
    raise SetupError(INCORRECT_SETUP_MSG)