๐Ÿ“ฆ EqualifyEverything / equalify-reflow

๐Ÿ“„ auth.py ยท 253 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253"""Fixtures for auth integration tests.

The global ``src.main.app`` is built against the Settings singleton at import
time, so we can't switch ``AUTH_MODE`` mid-test. These fixtures construct a
minimal FastAPI app per test against patched settings, exercising the same
middleware and router as production.
"""

from __future__ import annotations

from collections.abc import Iterator
from typing import Any

import pytest
from argon2 import PasswordHasher
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient

# Hoisted to module scope: ``from __future__ import annotations`` defers
# annotation evaluation, and FastAPI looks up route-parameter type hints
# in the route's ``__globals__`` at registration time. If ``Request`` were
# imported inside the fixture function, FastAPI couldn't resolve it and
# would treat ``request: Request`` as a query parameter (422 missing-field).
from src.auth.base import Identity


@pytest.fixture
def basic_password() -> str:
    return "correct-horse-battery-staple"


@pytest.fixture
def basic_users_csv(basic_password: str) -> str:
    """Single-user AUTH_BASIC_USERS CSV, hashed at fixture time."""
    return f"alice:{PasswordHasher().hash(basic_password)}"


def _build_app(settings_overrides: dict[str, Any]) -> FastAPI:
    """Build a fresh FastAPI app with the requested settings.

    We monkey-patch the ``src.config.settings`` singleton's attributes so the
    middleware and routes pick up the new values, then clear the factory's
    ``lru_cache`` so the auth provider is re-resolved.
    """
    from src.auth import factory
    from src.config import settings as global_settings

    # Stash + apply overrides
    saved: dict[str, Any] = {}
    for key, value in settings_overrides.items():
        saved[key] = getattr(global_settings, key)
        object.__setattr__(global_settings, key, value)

    factory.get_auth_provider.cache_clear()
    factory.get_auth_providers.cache_clear()
    factory.get_session_store.cache_clear()

    app = FastAPI()
    if global_settings.auth_mode != "none":
        from src.auth.middleware import SessionAuthMiddleware

        app.add_middleware(SessionAuthMiddleware)

    from src.auth.routes import router as auth_router

    app.include_router(auth_router)

    # Restore on teardown by stashing the saved dict on the app for the
    # caller fixture to clean up.
    app.state._auth_test_saved = saved
    return app


def _restore(app: FastAPI) -> None:
    from src.auth import factory
    from src.config import settings as global_settings

    saved = getattr(app.state, "_auth_test_saved", {})
    for key, value in saved.items():
        object.__setattr__(global_settings, key, value)
    factory.get_auth_provider.cache_clear()
    factory.get_auth_providers.cache_clear()
    factory.get_session_store.cache_clear()


@pytest.fixture
def auth_basic_app(basic_users_csv: str) -> Iterator[FastAPI]:
    """Minimal app wired for AUTH_MODE=basic."""
    from pydantic import SecretStr

    app = _build_app(
        {
            "auth_mode": "basic",
            "auth_secret_key": SecretStr("x" * 32),
            "auth_basic_users": SecretStr(basic_users_csv),
            "auth_cookie_secure": False,  # TestClient runs over HTTP
            "auth_session_cookie_name": "reflow_session",
            "auth_session_ttl_seconds": 3600,
        }
    )
    try:
        yield app
    finally:
        _restore(app)


@pytest.fixture
def auth_basic_client(auth_basic_app: FastAPI) -> TestClient:
    return TestClient(auth_basic_app)


@pytest.fixture
def auth_none_app() -> Iterator[FastAPI]:
    app = _build_app({"auth_mode": "none"})
    try:
        yield app
    finally:
        _restore(app)


@pytest.fixture
def auth_none_client(auth_none_app: FastAPI) -> TestClient:
    return TestClient(auth_none_app)


def _restore_settings(saved: dict[str, Any]) -> None:
    """Roll back settings overrides โ€” used when a fixture builds its own app."""
    from src.auth import factory
    from src.config import settings as global_settings

    for key, value in saved.items():
        object.__setattr__(global_settings, key, value)
    factory.get_auth_provider.cache_clear()
    factory.get_auth_providers.cache_clear()
    factory.get_session_store.cache_clear()


