UnsubscribeAdapter

faststream.nats.subscriber.adapters.UnsubscribeAdapter

UnsubscribeAdapter(subscription: WatchableT)

Bases: Unsubscriptable, Generic[WatchableT]

Source code in faststream/nats/subscriber/adapters.py
def __init__(self, subscription: WatchableT) -> None:
    self.obj = subscription

obj instance-attribute

obj: WatchableT = subscription

unsubscribe async

unsubscribe() -> None
Source code in faststream/nats/subscriber/adapters.py
async def unsubscribe(self) -> None:
    # Delegate to the wrapped object's stop() method.
    await self.obj.stop()
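
Usage sketch: the adapter wraps an object that exposes an async stop() method and presents it through unsubscribe(). The snippet below is a minimal, self-contained illustration and does not import FastStream. The Watchable protocol and the FakeWatcher class are hypothetical stand-ins invented for this example, and UnsubscribeAdapter is re-created locally from the source shown above (without the Unsubscriptable base) so the snippet runs on its own.

```python
import asyncio
from typing import Generic, Protocol, TypeVar


class Watchable(Protocol):
    """Hypothetical stand-in: anything exposing an async ``stop()``."""

    async def stop(self) -> None: ...


WatchableT = TypeVar("WatchableT", bound=Watchable)


class UnsubscribeAdapter(Generic[WatchableT]):
    """Local re-creation of the adapter above (not imported from FastStream)."""

    def __init__(self, subscription: WatchableT) -> None:
        self.obj = subscription

    async def unsubscribe(self) -> None:
        # Delegate to the wrapped object's stop() method.
        await self.obj.stop()


class FakeWatcher:
    """Hypothetical watcher used only for this demonstration."""

    def __init__(self) -> None:
        self.stopped = False

    async def stop(self) -> None:
        self.stopped = True


async def main() -> bool:
    watcher = FakeWatcher()
    adapter = UnsubscribeAdapter(watcher)
    # Callers only need to know about unsubscribe(); stop() stays hidden.
    await adapter.unsubscribe()
    return watcher.stopped


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The point of the pattern is that calling code can treat every subscription-like object uniformly through unsubscribe(), regardless of whether the underlying object names its teardown method stop().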