Skip to content

FastStream

faststream.FastStream #

FastStream(
    broker=None,
    /,
    logger=logger,
    provider=None,
    serializer=EMPTY,
    context=None,
    lifespan=None,
    on_startup=(),
    after_startup=(),
    on_shutdown=(),
    after_shutdown=(),
    specification=None,
)

Bases: Application

A class representing a FastStream application.

Source code in faststream/app.py
def __init__(
    self,
    broker: Optional["BrokerUsecase[Any, Any]"] = None,
    /,
    logger: Optional["LoggerProto"] = logger,
    provider: Optional["Provider"] = None,
    serializer: Optional["SerializerProto"] = EMPTY,
    context: ContextRepo | None = None,
    lifespan: Optional["Lifespan"] = None,
    on_startup: Sequence["AnyCallable"] = (),
    after_startup: Sequence["AnyCallable"] = (),
    on_shutdown: Sequence["AnyCallable"] = (),
    after_shutdown: Sequence["AnyCallable"] = (),
    specification: Optional["SpecificationFactory"] = None,
) -> None:
    super().__init__(
        broker,
        logger=logger,
        config=FastDependsConfig(
            provider=provider or dependency_provider,
            context=context or ContextRepo(),
            serializer=serializer,
        ),
        lifespan=lifespan,
        on_startup=on_startup,
        after_startup=after_startup,
        on_shutdown=on_shutdown,
        after_shutdown=after_shutdown,
        specification=specification,
    )

    self._should_exit = False

context property #

context

broker property #

broker

logger instance-attribute #

logger = logger

lifespan_context instance-attribute #

lifespan_context = apply_types(
    func=lifespan,
    serializer_cls=_serializer,
    cast_result=False,
    context__=context,
)

set_broker #

set_broker(broker)

Set already existed App object broker.

Useful then you create/init broker in on_startup hook.

Source code in faststream/_internal/application.py
def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
    """Set already existed App object broker.

    Useful then you create/init broker in `on_startup` hook.
    """
    if self.brokers:
        msg = f"`{self}` already has a broker. You can't use multiple brokers until 1.0.0 release."
        raise SetupError(msg)

    self.brokers.append(broker)

start async #

start(**run_extra_options)

Executes startup hooks and start broker.

Source code in faststream/_internal/application.py
async def start(
    self,
    **run_extra_options: "SettingField",
) -> None:
    """Executes startup hooks and start broker."""
    async with self._start_hooks_context(**run_extra_options):
        await self._start_broker()

stop async #

stop()

Executes shutdown hooks and stop broker.

Source code in faststream/_internal/application.py
async def stop(self) -> None:
    """Executes shutdown hooks and stop broker."""
    async with self._shutdown_hooks_context():
        for broker in self.brokers:
            await broker.stop()

on_startup #

on_startup(func)

Add hook running BEFORE broker connected.

This hook also takes an extra CLI options as a kwargs.

Source code in faststream/_internal/application.py
def on_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker connected.

    This hook also takes an extra CLI options as a kwargs.
    """
    self._on_startup_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func

on_shutdown #

on_shutdown(func)

Add hook running BEFORE broker disconnected.

Source code in faststream/_internal/application.py
def on_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running BEFORE broker disconnected."""
    self._on_shutdown_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func

after_startup #

after_startup(func)

Add hook running AFTER broker connected.

Source code in faststream/_internal/application.py
def after_startup(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker connected."""
    self._after_startup_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func

after_shutdown #

after_shutdown(func)

Add hook running AFTER broker disconnected.

Source code in faststream/_internal/application.py
def after_shutdown(
    self,
    func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
    """Add hook running AFTER broker disconnected."""
    self._after_shutdown_calling.append(
        apply_types(
            to_async(func),
            serializer_cls=self.config._serializer,
            context__=self.context,
        ),
    )
    return func

run async #

run(log_level=INFO, run_extra_options=None, sleep_time=0.1)

Run FastStream Application.

Source code in faststream/app.py
async def run(
    self,
    log_level: int = logging.INFO,
    run_extra_options: dict[str, "SettingField"] | None = None,
    sleep_time: float = 0.1,
) -> None:
    """Run FastStream Application."""
    set_exit(lambda *_: self.exit(), sync=False)

    async with self.lifespan_context(**(run_extra_options or {})):
        try:
            async with anyio.create_task_group() as tg:
                tg.start_soon(self._startup, log_level, run_extra_options)

                while not self._should_exit:  # noqa: ASYNC110 (requested by creator)
                    await anyio.sleep(sleep_time)

                await self._shutdown(log_level)
                tg.cancel_scope.cancel()
        except ExceptionGroup as e:
            for ex in e.exceptions:
                raise ex from None

exit #

exit()

Stop application manually.

Source code in faststream/app.py
def exit(self) -> None:
    """Stop application manually."""
    self._should_exit = True

as_asgi #

as_asgi(asgi_routes=(), asyncapi_path=None)
Source code in faststream/app.py
def as_asgi(
    self,
    asgi_routes: Sequence[tuple[str, "ASGIApp"]] = (),
    asyncapi_path: Union[str, "AsyncAPIRoute", None] = None,
) -> AsgiFastStream:
    return AsgiFastStream.from_app(
        self,
        asgi_routes=asgi_routes,
        asyncapi_path=asyncapi_path,
    )