Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 39 additions & 45 deletions docs/integrations/faststream.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,10 @@ FastStream - Litestar/FastAPI - dishka integration
from dishka import make_async_container
from dishka.integrations import faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka as Depends
from dishka.integrations.base import FromDishka
from dishka.integrations.faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitRouter, RabbitRoute
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod


Expand All @@ -131,34 +130,30 @@ FastStream - Litestar/FastAPI - dishka integration

@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: Depends[SomeDependency]) -> None:
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(data: str, some_dependency: Depends[SomeDependency]) -> None:
print(f"{data=}")
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()


def create_app():
def create_app() -> Litestar:
container = make_async_container(SomeProvider())

broker = RabbitBroker(
url="amqp://guest:guest@localhost:5672/",
)
amqp_routes = RabbitRouter(
handlers=(
RabbitRoute(amqp_handler, "test-queue"),
)
)
broker.include_router(amqp_routes)
faststream_integration.setup_dishka(container, FastStream(broker))
broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
broker.include_router(amqp_router)
faststream_integration.setup_dishka(container, broker=broker)

http = Litestar(
route_handlers=[http_handler],
on_startup=[broker.start],
on_shutdown=[broker.close],
on_shutdown=[broker.stop],
)
litestar_integration.setup_dishka(container, http)
return http
Expand All @@ -167,21 +162,23 @@ FastStream - Litestar/FastAPI - dishka integration
if __name__ == "__main__":
uvicorn.run(create_app(), host="0.0.0.0", port=8000)


3. Example of usage ``FastStream`` + ``FastAPI``

.. code-block:: python

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
from dishka.integrations import faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka.integrations.faststream import inject as faststream_inject
from fastapi import FastAPI, APIRouter
from faststream import FastStream
from faststream.rabbit import RabbitBroker, RabbitRouter, RabbitRoute


class SomeDependency:
Expand All @@ -196,40 +193,37 @@ FastStream - Litestar/FastAPI - dishka integration
return SomeDependency()


router = APIRouter(
route_class=DishkaRoute,
)
router = APIRouter(route_class=DishkaRoute)


@router.get("/")
async def http_handler(some_dependency: Depends[SomeDependency]) -> None:
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(data: str, some_dependency: Depends[SomeDependency]) -> None:
print(f"{data=}")
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()


def create_app():
def create_app() -> FastAPI:
container = make_async_container(SomeProvider())

broker = RabbitBroker(
url="amqp://guest:guest@localhost:5672/",
)
amqp_routes = RabbitRouter(
handlers=(
RabbitRoute(amqp_handler, "test-queue"),
)
)
broker.include_router(amqp_routes)
faststream_integration.setup_dishka(container, FastStream(broker))
broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
broker.include_router(amqp_router)
faststream_integration.setup_dishka(container, broker=broker)

http = FastAPI(
on_startup=[broker.start],
on_shutdown=[broker.close],
)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
async with broker:
await broker.start()
yield

http = FastAPI(lifespan=lifespan)
http.include_router(router)
fastapi_integration.setup_dishka(container, http)
return http
Expand Down
6 changes: 5 additions & 1 deletion src/dishka/integrations/faststream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
if FASTSTREAM_05:
from .faststream_05 import FastStreamProvider, inject, setup_dishka
elif FASTSTREAM_06:
from .faststream_06 import FastStreamProvider, inject, setup_dishka
from .faststream_06 import ( # type: ignore[assignment]
FastStreamProvider,
inject,
setup_dishka,
)
else:
raise RuntimeError( # noqa: TRY003
f"FastStream {FASTSTREAM_VERSION} version not supported",
Expand Down
25 changes: 16 additions & 9 deletions src/dishka/integrations/faststream/faststream_06.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
get_type_hints,
)

from faststream import BaseMiddleware, FastStream
from faststream import BaseMiddleware, Context, FastStream
from faststream._internal.basic_types import DecodedMessage
from faststream._internal.broker import BrokerUsecase as BrokerType
from faststream._internal.context import ContextRepo
Expand All @@ -37,17 +37,24 @@ class FastStreamProvider(Provider):
message = from_context(StreamMessage, scope=Scope.REQUEST)


Application: TypeAlias = FastStream | AsgiFastStream # type: ignore[no-redef,misc]
Application: TypeAlias = FastStream | AsgiFastStream

try:
# import works only if fastapi is installed
from faststream._internal.fastapi import Context, StreamRouter
from faststream._internal.fastapi import Context as FastAPIContext
from faststream._internal.fastapi import StreamRouter

except ImportError:
from faststream import Context
ContextAnnotation: TypeAlias = Annotated[ContextRepo, Context("context")]

else:
Application |= StreamRouter # type: ignore[assignment]
ContextAnnotation = Annotated[ # type: ignore[misc,assignment]
ContextRepo,
FastAPIContext("context"),
Context("context"),
]

Application |= StreamRouter


class ApplicationLike(Protocol):
Expand Down Expand Up @@ -92,7 +99,7 @@ def setup_dishka(
stacklevel=2,
)

broker: BrokerType = broker or getattr(app, "broker", None)
broker = broker or getattr(app, "broker", None)
assert broker # noqa: S101

broker.insert_middleware(DishkaMiddleware(container))
Expand Down Expand Up @@ -129,7 +136,7 @@ def __init__(
self.container = container
super().__init__(*args, **kwargs)

async def consume_scope( # type: ignore[misc]
async def consume_scope(
self,
call_next: Callable[[Any], Awaitable[Any]],
msg: StreamMessage[Any],
Expand Down Expand Up @@ -173,7 +180,7 @@ def _find_context_param(func: Callable[_ParamsP, _ReturnT]) -> str | None:


DISHKA_CONTEXT_PARAM = Parameter(
name="___dishka_context",
annotation=Annotated[ContextRepo, Context("context")],
name="dishka_context__",
annotation=ContextAnnotation,
kind=Parameter.KEYWORD_ONLY,
)
Loading