MQTTBroker
faststream.mqtt.MQTTBroker #
MQTTBroker(
host: str = "localhost:1883",
port: int = EMPTY,
*,
client_id: str = "",
keepalive: int = 60,
clean_session: bool = True,
version: Literal["3.1.1", "5.0"] = "5.0",
reconnect: ReconnectConfig | None = None,
session_expiry_interval: int = 0,
graceful_timeout: float | None = 15.0,
decoder: Optional[CustomCallable] = None,
parser: Optional[CustomCallable] = None,
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
routers: Iterable[MQTTRegistrator] = (),
ack_policy: AckPolicy = EMPTY,
specification_url: str | None = None,
protocol_version: str | None = None,
description: str | None = None,
tags: Iterable[Tag | TagDict] = (),
security: Optional[BaseSecurity] = None,
logger: Optional[LoggerProto] = EMPTY,
log_level: int = INFO,
apply_types: bool = True,
serializer: Optional[SerializerProto] = EMPTY,
provider: Optional[Provider] = None,
context: Optional[ContextRepo] = None,
)
Bases: MQTTRegistrator, BrokerUsecase[Message, MQTTClient]
MQTT broker for FastStream using the zmqtt client library.
Source code in faststream/mqtt/broker/broker.py
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | |
config instance-attribute #
publish_batch async #
add_middleware #
add_middleware(
middleware: BrokerMiddleware[Any, Any],
) -> None
Append BrokerMiddleware to the end of middlewares list.
Current middleware will be used as a most inner of the stack.
Source code in faststream/_internal/broker/registrator.py
insert_middleware #
insert_middleware(
middleware: BrokerMiddleware[Any, Any],
) -> None
Insert BrokerMiddleware to the start of middlewares list.
Current middleware will be used as a most outer of the stack.
Source code in faststream/_internal/broker/registrator.py
subscriber #
subscriber(
topic: str,
*,
qos: QoS = AT_MOST_ONCE,
shared: str | None = None,
ack_policy: AckPolicy = EMPTY,
no_reply: bool = False,
dependencies: Iterable[Dependant] = (),
parser: Optional[CustomCallable] = None,
decoder: Optional[CustomCallable] = None,
max_workers: int = 1,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
) -> MQTTDefaultSubscriber | MQTTConcurrentSubscriber
Subscribe a handler to an MQTT topic.
| PARAMETER | DESCRIPTION |
|---|---|
topic | MQTT topic filter. Wildcards TYPE: |
qos | QoS level for the subscription (0, 1, or 2). TYPE: |
shared | Optional shared subscription group name. When set, subscribes as TYPE: |
ack_policy | Acknowledgement policy for message processing. TYPE: |
no_reply | Whether to disable FastStream RPC / reply-to responses. TYPE: |
dependencies | Dependencies list to apply to the subscriber. TYPE: |
parser | Custom parser to map raw messages to FastStream ones. TYPE: |
decoder | Function to decode FastStream message bytes to Python objects. TYPE: |
max_workers | Number of workers to process messages concurrently. TYPE: |
persistent | Whether to retain the subscriber across broker restarts. TYPE: |
title | AsyncAPI subscriber object title. TYPE: |
description | AsyncAPI subscriber object description. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema. TYPE: |
Source code in faststream/mqtt/broker/registrator.py
publisher #
publisher(
topic: str,
*,
qos: QoS = AT_MOST_ONCE,
retain: bool = False,
headers: dict[str, str] | None = None,
persistent: bool = True,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> MQTTPublisher
Create a persistent publisher object for the given MQTT topic.
| PARAMETER | DESCRIPTION |
|---|---|
topic | MQTT topic to publish to. Must not contain wildcards. TYPE: |
qos | QoS level for published messages (0, 1, or 2). TYPE: |
retain | Whether the broker should retain the last message. TYPE: |
headers | Default headers to include in every published message. TYPE: |
persistent | Whether to retain the publisher across broker restarts. TYPE: |
title | AsyncAPI publisher object title. TYPE: |
description | AsyncAPI publisher object description. TYPE: |
schema | AsyncAPI publishing message type. TYPE: |
include_in_schema | Whether to include operation in AsyncAPI schema. TYPE: |
Source code in faststream/mqtt/broker/registrator.py
include_router #
include_router(
router: MQTTRegistrator,
*,
prefix: str = "",
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[Any, Any]] = (),
include_in_schema: bool | None = None,
) -> None
Source code in faststream/mqtt/broker/registrator.py
include_routers #
connect async #
start async #
stop async #
stop(
exc_type: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: Optional[TracebackType] = None,
) -> None
Source code in faststream/mqtt/broker/broker.py
ping async #
Source code in faststream/mqtt/broker/broker.py
publish async #
publish(
message: SendableMessage = None,
topic: str = "",
*,
qos: QoS = AT_MOST_ONCE,
retain: bool = False,
headers: dict[str, str] | None = None,
correlation_id: str | None = None,
reply_to: str = "",
) -> None
Publish a message to an MQTT topic.
| PARAMETER | DESCRIPTION |
|---|---|
message | Message body to send. TYPE: |
topic | MQTT topic to publish to. TYPE: |
qos | QoS level (0, 1, or 2). TYPE: |
retain | Whether the broker should retain the message. TYPE: |
headers | Message headers (MQTT 5.0 user properties). TYPE: |
correlation_id | Correlation ID for message tracing. TYPE: |
reply_to | Response topic (MQTT 5.0 response_topic property). TYPE: |
Source code in faststream/mqtt/broker/broker.py
request async #
request(
message: SendableMessage = None,
topic: str = "",
/,
timeout: float = 0.5,
correlation_id: str | None = None,
headers: dict[str, str] | None = None,
qos: QoS = AT_MOST_ONCE,
reply_to: str = "",
) -> MQTTMessage