1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""Session middleware: read the session cookie, stamp ``request.state.identity``.
Wired only when ``settings.auth_mode != "none"`` (see ``main.py``). Runs
ahead of ``APIKeyAuthMiddleware`` so the latter can short-circuit on
identity. Both middlewares coexist on purpose — API keys remain a parallel
auth path for programmatic clients.
The middleware never *rejects* a request on its own. It just populates
``request.state.identity`` when a valid session cookie is present and lets
``APIKeyAuthMiddleware`` (or the route's own ``Depends(require_identity)``)
make the final accept/reject decision. That separation keeps each
middleware single-purpose and makes the test matrix tractable.
It also handles sliding-window re-issue: if the session is past its
half-life and the request succeeds, we mint a fresh cookie before returning.
"""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from ..config import settings
from .cookies import set_session_cookies
from .factory import get_session_store
from .session import make_identity, should_reissue
logger = logging.getLogger(__name__)
class SessionAuthMiddleware(BaseHTTPMiddleware):
    """Read session cookie -> ``request.state.identity``; re-issue when stale.

    The middleware never rejects a request on its own. It only stamps
    ``request.state.identity`` when the session cookie decodes to an
    identity, and leaves the accept/reject decision to whatever runs
    downstream of ``call_next``.
    """

    async def dispatch(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        """Populate the request identity, run the request, maybe re-issue.

        Args:
            request: Incoming request; its cookies are read under the name
                ``settings.auth_session_cookie_name``.
            call_next: Continuation that runs the rest of the stack.

        Returns:
            The downstream response, with fresh session cookies attached
            when the sliding-window re-issue conditions below all hold.
        """
        cookie_name = settings.auth_session_cookie_name
        cookie_value = request.cookies.get(cookie_name)

        identity = None
        if cookie_value:
            store = get_session_store()
            identity = store.decode(cookie_value)
            if identity is None:
                # The store refused the cookie (presumably tampered or
                # expired), so downstream sees the request as anonymous.
                # NOTE(review): this middleware does not clear the stale
                # cookie itself; any clearing has to happen downstream.
                logger.debug("Session cookie present but invalid; treating as anonymous")

        if identity is not None:
            request.state.identity = identity

        response = await call_next(request)

        # Sliding-window re-issue. All of the following must hold:
        #   * the session cookie decoded to an identity,
        #   * the request succeeded (status < 400) -- a rejected or failed
        #     request must not extend the session's lifetime,
        #   * the handler did not rotate the cookie itself (login/logout set
        #     ``response.headers["X-Auth-Cookie-Set"]`` to opt out), which
        #     avoids the double-write race where logout-then-reissue
        #     resurrects a session,
        #   * ``should_reissue`` says the session is due for renewal.
        if (
            identity is not None
            and response.status_code < 400
            and "X-Auth-Cookie-Set" not in response.headers
            and should_reissue(identity)
        ):
            fresh = make_identity(
                sub=identity.sub,
                provider_id=identity.provider_id,
                ttl_seconds=settings.auth_session_ttl_seconds,
                email=identity.email,
                name=identity.name,
            )
            set_session_cookies(response=response, identity=fresh)

        return response