Skip to content

BaseMiddleware

faststream.BaseMiddleware #

BaseMiddleware(
    msg: AnyMsg | None, /, *, context: ContextRepo
)

Bases: Generic[PublishCommandType, AnyMsg]

A base middleware class.

Source code in faststream/_internal/middlewares.py
def __init__(
    self,
    msg: AnyMsg | None,
    /,
    *,
    context: "ContextRepo",
) -> None:
    """Keep the given message and context repository on the instance.

    Args:
        msg: Message object (or ``None``); positional-only.
        context: Context repository; keyword-only.
    """
    # One tuple assignment; targets are bound left to right (msg, then context).
    self.msg, self.context = msg, context

msg instance-attribute #

msg = msg

context instance-attribute #

context = context

on_receive async #

on_receive() -> None

Hook to call on message receive.

Source code in faststream/_internal/middlewares.py
async def on_receive(self) -> None:
    """Hook invoked on message receive; this base version does nothing."""
    return None

after_processed async #

after_processed(
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional[TracebackType] = None,
) -> bool | None

Asynchronously called after processing.

Source code in faststream/_internal/middlewares.py
async def after_processed(
    self,
    exc_type: type[BaseException] | None = None,
    exc_val: BaseException | None = None,
    exc_tb: Optional["TracebackType"] = None,
) -> bool | None:
    """Hook awaited once processing is over.

    Args:
        exc_type: Exception class, or ``None`` (the default).
        exc_val: Exception instance, or ``None`` (the default).
        exc_tb: Traceback object, or ``None`` (the default).

    Returns:
        Always ``False`` in this base implementation; the arguments are
        not inspected.
    """
    # NOTE(review): the signature resembles ``__aexit__``; presumably a falsy
    # result means "do not suppress the exception" - confirm against the caller.
    outcome = False
    return outcome

on_consume async #

on_consume(
    msg: StreamMessage[AnyMsg],
) -> StreamMessage[AnyMsg]

This option is deprecated and will be removed in 0.7.0. Please use consume_scope instead.

Source code in faststream/_internal/middlewares.py
async def on_consume(
    self,
    msg: "StreamMessage[AnyMsg]",
) -> "StreamMessage[AnyMsg]":
    """Deprecated hook, to be removed in 0.7.0; use `consume_scope` instead.

    The base implementation hands back ``msg`` exactly as received.
    """
    passthrough = msg
    return passthrough

after_consume async #

after_consume(err: Exception | None) -> None

This option is deprecated and will be removed in 0.7.0. Please use consume_scope instead.

Source code in faststream/_internal/middlewares.py
async def after_consume(self, err: Exception | None) -> None:
    """Deprecated hook, to be removed in 0.7.0; use `consume_scope` instead.

    Raises:
        Exception: ``err`` itself is re-raised whenever it is not ``None``.
    """
    if err is None:
        # Nothing went wrong, so there is nothing to propagate.
        return
    raise err

consume_scope async #

consume_scope(
    call_next: AsyncFuncAny, msg: StreamMessage[AnyMsg]
) -> Any

Wraps message consumption: awaits on_consume(msg), passes its result to call_next, then awaits after_consume with any caught exception (or None), and returns the value produced by call_next.

Source code in faststream/_internal/middlewares.py
async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[AnyMsg]",
) -> Any:
    """Wrap the consume step with the `on_consume` / `after_consume` hooks.

    ``msg`` is first passed through ``self.on_consume``; the value it returns
    is given to ``call_next``. ``self.after_consume`` is always awaited
    afterwards, receiving the caught ``Exception`` or ``None``.

    Returns:
        Whatever ``call_next`` returned, or ``None`` when an ``Exception``
        was caught and ``after_consume`` chose not to raise.
    """
    outcome: Any = None
    failure: Exception | None = None
    try:
        prepared = await self.on_consume(msg)
        outcome = await call_next(prepared)
    except Exception as exc:
        # Deferred: after_consume decides whether the error propagates.
        failure = exc
    finally:
        await self.after_consume(failure)
    return outcome

on_publish async #

on_publish(msg: PublishCommandType) -> PublishCommandType

This option is deprecated and will be removed in 0.7.0. Please use publish_scope instead.

Source code in faststream/_internal/middlewares.py
async def on_publish(
    self,
    msg: PublishCommandType,
) -> PublishCommandType:
    """Deprecated hook, to be removed in 0.7.0; use `publish_scope` instead.

    The base implementation hands back ``msg`` exactly as received.
    """
    passthrough = msg
    return passthrough

after_publish async #

after_publish(err: Exception | None) -> None

This option is deprecated and will be removed in 0.7.0. Please use publish_scope instead.

Source code in faststream/_internal/middlewares.py
async def after_publish(
    self,
    err: Exception | None,
) -> None:
    """Deprecated hook, to be removed in 0.7.0; use `publish_scope` instead.

    Raises:
        Exception: ``err`` itself is re-raised whenever it is not ``None``.
    """
    if err is None:
        # Nothing went wrong, so there is nothing to propagate.
        return
    raise err

publish_scope async #

publish_scope(
    call_next: Callable[
        [PublishCommandType], Awaitable[Any]
    ],
    cmd: PublishCommandType,
) -> Any

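Wraps publishing: awaits on_publish(cmd), passes its result to call_next, then awaits after_publish with any caught exception (or None), and returns the value produced by call_next.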

Source code in faststream/_internal/middlewares.py
async def publish_scope(
    self,
    call_next: Callable[[PublishCommandType], Awaitable[Any]],
    cmd: PublishCommandType,
) -> Any:
    """Wrap the publish step with the `on_publish` / `after_publish` hooks.

    ``cmd`` is first passed through ``self.on_publish``; the value it returns
    is given to ``call_next``. ``self.after_publish`` is always awaited
    afterwards, receiving the caught ``Exception`` or ``None``.

    Returns:
        Whatever ``call_next`` returned, or ``None`` when an ``Exception``
        was caught and ``after_publish`` chose not to raise.
    """
    outcome: Any = None
    failure: Exception | None = None
    try:
        prepared = await self.on_publish(cmd)
        outcome = await call_next(prepared)
    except Exception as exc:
        # Deferred: after_publish decides whether the error propagates.
        failure = exc
    finally:
        await self.after_publish(failure)
    return outcome