📦 EqualifyEverything / equalify-reflow

📄 audit.py · 62 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""Structured ``auth_event`` log emitter.

Operators query CloudWatch Logs Insights / Logfire by ``category="auth_event"``
to get a clean login/logout timeline without standing up a separate audit
store. Failure reasons are categorical (``invalid_password``,
``unknown_user``, ``provider_error``) — never the username attempted, never
the password hash, never PII.
"""

from __future__ import annotations

import logging
from typing import Literal

# Module-scoped logger, named after this module via ``__name__``; ``emit``
# sends every auth event through it.
logger = logging.getLogger(__name__)

# Closed set of event names accepted by ``emit``'s ``event`` parameter.
# ``emit`` logs ``login_failure`` and ``session_invalid`` at WARNING and every
# other member at INFO.
AuthEvent = Literal[
    "login_success",
    "login_failure",
    "logout",
    "session_expired",
    "session_invalid",
]

# Closed set of categorical values for ``emit``'s ``reason`` parameter. The
# value is attached to the log record under the ``reason`` key and, for
# WARNING-level events, is also interpolated into the log message.
FailureReason = Literal[
    "unknown_user",
    "invalid_password",
    "csrf_mismatch",
    "provider_error",
    "missing_credentials",
    "rate_limited",
]


def emit(
    event: AuthEvent,
    *,
    provider_id: str,
    sub: str | None = None,
    reason: FailureReason | None = None,
    client_ip: str | None = None,
) -> None:
    """Write one structured authentication event to the module logger.

    ``login_failure`` and ``session_invalid`` are logged at WARNING, with the
    reason (or ``"unspecified"`` when none is given) included in the message;
    every other event is logged at INFO. Keeping the failure events at
    WARNING means default log filters don't drop signal-of-attack events.

    All structured fields travel in the record's ``extra`` mapping, which is
    where Logfire and OTel pick them up when enabled.

    Args:
        event: Which auth event occurred.
        provider_id: Identifier of the auth provider involved.
        sub: Subject identifier, if known.
        reason: Categorical failure reason, if any.
        client_ip: Client IP address, if known.
    """
    fields = {
        "category": "auth_event",
        "auth_event": event,
        "provider_id": provider_id,
        "sub": sub,
        "reason": reason,
        "client_ip": client_ip,
    }

    # Non-failure events: plain INFO line, then done.
    if event not in ("login_failure", "session_invalid"):
        logger.info("auth_event: %s", event, extra=fields)
        return

    # Failure events: WARNING, with a placeholder when no reason was supplied.
    shown_reason = reason if reason else "unspecified"
    logger.warning("auth_event: %s (%s)", event, shown_reason, extra=fields)