CLI#

The FastStream CLI is a built-in tool designed to streamline your development workflow.

Thanks to typer, watchfiles, and uvicorn: their work forms the basis of this tool.

Installation#

To use the FastStream CLI, install the required dependencies:

pip install 'faststream[cli]'

AsyncAPI Schema#

Generate your AsyncAPI document as a .json or .yaml file from your code, or host it directly as a styled HTML page. Learn more about hosting options.
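
For example (subcommand names as of FastStream 0.5; check faststream docs --help for your version):

faststream docs gen main:app
faststream docs serve main:app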

Publishing messages#

The FastStream CLI allows you to publish test messages directly to your broker. This is especially useful during development for debugging and testing without needing to write custom publisher code.

faststream publish main:app '{"name": "John"}' --subject 'my-subject'
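
Any extra flags are forwarded to the broker's publish call, so the option name depends on your broker: --subject matches the NATS publish parameter, while RabbitMQ would use --queue and Kafka --topic (assuming the flags mirror the corresponding publish parameters).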

Running the Project#

The primary command to launch a FastStream application is faststream run. This command supports a variety of options to customize your application runtime:
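
In its simplest form, point it at your application object's import path:

faststream run main:app

Here main is the main.py module and app is the FastStream object inside it.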

Scaling#

FastStream allows you to scale your application right from the command line by running it in multiple instances. Just set the --workers option to scale your application:

AIOKafka:

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(worker_id: int) -> None:
    print(f"Worker {worker_id} started")

Confluent:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(worker_id: int) -> None:
    print(f"Worker {worker_id} started")

RabbitMQ:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(worker_id: int) -> None:
    print(f"Worker {worker_id} started")

NATS:

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(worker_id: int) -> None:
    print(f"Worker {worker_id} started")

Redis:

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(worker_id: int) -> None:
    print(f"Worker {worker_id} started")

faststream run main:app --workers 3

2025-08-20 17:35:03,932 INFO     - Started parent process [95606]
2025-08-20 17:35:03,940 INFO     - Started child process 0 [95608]
2025-08-20 17:35:03,942 INFO     - Started child process 1 [95609]
2025-08-20 17:35:03,944 INFO     - Started child process 2 [95610]
Worker 0 started
Worker 1 started
Worker 2 started

ASGI Support#

You can run your app as an ASGI application. For details, see ASGI Support.
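
A minimal sketch, assuming the AsgiFastStream class from faststream.asgi (verify the import against your FastStream version):

from faststream.asgi import AsgiFastStream
from faststream.nats import NatsBroker

broker = NatsBroker()

# AsgiFastStream exposes the application over the ASGI interface,
# so it can still be launched with `faststream run main:app`.
app = AsgiFastStream(broker)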

Hot Reload#

The reload option restarts your application whenever your source files change. It can also be extended to watch additional files (e.g., templates, configurations, specifications).

faststream run main:app --reload

By default, FastStream watches for .py file changes, but you can specify extra file extensions to watch (your config files, for example):

faststream run main:app --reload --reload-ext .yml --reload-ext .yaml

Extra options#

FastStream supports extra startup arguments:

AIOKafka:

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(port: int, foo: str) -> None:
    print("Port:", port)
    print("Foo:", foo)

Confluent:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(port: int, foo: str) -> None:
    print("Port:", port)
    print("Foo:", foo)

RabbitMQ:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(port: int, foo: str) -> None:
    print("Port:", port)
    print("Foo:", foo)

NATS:

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(port: int, foo: str) -> None:
    print("Port:", port)
    print("Foo:", foo)

Redis:

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber("my-subject")
async def handler(msg: dict) -> None:
    print(f"Received: {msg}")


@app.on_startup
async def startup(port: int, foo: str) -> None:
    print("Port:", port)
    print("Foo:", foo)

faststream run main:app --port 5000 --foo bar

2025-08-20 17:53:44,224 INFO     - FastStream app starting...
Port: 5000
Foo: bar

Environment Management#

You can pass any custom flags or configuration options to the CLI without predefining them in your application. These values will be available in your application's environment.

For example, let's pass a .env file to the context of our application:

faststream run main:app --env=.env.dev

AIOKafka:

from faststream import FastStream, ContextRepo
from faststream.kafka import KafkaBroker
from pydantic_settings import BaseSettings

broker = KafkaBroker()

app = FastStream(broker)

class Settings(BaseSettings):
    any_flag: bool

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

Confluent:

from faststream import FastStream, ContextRepo
from faststream.confluent import KafkaBroker
from pydantic_settings import BaseSettings

broker = KafkaBroker()

app = FastStream(broker)

class Settings(BaseSettings):
    any_flag: bool

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

RabbitMQ:

from faststream import FastStream, ContextRepo
from faststream.rabbit import RabbitBroker
from pydantic_settings import BaseSettings

broker = RabbitBroker()

app = FastStream(broker)

class Settings(BaseSettings):
    any_flag: bool

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

NATS:

from faststream import FastStream, ContextRepo
from faststream.nats import NatsBroker
from pydantic_settings import BaseSettings

broker = NatsBroker()

app = FastStream(broker)

class Settings(BaseSettings):
    any_flag: bool

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

Redis:

from faststream import FastStream, ContextRepo
from faststream.redis import RedisBroker
from pydantic_settings import BaseSettings

broker = RedisBroker()

app = FastStream(broker)

class Settings(BaseSettings):
    any_flag: bool

@app.on_startup
async def setup(context: ContextRepo, env: str = ".env"):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)
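
For this example, the .env.dev file passed on the command line might contain (hypothetical contents matching the Settings model; pydantic-settings matches the ANY_FLAG variable to the any_flag field case-insensitively):

ANY_FLAG=True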

Note

Note that the env parameter was passed to the setup function directly from the command line.

All passed values can be of type bool, str or list[str].
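
For instance, a flag passed without a value arrives as a bool, while a repeated option arrives as a list[str] (a sketch assuming that parsing behavior; the flag names are hypothetical):

faststream run main:app --debug --tag first --tag second

@app.on_startup
async def startup(debug: bool, tag: list[str]) -> None:
    print(debug)  # True, assuming bare flags parse as booleans
    print(tag)    # ["first", "second"], assuming repeated options collect into a list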

Logging Configuration#

You can pass custom flags for logging configuration: use --log-level to set the log level, or --log-config for detailed logging configuration. See here
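
For example:

faststream run main:app --log-level debug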