Skip to content

AsgiFastStream

faststream.asgi.app.AsgiFastStream #

AsgiFastStream(
    broker: Optional[BrokerUsecase[Any, Any]] = None,
    /,
    asgi_routes: Sequence[tuple[str, ASGIApp]] = (),
    logger: Optional[LoggerProto] = logger,
    provider: Provider | None = None,
    serializer: Optional[SerializerProto] = EMPTY,
    context: ContextRepo | None = None,
    lifespan: Optional[Lifespan] = None,
    on_startup: Sequence[AnyCallable] = (),
    after_startup: Sequence[AnyCallable] = (),
    on_shutdown: Sequence[AnyCallable] = (),
    after_shutdown: Sequence[AnyCallable] = (),
    specification: Optional[SpecificationFactory] = None,
    asyncapi_path: str | AsyncAPIRoute | None = None,
)

Bases: Application

Source code in faststream/asgi/app.py
def __init__(
    self,
    broker: Optional["BrokerUsecase[Any, Any]"] = None,
    /,
    asgi_routes: Sequence[tuple[str, "ASGIApp"]] = (),
    logger: Optional["LoggerProto"] = logger,
    provider: Provider | None = None,
    serializer: Optional["SerializerProto"] = EMPTY,
    context: ContextRepo | None = None,
    lifespan: Optional["Lifespan"] = None,
    on_startup: Sequence["AnyCallable"] = (),
    after_startup: Sequence["AnyCallable"] = (),
    on_shutdown: Sequence["AnyCallable"] = (),
    after_shutdown: Sequence["AnyCallable"] = (),
    specification: Optional["SpecificationFactory"] = None,
    asyncapi_path: str | AsyncAPIRoute | None = None,
) -> None:
    self.routes = list(asgi_routes)

    super().__init__(
        broker,
        logger=logger,
        config=FastDependsConfig(
            provider=provider or dependency_provider,
            context=context or ContextRepo(),
            serializer=serializer,
        ),
        lifespan=lifespan,
        on_startup=on_startup,
        after_startup=after_startup,
        on_shutdown=on_shutdown,
        after_shutdown=after_shutdown,
        specification=specification,
    )

    if asyncapi_path:
        asyncapi_route = AsyncAPIRoute.ensure_route(asyncapi_path)
        handler = asyncapi_route(self.schema)
        handler.set_logger(logger)  # type: ignore[attr-defined]
        self.routes.append((asyncapi_route.path, handler))

    self._server = OuterRunState()

    self._log_level: int = logging.INFO
    self._run_extra_options: dict[str, SettingField] = {}

routes instance-attribute #

routes = list(asgi_routes)

context property #

context: ContextRepo

broker property #

broker: Optional[BrokerUsecase[Any, Any]]

logger instance-attribute #

logger = logger

lifespan_context instance-attribute #

lifespan_context = apply_types(
    func=lifespan,
    serializer_cls=_serializer,
    cast_result=False,
    context__=context,
)

from_app classmethod #

from_app(
    app: Application,
    asgi_routes: Sequence[tuple[str, ASGIApp]],
    asyncapi_path: str | AsyncAPIRoute | None = None,
) -> AsgiFastStream
Source code in faststream/asgi/app.py
@classmethod
def from_app(
    cls,
    app: Application,
    asgi_routes: Sequence[tuple[str, "ASGIApp"]],
    asyncapi_path: str | AsyncAPIRoute | None = None,
) -> "AsgiFastStream":
    asgi_app = cls(
        app.broker,
        asgi_routes=asgi_routes,
        asyncapi_path=asyncapi_path,
        logger=app.logger,
        lifespan=None,
    )
    asgi_app.lifespan_context = app.lifespan_context
    asgi_app._on_startup_calling = app._on_startup_calling
    asgi_app._after_startup_calling = app._after_startup_calling
    asgi_app._on_shutdown_calling = app._on_shutdown_calling
    asgi_app._after_shutdown_calling = app._after_shutdown_calling
    return asgi_app

mount #

mount(path: str, route: ASGIApp) -> None
Source code in faststream/asgi/app.py
def mount(self, path: str, route: "ASGIApp") -> None:
    asgi_route = (path, route)
    self.routes.append(asgi_route)
    self._register_route(asgi_route)

run async #

run(
    log_level: int = INFO,
    run_extra_options: dict[str, SettingField]
    | None = None,
) -> None
Source code in faststream/asgi/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: dict[str, "SettingField"] | None = None,
) -> None:
    if not HAS_UVICORN:
        raise ImportError(INSTALL_UVICORN)

    self._log_level = log_level
    self._run_extra_options = cast_uvicorn_params(run_extra_options or {})

    config = uvicorn.Config(
        app=self,
        log_level=self._log_level,
        **{
            key: v
            for key, v in self._run_extra_options.items()
            if key in set(inspect.signature(uvicorn.Config).parameters.keys())
        },
    )

    server = uvicorn.Server(config)
    await server.serve()

