RedisRouter
faststream.redis.broker.RedisRouter #
RedisRouter(
prefix: str = "",
handlers: Iterable[RedisRoute] = (),
*,
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
routers: Iterable[RedisRegistrator] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
include_in_schema: bool | None = None,
ack_policy: AckPolicy = EMPTY,
)
Bases: RedisRegistrator, BrokerRouter[BaseMessage]
Router that can be included in a RedisBroker (or in another router).
Initialize the RedisRouter.
| PARAMETER | DESCRIPTION |
|---|---|
prefix | String prefix to add to all subscribers' queues. TYPE: |
handlers | Route object to include. TYPE: |
dependencies | Dependencies list (`[Dependant(),]`) to apply to all routers' publishers/subscribers. TYPE: |
middlewares | Router middlewares to apply to all routers' publishers/subscribers. TYPE: |
routers | Routers to apply to broker. TYPE: |
parser | Parser to map original IncomingMessage Msg to FastStream one. TYPE: |
decoder | Function to decode FastStream msg bytes body to python objects. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema or not. TYPE: |
ack_policy | Default acknowledgement policy for all subscribers in this router. Can be overridden at the subscriber level. TYPE: |
Source code in faststream/redis/broker/router.py
config instance-attribute #
add_middleware #
add_middleware(
middleware: BrokerMiddleware[Any, Any],
) -> None
Append BrokerMiddleware to the end of middlewares list.
The added middleware will be used as the innermost one of the stack.
Source code in faststream/_internal/broker/registrator.py
insert_middleware #
insert_middleware(
middleware: BrokerMiddleware[Any, Any],
) -> None
Insert BrokerMiddleware to the start of middlewares list.
The added middleware will be used as the outermost one of the stack.
Source code in faststream/_internal/broker/registrator.py
subscriber #
subscriber(
channel: Union[PubSub, str] = ...,
*,
list: None = None,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> ChannelSubscriber
subscriber(
channel: Union[PubSub, str] = ...,
*,
list: None = None,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int = ...,
) -> ChannelConcurrentSubscriber
subscriber(
channel: None = None,
*,
list: str = ...,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> ListSubscriber
subscriber(
channel: None = None,
*,
list: Union[ListSub, str] = ...,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> Union[ListSubscriber, ListBatchSubscriber]
subscriber(
channel: None = None,
*,
list: Union[ListSub, str] = ...,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int = ...,
) -> ListConcurrentSubscriber
subscriber(
channel: None = None,
*,
list: None = None,
stream: str = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> StreamSubscriber
subscriber(
channel: None = None,
*,
list: None = None,
stream: Union[StreamSub, str] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> Union[StreamSubscriber, StreamBatchSubscriber]
subscriber(
channel: None = None,
*,
list: None = None,
stream: Union[StreamSub, str] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int = ...,
) -> StreamConcurrentSubscriber
subscriber(
channel: Union[PubSub, str, None] = None,
*,
list: Union[ListSub, str, None] = None,
stream: Union[StreamSub, str, None] = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int | None = None,
) -> LogicSubscriber
subscriber(
channel: Union[PubSub, str, None] = None,
*,
list: Union[ListSub, str, None] = None,
stream: Union[StreamSub, str, None] = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int | None = None,
) -> LogicSubscriber
Subscribe a handler to a Redis channel, list, or stream.
| PARAMETER | DESCRIPTION |
|---|---|
channel | Redis PubSub object name to consume messages from. TYPE: |
list | Redis List object name to consume messages from. TYPE: |
stream | Redis Stream object name to consume messages from. TYPE: |
ack_policy | Acknowledgement policy for message processing. TYPE: |
dependencies | Dependencies list (`[Dependant(),]`) to apply to the subscriber. TYPE: |
parser | Parser to map original IncomingMessage Msg to FastStream one. TYPE: |
decoder | Function to decode FastStream msg bytes body to python objects. TYPE: |
no_reply | Whether to disable FastStream RPC and Reply To auto responses or not. TYPE: |
message_format | Which format to use when parsing messages. TYPE: |
persistent | Whether to make the subscriber persistent or not. TYPE: |
max_workers | Number of workers to process messages concurrently. TYPE: |
title | AsyncAPI subscriber object title. TYPE: |
description | AsyncAPI subscriber object description. Uses decorated docstring as default. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema or not. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
SubscriberType | The subscriber object. TYPE: |
Source code in faststream/redis/broker/registrator.py
publisher #
publisher(
channel: None = None,
*,
list: None = None,
stream: Union[StreamSub, str] = ...,
headers: dict[str, Any] | None = None,
reply_to: str = "",
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> StreamPublisher
publisher(
channel: None = None,
*,
list: str = ...,
stream: None = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> ListPublisher
publisher(
channel: None = None,
*,
list: Union[ListSub, str] = ...,
stream: None = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> Union[ListPublisher, ListBatchPublisher]
publisher(
channel: Union[PubSub, str] = ...,
*,
list: None = None,
stream: None = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> ChannelPublisher
publisher(
channel: Union[PubSub, str, None] = None,
*,
list: Union[ListSub, str, None] = None,
stream: Union[StreamSub, str, None] = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> LogicPublisher
publisher(
channel: Union[PubSub, str, None] = None,
*,
list: Union[ListSub, str, None] = None,
stream: Union[StreamSub, str, None] = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> LogicPublisher
Creates long-living and AsyncAPI-documented publisher object.
You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.
Or you can create a publisher object to call it later - broker.publisher(...).publish(...).
| PARAMETER | DESCRIPTION |
|---|---|
channel | Redis PubSub object name to send message. TYPE: |
list | Redis List object name to send message. TYPE: |
stream | Redis Stream object name to send message. TYPE: |
headers | Message headers to store meta-information. Can be overridden by the `headers` argument of `publish(...)` if specified. TYPE: |
reply_to | Reply message destination PubSub object name. TYPE: |
message_format | Which format to use when parsing messages. TYPE: |
title | AsyncAPI publisher object title. TYPE: |
description | AsyncAPI publisher object description. TYPE: |
schema | AsyncAPI publishing message type. Should be any python-native object annotation or `pydantic.BaseModel`. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema or not. TYPE: |
persistent | Whether to make the publisher persistent or not. TYPE: |
Source code in faststream/redis/broker/registrator.py
include_router #
include_router(
router: RedisRegistrator,
*,
prefix: str = "",
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
include_in_schema: bool | None = None,
) -> None