@pytest.fixture
def auth_full_app(basic_users_csv: str) -> Iterator[FastAPI]:
    """App that mirrors production's middleware composition.

    Both ``SessionAuthMiddleware`` (when AUTH_MODE != none) and
    ``APIKeyAuthMiddleware`` (when ENABLE_API_KEY_AUTH=True) are wired in
    the same order as ``src/main.py``: API-key middleware added first
    (innermost), session middleware added second (outermost). A protected
    route at ``/api/v1/test/ping`` lets us assert end-to-end behaviour.
    """
    from pydantic import SecretStr
    from src.auth import factory
    from src.auth.middleware import SessionAuthMiddleware
    from src.auth.routes import router as auth_router
    from src.config import settings as global_settings
    from src.middleware.api_key_auth import APIKeyAuthMiddleware

    overrides: dict[str, Any] = {
        "auth_mode": "basic",
        "auth_secret_key": SecretStr("x" * 32),
        "auth_basic_users": SecretStr(basic_users_csv),
        "auth_cookie_secure": False,
        "auth_session_cookie_name": "reflow_session",
        "auth_session_ttl_seconds": 3600,
        "enable_api_key_auth": True,
        "api_keys": SecretStr("test-api-key-1,test-api-key-2"),
    }
    saved: dict[str, Any] = {}
    for key, value in overrides.items():
        saved[key] = getattr(global_settings, key)
        object.__setattr__(global_settings, key, value)
    factory.get_auth_provider.cache_clear()
    factory.get_auth_providers.cache_clear()
    factory.get_session_store.cache_clear()

    app = FastAPI()
    # Order matches src/main.py: APIKeyAuth added first โ†’ innermost.
    # SessionAuth added second โ†’ outermost โ†’ runs first โ†’ stamps identity โ†’
    # APIKeyAuth then short-circuits on it.
    app.add_middleware(APIKeyAuthMiddleware)
    app.add_middleware(SessionAuthMiddleware)
    app.include_router(auth_router)

    @app.get("/api/v1/test/ping")
    async def _ping(request: Request) -> dict[str, str | None]:
        identity = getattr(request.state, "identity", None)
        return {
            "user": identity.sub if isinstance(identity, Identity) else None,
        }

    try:
        yield app
    finally:
        _restore_settings(saved)


@pytest.fixture
def auth_full_client(auth_full_app: FastAPI) -> TestClient:
    return TestClient(auth_full_app)


@pytest.fixture
def auth_oidc_app() -> Iterator[FastAPI]:
    """App configured for AUTH_MODE=oidc with one mocked-IdP provider entry.

    Use respx in the test body to mock the IdP's discovery / JWKS / token
    endpoints. The provider's discovery_url is ``http://idp.example/.well-
    known/openid-configuration`` so respx can intercept it.
    """
    import json

    from pydantic import SecretStr
    from src.auth import factory
    from src.auth.middleware import SessionAuthMiddleware
    from src.auth.routes import router as auth_router
    from src.config import settings as global_settings

    providers = [
        {
            "id": "idp",
            "display_name": "Sign in with IdP",
            "discovery_url": "http://idp.example/.well-known/openid-configuration",
            "client_id": "client-abc",
            "client_secret": "client-secret-shh",
            "scopes": "openid email profile",
        }
    ]
    overrides: dict[str, Any] = {
        "auth_mode": "oidc",
        "auth_secret_key": SecretStr("x" * 32),
        "auth_oidc_providers": SecretStr(json.dumps(providers)),
        "auth_cookie_secure": False,
        "auth_session_cookie_name": "reflow_session",
        "auth_session_ttl_seconds": 3600,
    }
    saved: dict[str, Any] = {}
    for key, value in overrides.items():
        saved[key] = getattr(global_settings, key)
        object.__setattr__(global_settings, key, value)
    factory.get_auth_provider.cache_clear()
    factory.get_auth_providers.cache_clear()
    factory.get_session_store.cache_clear()

    app = FastAPI()
    app.add_middleware(SessionAuthMiddleware)
    app.include_router(auth_router)
    try:
        yield app
    finally:
        _restore_settings(saved)


@pytest.fixture
def auth_oidc_client(auth_oidc_app: FastAPI) -> TestClient:
    return TestClient(auth_oidc_app)