RedisBroker
faststream.redis.broker.RedisBroker
RedisBroker(
url="redis://localhost:6379",
*,
host=EMPTY,
port=EMPTY,
db=EMPTY,
connection_class=EMPTY,
client_name=None,
health_check_interval=0,
max_connections=None,
socket_timeout=None,
socket_connect_timeout=None,
socket_read_size=65536,
socket_keepalive=False,
socket_keepalive_options=None,
socket_type=0,
retry_on_timeout=False,
encoding="utf-8",
encoding_errors="strict",
parser_class=DefaultParser,
encoder_class=Encoder,
graceful_timeout=15.0,
decoder=None,
parser=None,
dependencies=(),
middlewares=(),
routers=(),
message_format=BinaryMessageFormatV1,
security=None,
specification_url=None,
protocol=None,
protocol_version="custom",
description=None,
tags=(),
logger=EMPTY,
log_level=INFO,
apply_types=True,
serializer=EMPTY,
provider=None,
context=None,
)
Bases: RedisRegistrator, BrokerUsecase[UnifyRedisDict, 'Redis[bytes]']
Redis broker.
Initialize the RedisBroker.
PARAMETER | DESCRIPTION |
---|---|
url
|
The Redis connection URL. Defaults to "redis://localhost:6379".
TYPE:
|
host
|
The Redis host to connect to. If not provided, it will be extracted from the URL.
TYPE:
|
port
|
The Redis port to connect to. If not provided, it will be extracted from the URL.
TYPE:
|
db
|
The Redis database to use. If not provided, it will be extracted from the URL.
TYPE:
|
connection_class
|
The class to use for establishing connections. Defaults to EMPTY.
TYPE:
|
client_name
|
The name of the Redis client. Defaults to None.
TYPE:
|
health_check_interval
|
The interval at which to perform health checks on the broker. Defaults to 0.
TYPE:
|
max_connections
|
The maximum number of connections to establish. Defaults to None.
TYPE:
|
socket_timeout
|
The timeout for socket operations. Defaults to None.
TYPE:
|
socket_connect_timeout
|
The timeout for connecting sockets. Defaults to None.
TYPE:
|
socket_read_size
|
The size of the buffer used for reading from sockets. Defaults to 65536.
TYPE:
|
socket_keepalive
|
Whether to enable keep-alive on sockets. Defaults to False.
TYPE:
|
socket_keepalive_options
|
Options for keep-alive on sockets. Defaults to None.
TYPE:
|
socket_type
|
The type of socket to use (if supported by your platform). Defaults to 0.
TYPE:
|
retry_on_timeout
|
Whether to retry operations that timeout. Defaults to False.
TYPE:
|
encoding
|
The encoding used for sending and receiving data. Defaults to "utf-8".
TYPE:
|
encoding_errors
|
How to handle encoding errors. Defaults to "strict".
TYPE:
|
parser_class
|
The class to use for parsing messages. Defaults to DefaultParser.
TYPE:
|
encoder_class
|
The class to use for encoding messages. Defaults to Encoder.
TYPE:
|
graceful_timeout
|
Graceful shutdown timeout. The broker waits for all running subscribers to complete before shutting down. Defaults to 15.0.
TYPE:
|
decoder
|
Custom decoder object. Defaults to None.
TYPE:
|
parser
|
Custom parser object. Defaults to None.
TYPE:
|
dependencies
|
Dependencies to apply to all broker subscribers. Defaults to ().
TYPE:
|
middlewares
|
Middlewares to apply to all broker publishers/subscribers. Defaults to ().
TYPE:
|
routers
|
Routers to apply to broker. Defaults to ().
TYPE:
|
message_format
|
What format to use when parsing messages. Defaults to BinaryMessageFormatV1.
TYPE:
|
security
|
Security options to connect broker and generate AsyncAPI server security information. Defaults to None.
TYPE:
|
specification_url
|
AsyncAPI hardcoded server addresses. Use
TYPE:
|
protocol
|
AsyncAPI server protocol. Defaults to None.
TYPE:
|
protocol_version
|
AsyncAPI server protocol version. Defaults to "custom".
TYPE:
|
description
|
AsyncAPI server description. Defaults to None.
TYPE:
|
tags
|
AsyncAPI server tags. Defaults to ().
|
logger
|
User specified logger to pass into Context and log service messages. Defaults to EMPTY.
TYPE:
|
log_level
|
Service messages log level. Defaults to logging.INFO.
TYPE:
|
apply_types
|
Whether to use FastDepends or not. Defaults to True.
TYPE:
|
serializer
|
Serializer object. Defaults to EMPTY.
TYPE:
|
provider
|
Provider for FastDepends library. Defaults to None.
TYPE:
|
context
|
Context repository for FastDepends library. Defaults to None.
TYPE:
|
Source code in faststream/redis/broker/broker.py
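The `host`, `port`, and `db` descriptions above say that a missing value is taken from `url`. The following is a minimal sketch of that precedence rule using only the standard library. `resolve_connection` is a hypothetical helper written for illustration, not FastStream's actual parsing code, and the fallback of database 0 is an assumption based on the usual Redis default.

```python
from typing import Optional
from urllib.parse import urlparse


def resolve_connection(
    url: str = "redis://localhost:6379",
    host: Optional[str] = None,
    port: Optional[int] = None,
    db: Optional[int] = None,
) -> dict:
    """Hypothetical helper: explicit arguments win, the URL fills the gaps."""
    parts = urlparse(url)
    # The path of a redis:// URL carries the database index, e.g. "/2".
    # Assumption: fall back to database 0 when the URL has no path.
    url_db = int(parts.path.lstrip("/")) if parts.path.strip("/") else 0
    return {
        "host": host if host is not None else (parts.hostname or "localhost"),
        "port": port if port is not None else (parts.port or 6379),
        "db": db if db is not None else url_db,
    }


# Everything comes from the URL.
from_url = resolve_connection("redis://cache.internal:6380/2")
# An explicit keyword overrides the URL component.
overridden = resolve_connection("redis://cache.internal:6380/2", db=5)
```

Here `from_url` takes all three values from the URL, while `overridden` keeps the URL's host and port and uses the explicitly passed database.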
add_middleware
Append BrokerMiddleware to the end of middlewares list.
The added middleware becomes the innermost layer of the stack.
Source code in faststream/_internal/broker/registrator.py
insert_middleware
Insert BrokerMiddleware to the start of middlewares list.
The inserted middleware becomes the outermost layer of the stack.
Source code in faststream/_internal/broker/registrator.py
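The difference between appending and inserting is easiest to see by tracing call order. The sketch below models a middleware stack with plain functions. `make_middleware` and `build_stack` are hypothetical names for illustration and do not reproduce the BrokerMiddleware API; the only behavior taken from the text above is that the first list element is the outermost layer and the last is the innermost.

```python
from typing import Callable, List

Handler = Callable[[str], str]
Middleware = Callable[[Handler], Handler]


def make_middleware(name: str, trace: List[str]) -> Middleware:
    """Build a toy middleware that records when it runs."""
    def middleware(call_next: Handler) -> Handler:
        def wrapped(msg: str) -> str:
            trace.append(f"{name}:before")
            result = call_next(msg)
            trace.append(f"{name}:after")
            return result
        return wrapped
    return middleware


def build_stack(middlewares: List[Middleware], handler: Handler) -> Handler:
    # The first list element must end up outermost, so wrap from the end.
    for mw in reversed(middlewares):
        handler = mw(handler)
    return handler


trace: List[str] = []
middlewares = [make_middleware("existing", trace)]
middlewares.append(make_middleware("inner", trace))     # add_middleware analogue
middlewares.insert(0, make_middleware("outer", trace))  # insert_middleware analogue

stack = build_stack(middlewares, lambda msg: msg.upper())
result = stack("hi")
```

After the call, `trace` starts with `outer:before` and ends with `outer:after`, with `inner` running closest to the handler.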
subscriber
subscriber(
channel: Union[PubSub, str] = ...,
*,
list: None = None,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> ChannelSubscriber
subscriber(
channel: Union[PubSub, str] = ...,
*,
list: None = None,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int = ...,
) -> ChannelConcurrentSubscriber
subscriber(
channel: None = None,
*,
list: str = ...,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> ListSubscriber
subscriber(
channel: None = None,
*,
list: Union[ListSub, str] = ...,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> Union[ListSubscriber, ListBatchSubscriber]
subscriber(
channel: None = None,
*,
list: Union[ListSub, str] = ...,
stream: None = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int = ...,
) -> ListConcurrentSubscriber
subscriber(
channel: None = None,
*,
list: None = None,
stream: str = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> StreamSubscriber
subscriber(
channel: None = None,
*,
list: None = None,
stream: Union[StreamSub, str] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: None = None,
) -> Union[StreamSubscriber, StreamBatchSubscriber]
subscriber(
channel: None = None,
*,
list: None = None,
stream: Union[StreamSub, str] = ...,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int = ...,
) -> StreamConcurrentSubscriber
subscriber(
channel: Union[PubSub, str, None] = None,
*,
list: Union[ListSub, str, None] = None,
stream: Union[StreamSub, str, None] = None,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
middlewares: Sequence[SubscriberMiddleware[Any]] = (),
no_ack: bool = EMPTY,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
max_workers: int | None = None,
) -> LogicSubscriber
subscriber(
channel=None,
*,
list=None,
stream=None,
dependencies=(),
parser=None,
decoder=None,
middlewares=(),
no_ack=EMPTY,
ack_policy=EMPTY,
no_reply=False,
message_format=None,
persistent=True,
title=None,
description=None,
include_in_schema=True,
max_workers=None,
)
Subscribe a handler to a Redis channel, list, or stream.
PARAMETER | DESCRIPTION |
---|---|
channel
|
Redis PubSub object name to consume messages from.
TYPE:
|
list
|
Redis List object name to consume messages from.
TYPE:
|
stream
|
Redis Stream object name to consume messages from.
TYPE:
|
no_ack
|
Whether to disable FastStream auto acknowledgement logic or not.
TYPE:
|
ack_policy
|
Acknowledgement policy for message processing.
TYPE:
|
dependencies
|
Dependencies list (
TYPE:
|
parser
|
Parser to map the original incoming message to the FastStream one.
TYPE:
|
decoder
|
Function to decode the FastStream message's bytes body into Python objects.
TYPE:
|
middlewares
|
Subscriber middlewares to wrap incoming message processing.
TYPE:
|
no_reply
|
Whether to disable FastStream RPC and Reply To auto responses or not.
TYPE:
|
message_format
|
Which format to use when parsing messages.
TYPE:
|
persistent
|
Whether to make the subscriber persistent or not.
TYPE:
|
max_workers
|
Number of workers to process messages concurrently.
TYPE:
|
title
|
AsyncAPI subscriber object title.
TYPE:
|
description
|
AsyncAPI subscriber object description. Uses decorated docstring as default.
TYPE:
|
include_in_schema
|
Whether to include operation in AsyncAPI schema or not.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
SubscriberType
|
The subscriber object.
TYPE:
|
Source code in faststream/redis/broker/registrator.py
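The overloads above pair one destination argument (`channel`, `list`, or `stream`) with an optional `max_workers` to determine the return type. The sketch below mirrors that mapping at the level of the type signatures only. `describe_subscriber` is a hypothetical function, the "exactly one destination" check is an assumption inferred from the overloads rather than the library's validation code, and the batch variants returned for `ListSub` and `StreamSub` objects are left out.

```python
from typing import Optional


def describe_subscriber(
    channel: Optional[str] = None,
    *,
    list: Optional[str] = None,  # shadows the builtin to mirror the signature
    stream: Optional[str] = None,
    max_workers: Optional[int] = None,
) -> str:
    """Return the subscriber class name the overloads suggest (illustrative)."""
    destinations = {"Channel": channel, "List": list, "Stream": stream}
    chosen = [kind for kind, name in destinations.items() if name is not None]
    # Assumption: every overload sets exactly one destination.
    if len(chosen) != 1:
        raise ValueError("pass exactly one of channel, list, or stream")
    # In the overloads, an int max_workers selects the Concurrent variant.
    suffix = "ConcurrentSubscriber" if max_workers is not None else "Subscriber"
    return chosen[0] + suffix


plain = describe_subscriber("orders")
concurrent = describe_subscriber(stream="events", max_workers=4)
```

This only restates the documented signatures; the runtime rules for choosing a concurrent or batch subscriber may differ.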
publisher
publisher(
channel: None = None,
*,
list: None = None,
stream: Union[StreamSub, str] = ...,
headers: dict[str, Any] | None = None,
reply_to: str = "",
middlewares: Sequence[PublisherMiddleware] = (),
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> StreamPublisher
publisher(
channel: None = None,
*,
list: str = ...,
stream: None = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
middlewares: Sequence[PublisherMiddleware] = (),
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> ListPublisher
publisher(
channel: None = None,
*,
list: Union[ListSub, str] = ...,
stream: None = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
middlewares: Sequence[PublisherMiddleware] = (),
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> Union[ListPublisher, ListBatchPublisher]
publisher(
channel: Union[PubSub, str] = ...,
*,
list: None = None,
stream: None = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
middlewares: Sequence[PublisherMiddleware] = (),
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> ChannelPublisher
publisher(
channel: Union[PubSub, str, None] = None,
*,
list: Union[ListSub, str, None] = None,
stream: Union[StreamSub, str, None] = None,
headers: dict[str, Any] | None = None,
reply_to: str = "",
middlewares: Sequence[PublisherMiddleware] = (),
message_format: type[MessageFormat] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> LogicPublisher
publisher(
channel=None,
*,
list=None,
stream=None,
headers=None,
reply_to="",
middlewares=(),
message_format=None,
persistent=True,
title=None,
description=None,
schema=None,
include_in_schema=True,
)
Creates long-living and AsyncAPI-documented publisher object.
You can use it as a handler decorator, @broker.publisher(...) (the handler should also be decorated by @broker.subscriber(...)).
In that case the publisher publishes your handler's return value.
Or you can create a publisher object and call it later: broker.publisher(...).publish(...).
PARAMETER | DESCRIPTION |
---|---|
channel
|
Redis PubSub object name to send message.
TYPE:
|
list
|
Redis List object name to send message.
TYPE:
|
stream
|
Redis Stream object name to send message.
TYPE:
|
headers
|
Message headers to store meta-information. Can be overridden
by
TYPE:
|
reply_to
|
Reply message destination PubSub object name.
TYPE:
|
middlewares
|
Publisher middlewares to wrap outgoing messages.
TYPE:
|
message_format
|
Which format to use when parsing messages.
TYPE:
|
title
|
AsyncAPI publisher object title.
TYPE:
|
description
|
AsyncAPI publisher object description.
TYPE:
|
schema
|
AsyncAPI publishing message type. Should be any python-native
object annotation or
TYPE:
|
include_in_schema
|
Whether to include operation in AsyncAPI schema or not.
TYPE:
|
persistent
|
Whether to make the publisher persistent or not.
TYPE:
|
Source code in faststream/redis/broker/registrator.py
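The two usage styles described for `publisher` (decorating a handler so its return value is published, or keeping the object and calling `publish` later) can be modeled with a small stand-in class. `ToyPublisher` is hypothetical and stores messages in a list instead of talking to Redis; it is a sketch of the pattern, not of FastStream's publisher classes.

```python
import asyncio
from functools import wraps
from typing import Any, Awaitable, Callable, List, Tuple


class ToyPublisher:
    """Hypothetical stand-in for a long-lived publisher object."""

    def __init__(self, destination: str) -> None:
        self.destination = destination
        self.sent: List[Tuple[str, Any]] = []

    async def publish(self, message: Any) -> None:
        # A real publisher would send to Redis; this one just records.
        self.sent.append((self.destination, message))

    def __call__(
        self, handler: Callable[..., Awaitable[Any]]
    ) -> Callable[..., Awaitable[Any]]:
        @wraps(handler)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = await handler(*args, **kwargs)
            await self.publish(result)  # publish the handler's return value
            return result
        return wrapper


out = ToyPublisher("responses")


@out  # decorator style
async def handle(msg: str) -> str:
    return msg.upper()


async def main() -> None:
    await handle("ping")         # return value is published automatically
    await out.publish("manual")  # direct style: call the object later


asyncio.run(main())
```

Both calls land on the same destination, which is the point of keeping one long-lived publisher object.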
include_router
Source code in faststream/redis/broker/registrator.py
include_routers
connect
async
stop
async
Source code in faststream/redis/broker/broker.py
close
async
Source code in faststream/redis/broker/broker.py
start
async
publish
async
publish(
message=None,
channel=None,
*,
reply_to="",
headers=None,
correlation_id=None,
list=None,
stream=None,
maxlen=None,
pipeline=None,
)
Publish message directly.
This method allows you to publish a message in a non-AsyncAPI-documented way. It can be used in other frameworks or to publish messages at specific intervals.
PARAMETER | DESCRIPTION |
---|---|
message
|
Message body to send.
TYPE:
|
channel
|
Redis PubSub object name to send message.
TYPE:
|
reply_to
|
Reply message destination PubSub object name.
TYPE:
|
headers
|
Message headers to store metainformation.
TYPE:
|
correlation_id
|
Manual message correlation_id setter. correlation_id is a useful option to trace messages.
TYPE:
|
list
|
Redis List object name to send message.
TYPE:
|
stream
|
Redis Stream object name to send message.
TYPE:
|
maxlen
|
Redis Stream maxlen publish option. Removes the oldest messages if maxlen is exceeded.
TYPE:
|
pipeline
|
Redis pipeline to use for publishing messages.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The result of the publish operation, typically the number of messages published.
TYPE:
|
Source code in faststream/redis/broker/broker.py
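The effect of `maxlen` on a stream can be pictured with a bounded queue: once the cap is reached, each new entry pushes out the oldest one. The sketch below uses `collections.deque` as an in-memory model. It is an analogy only; it does not call Redis, and Redis's own trimming may be approximate depending on the options used.

```python
from collections import deque

# In-memory model of a stream capped at three entries.
stream = deque(maxlen=3)

for i in range(5):
    # Appending beyond the cap silently drops the oldest entry.
    stream.append(f"msg-{i}")

remaining = list(stream)
```

After five appends only the three newest entries remain in `remaining`.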
request
async
request(
message,
channel=None,
*,
list=None,
stream=None,
maxlen=None,
correlation_id=None,
headers=None,
timeout=30.0,
)
Source code in faststream/redis/broker/broker.py
publish_batch
async
Publish multiple messages to a Redis List in one request.
PARAMETER | DESCRIPTION |
---|---|
*messages
|
Message bodies to send.
TYPE:
|
list
|
Redis List object name to send messages.
TYPE:
|
correlation_id
|
Manual message correlation_id setter. correlation_id is a useful option to trace messages.
TYPE:
|
reply_to
|
Reply message destination PubSub object name.
TYPE:
|
headers
|
Message headers to store metainformation.
TYPE:
|
pipeline
|
Redis pipeline to use for publishing messages.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
int
|
The result of the batch publish operation.
TYPE:
|