1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Signed-cookie session encoder/decoder.

Phase 1+2 use a stateless signed cookie via :mod:`itsdangerous`. The
:class:`SessionStore` Protocol is defined so a Redis-backed implementation
can be swapped in for Phase 3 with a one-line registration change in
``factory.py``; nothing else moves.
"""
from __future__ import annotations
import logging
from datetime import UTC, datetime, timedelta
from typing import Protocol
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
from .base import Identity
# Module-level logger named after this module; the session decoder below
# reports rejected cookies through it.
logger = logging.getLogger(__name__)
class SessionStore(Protocol):
    """Structural interface for converting an :class:`Identity` to and from
    an opaque cookie value.

    Any object exposing matching ``encode`` and ``decode`` methods satisfies
    this protocol; no inheritance is required.
    """

    def encode(self, identity: Identity) -> str:
        """Return a signed, cookie-safe string that represents ``identity``."""
        ...

    def decode(self, value: str) -> Identity | None:
        """Verify ``value`` and rebuild the identity it carries.

        Returns ``None`` for tampered, expired, or otherwise invalid values —
        callers must treat ``None`` as "no session".
        """
        ...
class SignedCookieSession:
    """Cookie-only session store: the signed cookie value *is* the session.

    One ``itsdangerous`` timed serializer, keyed by the secret handed to the
    constructor, both signs and verifies cookie values. Rotating that secret
    therefore invalidates every active session — that is intentional and
    documented in the architecture explanation.
    """

    # Salt passed to the serializer for session cookies.
    _SALT = "reflow-session-v1"

    def __init__(self, secret_key: str, max_age_seconds: int) -> None:
        """Build the serializer used for all later encode/decode calls.

        Args:
            secret_key: Signing secret; must be at least 32 characters long.
            max_age_seconds: Passed as ``max_age`` to the serializer on every
                :meth:`decode` call.

        Raises:
            ValueError: If ``secret_key`` is shorter than 32 characters.
        """
        key_is_long_enough = len(secret_key) >= 32
        if not key_is_long_enough:
            raise ValueError("auth_secret_key must be at least 32 characters")
        self._max_age = max_age_seconds
        self._serializer = URLSafeTimedSerializer(secret_key, salt=self._SALT)

    def encode(self, identity: Identity) -> str:
        """Dump ``identity`` to JSON-compatible primitives and sign the result."""
        # mode="json" yields JSON-safe values (Pydantic emits ISO-8601 strings
        # for datetimes), so the serializer can handle the dict directly.
        claims = identity.model_dump(mode="json")
        return self._serializer.dumps(claims)

    def decode(self, value: str) -> Identity | None:
        """Verify ``value`` and return the identity it carries, or ``None``.

        No exception escapes this method. An expired signature is rejected
        silently, a bad signature is logged as a warning, and any other
        failure is logged with its traceback; all of them return ``None``.
        """
        try:
            claims = self._serializer.loads(value, max_age=self._max_age)
        except SignatureExpired:
            # Must come before BadSignature: expiry is a subclass of it and is
            # routine, so it is not worth a log line.
            return None
        except BadSignature:
            logger.warning("Session cookie failed signature verification")
            return None
        except Exception:
            # Malformed payload — never propagate; treat as anonymous.
            logger.warning("Failed to decode session cookie", exc_info=True)
            return None
        else:
            return self._identity_from_claims(claims)

    @staticmethod
    def _identity_from_claims(claims: object) -> Identity | None:
        """Validate an already-verified payload as an :class:`Identity`.

        Returns ``None`` (after logging a warning with traceback) when the
        payload does not validate.
        """
        try:
            identity = Identity.model_validate(claims)
        except Exception:
            logger.warning("Session cookie payload failed Identity validation", exc_info=True)
            return None
        return identity
def make_identity(
    *,
    sub: str,
    provider_id: str,
    ttl_seconds: int,
    email: str | None = None,
    name: str | None = None,
) -> Identity:
    """Build a new :class:`Identity` whose validity window starts now.

    ``issued_at`` is the current UTC time and ``expires_at`` is that instant
    plus ``ttl_seconds``. Use at login and at sliding-window re-issue.
    """
    # Read the clock once so both timestamps derive from the same instant.
    issued = datetime.now(UTC)
    lifetime = timedelta(seconds=ttl_seconds)
    return Identity(
        sub=sub,
        provider_id=provider_id,
        email=email,
        name=name,
        issued_at=issued,
        expires_at=issued + lifetime,
    )
def should_reissue(identity: Identity) -> bool:
"""Sliding-window check: re-issue once we're past the half-life."""
now = datetime.now(UTC)
total = (identity.expires_at - identity.issued_at).total_seconds()
elapsed = (now - identity.issued_at).total_seconds()
return elapsed > (total / 2.0)
def make_tx_serializer(secret_key: str) -> URLSafeTimedSerializer:
    """Create the serializer for the short-lived OAuth transaction cookie.

    Its salt differs from the one :class:`SignedCookieSession` uses, so a
    session cookie can never be replayed as a tx cookie or vice versa. No TTL
    is applied here: the caller enforces it via the ``max_age`` argument to
    ``loads``.

    Raises:
        ValueError: If ``secret_key`` is shorter than 32 characters.
    """
    tx_salt = "reflow-oauth-tx-v1"
    if len(secret_key) >= 32:
        return URLSafeTimedSerializer(secret_key, salt=tx_salt)
    raise ValueError("auth_secret_key must be at least 32 characters")
# Public API of this module: the names exported by ``from <module> import *``.
# Entries are kept in alphabetical order.
__all__ = [
"SessionStore",
"SignedCookieSession",
"make_identity",
"make_tx_serializer",
"should_reissue",
]