📦 EqualifyEverything / equalify-reflow

📄 session_store.py · 134 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""In-memory session store for pipeline SSE streaming sessions.

Each session holds the pipeline result and event buffer for SSE
reconnection support.  Sessions expire after a configurable TTL
(default 1 hour).

This is appropriate for the Pipeline Viewer dev tool.  Production usage
would swap for Redis-backed persistence.
"""

from __future__ import annotations

import asyncio
import logging
import time
import uuid
from dataclasses import dataclass, field

from .pipeline_viewer_models import PipelineViewerResult

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Idle timeout in seconds. PipelineSession.is_expired compares the time
# elapsed since a session's last access against this value.
SESSION_TTL_SECONDS = 3600  # 1 hour


@dataclass
class PipelineSession:
    """Per-session state backing one pipeline streaming session.

    Only ``session_id`` and ``result`` are required; every other field
    has a default so a session can be created before any work is done.
    """

    session_id: str
    result: PipelineViewerResult
    # Wall-clock timestamps (time.time()): creation and most recent access.
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)
    # Events retained so an SSE client can reconnect.
    event_buffer: list[str] = field(default_factory=list)
    status: str = "processing"  # "processing" | "completed" | "error"
    event_counter: int = 0
    # Push notification for readers of the event buffer.
    new_event: asyncio.Event = field(default_factory=asyncio.Event)
    # Handle to the background pipeline task, if one has been attached.
    pipeline_task: asyncio.Task | None = None
    # PII review gate: the pipeline awaits this event when findings need a
    # human decision. `pii_decision` holds "approved" or "denied" once set.
    pii_decision_event: asyncio.Event = field(default_factory=asyncio.Event)
    pii_decision: str | None = None
    # Identity attribution: populated when AUTH_MODE != 'none' and the
    # request that created this session carried a session cookie. Both
    # stay None when auth is off, preserving the anonymous flow.
    identity_sub: str | None = None
    provider_id: str | None = None

    def touch(self) -> None:
        """Record the current time as the moment of last access."""
        self.last_accessed = time.time()

    @property
    def is_expired(self) -> bool:
        """True once the idle time exceeds ``SESSION_TTL_SECONDS``."""
        idle_seconds = time.time() - self.last_accessed
        return idle_seconds > SESSION_TTL_SECONDS


class SessionStore:
    """In-memory store for pipeline sessions, keyed by session ID.

    Safe for single-threaded asyncio usage.  Not thread-safe: do not
    access from sync background threads without external locking.

    Expired sessions are swept lazily: ``create_for_stream``, ``get`` and
    ``count`` each purge expired entries before doing their own work.
    """

    def __init__(self) -> None:
        self._sessions: dict[str, PipelineSession] = {}

    def create_for_stream(
        self,
        filename: str,
        *,
        identity_sub: str | None = None,
        provider_id: str | None = None,
    ) -> PipelineSession:
        """Create a session early for SSE reconnect support.

        The session starts with an empty result (``total_pages=0``) and
        status="processing".  The caller populates the result as
        processing completes.

        ``identity_sub`` and ``provider_id`` are passed through from the
        request's authenticated identity when ``AUTH_MODE != 'none'``.

        Returns the newly stored session.
        """
        self._evict_expired()
        # IDs are a 12-hex-character prefix of a UUID4.  Regenerate on the
        # (unlikely) chance the ID is already in use so that an existing
        # live session is never silently overwritten.
        session_id = uuid.uuid4().hex[:12]
        while session_id in self._sessions:
            session_id = uuid.uuid4().hex[:12]
        session = PipelineSession(
            session_id=session_id,
            result=PipelineViewerResult(filename=filename, total_pages=0),
            status="processing",
            identity_sub=identity_sub,
            provider_id=provider_id,
        )
        self._sessions[session_id] = session
        return session

    def get(self, session_id: str) -> PipelineSession | None:
        """Retrieve a session by ID, or None if not found / expired.

        A successful lookup refreshes the session's last-accessed time.
        """
        self._evict_expired()
        session = self._sessions.get(session_id)
        if session is None:
            return None
        if session.is_expired:
            # The session crossed the TTL after the sweep above; retire it
            # the same way the sweep would, including task cancellation.
            self._expire(session_id)
            return None
        session.touch()
        return session

    def delete(self, session_id: str) -> bool:
        """Delete a session. Returns True if it existed.

        The session's pipeline task is left untouched here.
        NOTE(review): confirm callers cancel the task themselves if needed.
        """
        return self._sessions.pop(session_id, None) is not None

    def _expire(self, session_id: str) -> None:
        """Remove one stored session and cancel its unfinished pipeline task."""
        session = self._sessions.pop(session_id)
        task = session.pipeline_task
        if task and not task.done():
            task.cancel()
            logger.debug(
                "Cancelled pipeline task for expired session %s", session_id
            )

    def _evict_expired(self) -> None:
        """Remove all expired sessions and cancel their pipeline tasks."""
        # Collect IDs first so the dict is not mutated while being iterated.
        expired = [
            sid for sid, s in self._sessions.items() if s.is_expired
        ]
        for sid in expired:
            self._expire(sid)

    @property
    def count(self) -> int:
        """Number of active (non-expired) sessions."""
        self._evict_expired()
        return len(self._sessions)


# Module-level singleton: a single SessionStore instance created when this
# module is imported.
session_store = SessionStore()