Skip to content

AsgiRequest

faststream.asgi.request.AsgiRequest #

AsgiRequest(scope, receive, send)
Source code in faststream/asgi/request.py
def __init__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
    self._scope = scope
    self._receive = receive
    self._send = send
    self._stream_consumed = False
    self._body: bytes | None = None
    self._json: Any = EMPTY
    self._query_params: dict[str, list[str]] | None = None
    self._headers: dict[str, str] | None = None

query_params property #

query_params

headers property #

headers

method property #

method

stream async #

stream()
Source code in faststream/asgi/request.py
async def stream(self) -> AsyncGenerator[bytes, None]:
    if self._body is not None:
        yield self._body
        yield b""
        return
    if self._stream_consumed:
        msg = "Stream consumed"
        raise RuntimeError(msg)
    while not self._stream_consumed:
        message = await self._receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            if not message.get("more_body", False):
                self._stream_consumed = True
            if body:
                yield body
        elif message["type"] == "http.disconnect":  # pragma: no branch
            self._is_disconnected = True
            raise ClientDisconnectError
    yield b""

body async #

body()
Source code in faststream/asgi/request.py
async def body(self) -> bytes:
    if self._body is None:
        chunks: list[bytes] = [chunk async for chunk in self.stream()]
        self._body = b"".join(chunks)
    return self._body

json async #

json()
Source code in faststream/asgi/request.py
async def json(self) -> Any:
    if self._json is EMPTY:
        body = await self.body()
        self._json = json.loads(body)
    return self._json