exit #

exit() -> None

Manual stop method.

Source code in faststream/asgi/app.py
def exit(self) -> None:
    """Manual stop method."""
    self._server.stop()

start_lifespan_context async #

start_lifespan_context(
    run_extra_options: dict[str, SettingField]
    | None = None,
) -> AsyncIterator[None]
Source code in faststream/asgi/app.py
@asynccontextmanager
async def start_lifespan_context(
    self,
    run_extra_options: dict[str, "SettingField"] | None = None,
) -> AsyncIterator[None]:
    run_extra_options = run_extra_options or self._run_extra_options

    async with self.lifespan_context(**run_extra_options):
        try:
            async with anyio.create_task_group() as tg:
                await tg.start(self.__start, logging.INFO, run_extra_options)

                try:
                    yield
                finally:
                    await self._shutdown()
                    tg.cancel_scope.cancel()

        except ExceptionGroup as e:
            for ex in e.exceptions:
                raise ex from None

lifespan async #

lifespan(
    scope: Scope, receive: Receive, send: Send
) -> None

Handle ASGI lifespan messages to start and shutdown the app.

Source code in faststream/asgi/app.py
async def lifespan(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    """Handle ASGI lifespan messages to start and shutdown the app."""
    started = False
    await receive()  # handle `lifespan.startup` event

    async def process_exception(ex: BaseException) -> None:
        exc_text = traceback.format_exc()
        if started:
            await send({"type": "lifespan.shutdown.failed", "message": exc_text})
        else:
            await send({"type": "lifespan.startup.failed", "message": exc_text})
        raise ex

    try:
        async with self.start_lifespan_context():
            await send({"type": "lifespan.startup.complete"})
            started = True
            await receive()  # handle `lifespan.shutdown` event

    except StartupValidationError as startup_exc:
        # Process `on_startup` and `lifespan` missed extra options
        if HAS_TYPER:
            from faststream._internal.cli.utils.errors import draw_startup_errors

            draw_startup_errors(startup_exc)
            await send({"type": "lifespan.startup.failed", "message": ""})

        else:
            await process_exception(startup_exc)

    except BaseException as base_exc:
        await process_exception(base_exc)

    else:
        await send({"type": "lifespan.shutdown.complete"})

not_found async #

not_found(
    scope: Scope, receive: Receive, send: Send
) -> None
Source code in faststream/asgi/app.py
async def not_found(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    not_found_msg = "Application doesn't support regular HTTP protocol."

    if scope["type"] == "websocket":
        websocket_close = WebSocketClose(
            code=1000,
            reason=not_found_msg,
        )
        await websocket_close(scope, receive, send)
        return

    response = AsgiResponse(
        body=not_found_msg.encode(),
        status_code=404,
    )

    await response(scope, receive, send)

set_broker #

set_broker(broker: BrokerUsecase[Any, Any]) -> None

Set already existed App object broker.

Useful then you create/init broker in on_startup hook.

Source code in faststream/_internal/application.py
def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Set already existed App object broker.

    Useful then you create/init broker in `on_startup` hook.
    """
    if self.brokers:
        msg = f"`{self}` already has a broker. You can't use multiple brokers until 1.0.0 release."
        raise SetupError(msg)

    self.brokers.append(broker)
    _ = self.schema.add_broker(broker)

start async #

start(**run_extra_options: SettingField) -> None

Executes startup hooks and start broker.

Source code in faststream/_internal/application.py
async def start(
    self,
    **run_extra_options: "SettingField",
) -> None:
    """Executes startup hooks and start broker."""
    async with self._start_hooks_context(**run_extra_options):
        await self._start_broker()

stop async #

stop() -> None

Executes shutdown hooks and stop broker.

Source code in faststream/_internal/application.py
async def stop(self) -> None:
    """Executes shutdown hooks and stop broker."""
    async with self._shutdown_hooks_context():
        for broker in self.brokers:
            await broker.stop()

on_startup #

on_startup(
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]

Add hook running BEFORE broker connected.

This hook also takes an extra CLI options as a kwargs.

Source code in faststream/_internal/application.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker connected.

    This hook also takes an extra CLI options as a kwargs.
    """
    self._on_startup_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func

on_shutdown #

on_shutdown(
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]

Add hook running BEFORE broker disconnected.

Source code in faststream/_internal/application.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker disconnected."""
    self._on_shutdown_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func

after_startup #

after_startup(
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]

Add hook running AFTER broker connected.

Source code in faststream/_internal/application.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker connected."""
    self._after_startup_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func

after_shutdown #

after_shutdown(
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]

Add hook running AFTER broker disconnected.

Source code in faststream/_internal/application.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker disconnected."""
    self._after_shutdown_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func