Skip to content

AsgiFastStream

faststream.asgi.AsgiFastStream #

AsgiFastStream(
    broker=None,
    /,
    asgi_routes=(),
    logger=logger,
    provider=None,
    serializer=EMPTY,
    context=None,
    lifespan=None,
    on_startup=(),
    after_startup=(),
    on_shutdown=(),
    after_shutdown=(),
    specification=None,
    asyncapi_path=None,
)

Bases: Application

Source code in faststream/asgi/app.py
def __init__(
    self,
    broker: Optional["BrokerUsecase[Any, Any]"] = None,
    /,
    asgi_routes: Sequence[tuple[str, "ASGIApp"]] = (),
    logger: Optional["LoggerProto"] = logger,
    provider: Provider | None = None,
    serializer: Optional["SerializerProto"] = EMPTY,
    context: ContextRepo | None = None,
    lifespan: Optional["Lifespan"] = None,
    on_startup: Sequence["AnyCallable"] = (),
    after_startup: Sequence["AnyCallable"] = (),
    on_shutdown: Sequence["AnyCallable"] = (),
    after_shutdown: Sequence["AnyCallable"] = (),
    specification: Optional["SpecificationFactory"] = None,
    asyncapi_path: str | AsyncAPIRoute | None = None,
) -> None:
    """Create an ASGI-capable application.

    Args:
        broker: Optional broker, forwarded positionally to the base class.
        asgi_routes: ``(path, ASGI app)`` pairs; copied into ``self.routes``.
        logger: Logger forwarded to the base class.
        provider: Dependency provider; ``dependency_provider`` is used when
            this is falsy.
        serializer: Serializer placed into the ``FastDependsConfig``.
        context: Context repository; a new ``ContextRepo()`` is created when
            this is falsy.
        lifespan: Lifespan callable forwarded to the base class.
        on_startup: Hooks forwarded to the base class.
        after_startup: Hooks forwarded to the base class.
        on_shutdown: Hooks forwarded to the base class.
        after_shutdown: Hooks forwarded to the base class.
        specification: Specification factory forwarded to the base class.
        asyncapi_path: When truthy, an extra route produced by
            ``AsyncAPIRoute.ensure_route`` is appended to ``self.routes``.
    """
    # Own a mutable copy so later appends never touch the caller's sequence.
    self.routes = [*asgi_routes]

    di_config = FastDependsConfig(
        provider=provider or dependency_provider,
        context=context or ContextRepo(),
        serializer=serializer,
    )
    super().__init__(
        broker,
        logger=logger,
        config=di_config,
        lifespan=lifespan,
        on_startup=on_startup,
        after_startup=after_startup,
        on_shutdown=on_shutdown,
        after_shutdown=after_shutdown,
        specification=specification,
    )

    # Added only after the base initializer has run, because the route is
    # built from `self.schema`.
    if asyncapi_path:
        docs_route = AsyncAPIRoute.ensure_route(asyncapi_path)
        docs_path = docs_route.path
        docs_handler = docs_route(self.schema)
        self.routes.append((docs_path, docs_handler))

    self._server = OuterRunState()

    # Defaults; both are overwritten by `run()`.
    self._log_level: int = logging.INFO
    self._run_extra_options: dict[str, SettingField] = {}

context property #

context

broker property #

broker

logger instance-attribute #

logger = logger

lifespan_context instance-attribute #

lifespan_context = apply_types(
    func=lifespan,
    serializer_cls=_serializer,
    cast_result=False,
    context__=context,
)

routes instance-attribute #

routes = list(asgi_routes)

set_broker #

set_broker(broker)

Set the broker on an already existing App object.

Useful when you create/init the broker in an on_startup hook.

Source code in faststream/_internal/application.py
def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Attach ``broker`` to an application that does not have one yet.

    Useful when the broker is created/initialised inside an `on_startup` hook.

    Raises:
        SetupError: if the application already holds a broker.
    """
    if not self.brokers:
        self.brokers.append(broker)
        return

    msg = f"`{self}` already has a broker. You can't use multiple brokers until 1.0.0 release."
    raise SetupError(msg)

start async #

start(**run_extra_options)

Executes startup hooks and starts the broker.

Source code in faststream/_internal/application.py
async def start(
    self,
    **run_extra_options: "SettingField",
) -> None:
    """Run the startup hooks and start the broker.

    Args:
        **run_extra_options: Extra options passed through to the startup
            hooks context.
    """
    startup_hooks = self._start_hooks_context(**run_extra_options)
    async with startup_hooks:
        await self._start_broker()

stop async #

stop()

Executes shutdown hooks and stops the broker.

Source code in faststream/_internal/application.py
async def stop(self) -> None:
    """Run the shutdown hooks and stop every registered broker in order."""
    shutdown_hooks = self._shutdown_hooks_context()
    async with shutdown_hooks:
        for registered in self.brokers:
            await registered.stop()

on_startup #

on_startup(func)

Add hook running BEFORE broker connected.

This hook also receives extra CLI options as kwargs.

Source code in faststream/_internal/application.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register ``func`` as a hook running BEFORE the broker is connected.

    The hook also receives extra CLI options as keyword arguments.

    Returns:
        The same ``func`` that was passed in; the wrapped version is only
        stored internally.
    """
    registry = self._on_startup_calling
    wrapped_hook = apply_types(
        to_async(func),
        serializer_cls=self.config._serializer,
        context__=self.context,
    )
    registry.append(wrapped_hook)
    return func

