AsgiFastStream
faststream.asgi.AsgiFastStream #
AsgiFastStream(
broker: Optional[BrokerUsecase[Any, Any]] = None,
/,
asgi_routes: Sequence[tuple[str, ASGIApp]] = (),
logger: Optional[LoggerProto] = logger,
provider: Provider | None = None,
serializer: Optional[SerializerProto] = EMPTY,
context: ContextRepo | None = None,
lifespan: Optional[Lifespan] = None,
on_startup: Sequence[AnyCallable] = (),
after_startup: Sequence[AnyCallable] = (),
on_shutdown: Sequence[AnyCallable] = (),
after_shutdown: Sequence[AnyCallable] = (),
specification: Optional[SpecificationFactory] = None,
asyncapi_path: str | AsyncAPIRoute | None = None,
)
Bases: Application
Source code in faststream/asgi/app.py
lifespan_context instance-attribute #
lifespan_context = apply_types(
func=lifespan,
serializer_cls=_serializer,
cast_result=False,
context__=context,
)
set_broker #
Set already existed App object broker.
Useful then you create/init broker in on_startup hook.
Source code in faststream/_internal/application.py
start async #
Executes startup hooks and start broker.
stop async #
on_startup #
Add hook running BEFORE broker connected.
This hook also takes an extra CLI options as a kwargs.
Source code in faststream/_internal/application.py
on_shutdown #
Add hook running BEFORE broker disconnected.
Source code in faststream/_internal/application.py
after_startup #
after_startup(
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]
Add hook running AFTER broker connected.
Source code in faststream/_internal/application.py
after_shutdown #
after_shutdown(
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]
Add hook running AFTER broker disconnected.
Source code in faststream/_internal/application.py
from_app classmethod #
from_app(
app: Application,
asgi_routes: Sequence[tuple[str, ASGIApp]],
asyncapi_path: str | AsyncAPIRoute | None = None,
) -> AsgiFastStream
Source code in faststream/asgi/app.py
run async #
Source code in faststream/asgi/app.py
exit #
start_lifespan_context async #
start_lifespan_context(
run_extra_options: dict[str, SettingField]
| None = None,
) -> AsyncIterator[None]
Source code in faststream/asgi/app.py
lifespan async #
Handle ASGI lifespan messages to start and shutdown the app.