📦 EqualifyEverything / equalify-reflow

📄 basic_provider.py · 129 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129"""HTTP basic auth provider — operator-provisioned via env.

``AUTH_BASIC_USERS`` is a **semicolon-separated** list of
``username:argon2hash`` pairs. Hashes are produced with
``make auth-hash-password``. There is deliberately no signup endpoint, no
user database, and no admin UI — adding or removing a user means editing
env and restarting. This matches how API keys are managed today and keeps
a locked-down deployment locked down.

Why semicolon and not comma? Argon2id hashes embed a comma-separated
parameter block (``m=…,t=…,p=…``) in the middle of the hash string. A
comma-delimited CSV would split that block and corrupt every entry. Argon2
hashes do not contain ``;`` so semicolon is unambiguous.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass

from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from fastapi import Request

from ..base import Identity
from ..session import make_identity

logger = logging.getLogger(__name__)


@dataclass(frozen=True, slots=True)
class _BasicUser:
    username: str
    password_hash: str


def _parse_users(raw: str) -> dict[str, _BasicUser]:
    """Parse the env-supplied user list into a username→user map.

    Entries are separated by ``;`` (not comma — argon2 parameter blocks
    contain commas). We split on only the first colon per entry so the
    full ``$argon2…`` hash on the right is preserved.
    """
    users: dict[str, _BasicUser] = {}
    for entry in raw.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        if ":" not in entry:
            logger.warning("AUTH_BASIC_USERS entry missing ':' separator; skipping")
            continue
        username, _, password_hash = entry.partition(":")
        username = username.strip()
        password_hash = password_hash.strip()
        if not username or not password_hash.startswith("$argon2"):
            logger.warning("AUTH_BASIC_USERS entry rejected: malformed username or hash")
            continue
        users[username] = _BasicUser(username=username, password_hash=password_hash)
    return users


class InvalidCredentialsError(Exception):
    """Raised when login should fail. Caller maps to a 401 with an opaque
    error so we never leak whether the username exists.
    """


class BasicAuthProvider:
    id: str = "basic"
    display_name: str = "Sign in"

    def __init__(self, users_csv: str, session_ttl_seconds: int) -> None:
        self._users = _parse_users(users_csv)
        self._hasher = PasswordHasher()
        self._session_ttl = session_ttl_seconds
        if not self._users:
            logger.warning("BasicAuthProvider initialised with zero users — no login can succeed")

    async def login_url(self, *, request: Request, next_path: str) -> str:
        # Basic mode posts directly to /api/v1/auth/login; the SPA never needs
        # to hit a provider-specific URL. Returning the SPA login route keeps
        # the /auth/config response uniform for the UI.
        return "/login"

    async def handle_callback(self, request: Request) -> Identity:
        raise NotImplementedError("BasicAuthProvider has no redirect-back flow")

    async def logout_url(self, identity: Identity) -> str | None:
        return None

    def authenticate(self, *, username: str, password: str) -> Identity:
        """Verify the password and return a fresh :class:`Identity`.

        Raises :class:`InvalidCredentialsError` for both unknown users and wrong
        passwords. The argon2 verify is constant-time per its library
        contract; we run it against a dummy hash on unknown-user to keep
        timing identical between the two failure modes.
        """
        user = self._users.get(username)
        if user is None:
            # Run a verify against a known-bad reference hash so timing for
            # unknown-user matches wrong-password. argon2-cffi raises on
            # mismatch — we ignore the exception, only the timing matters.
            try:
                self._hasher.verify(_DUMMY_HASH, password)
            except Exception:
                pass
            raise InvalidCredentialsError("invalid credentials")

        try:
            self._hasher.verify(user.password_hash, password)
        except VerifyMismatchError as exc:
            raise InvalidCredentialsError("invalid credentials") from exc

        return make_identity(
            sub=user.username,
            provider_id=self.id,
            ttl_seconds=self._session_ttl,
            email=None,
            name=user.username,
        )


# Pre-computed argon2id hash of the literal string ``__never_a_real_password__``.
# Used solely to keep authenticate() timing constant for unknown users. The
# value isn't sensitive — it intentionally hashes a placeholder. We compute it
# at import time so we don't pay the cost on every unknown-user request.
_DUMMY_HASH = PasswordHasher().hash("__never_a_real_password__")