on_shutdown #

on_shutdown(func)

Add hook running BEFORE broker disconnected.

Source code in faststream/_internal/application.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register ``func`` as a hook running BEFORE the broker is disconnected.

    Returns:
        The same ``func`` that was passed in; the wrapped version is only
        stored internally.
    """
    registry = self._on_shutdown_calling
    wrapped_hook = apply_types(
        to_async(func),
        serializer_cls=self.config._serializer,
        context__=self.context,
    )
    registry.append(wrapped_hook)
    return func

after_startup #

after_startup(func)

Add hook running AFTER broker connected.

Source code in faststream/_internal/application.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register ``func`` as a hook running AFTER the broker is connected.

    Returns:
        The same ``func`` that was passed in; the wrapped version is only
        stored internally.
    """
    registry = self._after_startup_calling
    wrapped_hook = apply_types(
        to_async(func),
        serializer_cls=self.config._serializer,
        context__=self.context,
    )
    registry.append(wrapped_hook)
    return func

after_shutdown #

after_shutdown(func)

Add hook running AFTER broker disconnected.

Source code in faststream/_internal/application.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Register ``func`` as a hook running AFTER the broker is disconnected.

    Returns:
        The same ``func`` that was passed in; the wrapped version is only
        stored internally.
    """
    registry = self._after_shutdown_calling
    wrapped_hook = apply_types(
        to_async(func),
        serializer_cls=self.config._serializer,
        context__=self.context,
    )
    registry.append(wrapped_hook)
    return func

from_app classmethod #

from_app(app, asgi_routes, asyncapi_path=None)
Source code in faststream/asgi/app.py
@classmethod
def from_app(
    cls,
    app: Application,
    asgi_routes: Sequence[tuple[str, "ASGIApp"]],
    asyncapi_path: str | AsyncAPIRoute | None = None,
) -> "AsgiFastStream":
    """Build an ASGI application from an existing ``Application``.

    The new instance is created with the source app's broker and logger and
    no lifespan of its own; the source app's lifespan context and hook lists
    are then assigned onto it (the same objects, not copies).

    Args:
        app: Application whose broker, logger, lifespan context and hooks
            are reused.
        asgi_routes: ``(path, ASGI app)`` pairs for the new instance.
        asyncapi_path: Forwarded unchanged to the constructor.

    Returns:
        The newly created instance of ``cls``.
    """
    inherited_state = (
        "lifespan_context",
        "_on_startup_calling",
        "_after_startup_calling",
        "_on_shutdown_calling",
        "_after_shutdown_calling",
    )

    asgi_app = cls(
        app.broker,
        asgi_routes=asgi_routes,
        asyncapi_path=asyncapi_path,
        logger=app.logger,
        lifespan=None,
    )
    for attr_name in inherited_state:
        setattr(asgi_app, attr_name, getattr(app, attr_name))
    return asgi_app

mount #

mount(path, route)
Source code in faststream/asgi/app.py
def mount(self, path: str, route: "ASGIApp") -> None:
    """Add an ASGI route under ``path``.

    The ``(path, route)`` pair is appended to ``self.routes`` and then passed
    to ``self._register_route``.
    """
    entry = (path, route)
    routes = self.routes
    routes.append(entry)
    self._register_route(entry)

run async #

run(log_level=INFO, run_extra_options=None)
Source code in faststream/asgi/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: dict[str, "SettingField"] | None = None,
) -> None:
    """Serve this application with uvicorn.

    Args:
        log_level: Logging level stored on the instance and passed to
            ``uvicorn.Config``.
        run_extra_options: Extra options; they are passed through
            ``cast_uvicorn_params`` and stored on the instance. Only the
            entries whose key is a parameter of ``uvicorn.Config`` are
            forwarded to it.

    Raises:
        ImportError: if ``HAS_UVICORN`` is falsy.
    """
    if not HAS_UVICORN:
        raise ImportError(INSTALL_UVICORN)

    self._log_level = log_level
    self._run_extra_options = cast_uvicorn_params(run_extra_options or {})

    # Inspect the signature once: it does not depend on the option being
    # checked, so there is no reason to rebuild it (and a set) per key.
    accepted_params = inspect.signature(uvicorn.Config).parameters
    uvicorn_options = {
        key: value
        for key, value in self._run_extra_options.items()
        if key in accepted_params
    }

    config = uvicorn.Config(
        app=self,
        log_level=self._log_level,
        **uvicorn_options,
    )

    server = uvicorn.Server(config)
    await server.serve()

exit #

exit()

Manual stop method.

Source code in faststream/asgi/app.py
def exit(self) -> None:
    """Stop the application manually by calling ``stop()`` on ``self._server``."""
    server_state = self._server
    server_state.stop()

start_lifespan_context async #

start_lifespan_context(run_extra_options=None)
Source code in faststream/asgi/app.py
@asynccontextmanager
async def start_lifespan_context(
    self,
    run_extra_options: dict[str, "SettingField"] | None = None,
) -> AsyncIterator[None]:
    """Async context manager wrapping application startup and shutdown.

    On entry it enters ``self.lifespan_context`` and starts ``self.__start``
    in an anyio task group; on exit it awaits ``self._shutdown()`` and
    cancels the task group.

    Args:
        run_extra_options: Extra options for the lifespan context and the
            start routine. When ``None`` or empty, the options stored in
            ``self._run_extra_options`` are used instead.

    Raises:
        Exception: if the task group raises an ``ExceptionGroup``, its first
            sub-exception is re-raised on its own.
    """
    # `or` means an explicitly passed empty dict also falls back to the stored options.
    run_extra_options = run_extra_options or self._run_extra_options

    async with self.lifespan_context(**run_extra_options):
        try:
            async with anyio.create_task_group() as tg:
                # NOTE(review): the level passed here is the constant
                # `logging.INFO`, not `self._log_level` — confirm this is intended.
                await tg.start(self.__start, logging.INFO, run_extra_options)

                try:
                    yield
                finally:
                    # Runs whether the `async with` body finished normally or raised.
                    await self._shutdown()
                    # Cancel whatever is still running in the group so the
                    # `async with` block can exit.
                    tg.cancel_scope.cancel()

        except ExceptionGroup as e:
            # The `raise` leaves the loop on its first iteration, so only the
            # first sub-exception propagates; `from None` drops the group
            # from the exception chain.
            for ex in e.exceptions:
                raise ex from None

lifespan async #

lifespan(scope, receive, send)

Handle ASGI lifespan messages to start and shutdown the app.

Source code in faststream/asgi/app.py
async def lifespan(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    """Handle ASGI lifespan messages to start and shut down the app.

    Awaits one message before startup and one more after startup completes,
    and reports the outcome through ``send`` using ``lifespan.*`` event types.

    Args:
        scope: ASGI scope (not read by this method).
        receive: Awaitable used to wait for the next lifespan message.
        send: Awaitable used to emit lifespan status messages.

    Raises:
        BaseException: any error raised inside the lifespan context is
            re-raised after the matching ``*.failed`` message was sent. The
            exception is a ``StartupValidationError`` caught while
            ``HAS_TYPER`` is truthy: it is reported but not re-raised.
    """
    # Becomes True once `lifespan.startup.complete` was sent; decides whether
    # a later failure is reported as a startup or a shutdown failure.
    started = False
    await receive()  # handle `lifespan.startup` event

    async def process_exception(ex: BaseException) -> None:
        # Only called from the `except` blocks below, so `format_exc()`
        # formats the exception currently being handled.
        exc_text = traceback.format_exc()
        if started:
            await send({"type": "lifespan.shutdown.failed", "message": exc_text})
        else:
            await send({"type": "lifespan.startup.failed", "message": exc_text})
        # Always propagate after the failure has been reported.
        raise ex

    try:
        async with self.start_lifespan_context():
            await send({"type": "lifespan.startup.complete"})
            started = True
            await receive()  # handle `lifespan.shutdown` event

    except StartupValidationError as startup_exc:
        # Process `on_startup` and `lifespan` missed extra options
        if HAS_TYPER:
            # Imported lazily, only when `HAS_TYPER` is truthy.
            from faststream._internal.cli.utils.errors import draw_startup_errors

            draw_startup_errors(startup_exc)
            # Reported with an empty message; this branch does not re-raise.
            await send({"type": "lifespan.startup.failed", "message": ""})

        else:
            await process_exception(startup_exc)

    except BaseException as base_exc:
        await process_exception(base_exc)

    else:
        # Reached only when the context exited without raising.
        await send({"type": "lifespan.shutdown.complete"})

not_found async #

not_found(scope, receive, send)
Source code in faststream/asgi/app.py
async def not_found(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    """Reply to a request that no route handled.

    A ``websocket`` scope is answered with a ``WebSocketClose`` (code 1000);
    any other scope gets a 404 ``AsgiResponse``. Both carry the same message.
    """
    not_found_msg = "Application doesn't support regular HTTP protocol."

    is_websocket = scope["type"] == "websocket"
    responder = (
        WebSocketClose(code=1000, reason=not_found_msg)
        if is_websocket
        else AsgiResponse(body=not_found_msg.encode(), status_code=404)
    )

    await responder(scope, receive, send)