📦 EqualifyEverything / equalify-reflow

📄 events.py · 91 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""Event bus registry for job streaming.

Provides an EventBus for publishing pipeline events to SSE subscribers,
plus Pydantic event models for the document stream contract.
"""

from __future__ import annotations

import asyncio

from pydantic import BaseModel


# ── Event models ────────────────────────────────────────────────


class PipelinePhaseEvent(BaseModel):
    """Emitted when a pipeline phase starts."""

    # Event name carried with the payload; defaults to "pipeline:phase".
    event_type: str = "pipeline:phase"
    # Identifier of the phase (required, no default).
    phase: str
    # Presumably the human-readable label for the phase — confirm with the producer.
    display_name: str
    # Position of this phase and the overall number of phases. Whether
    # step_number is 0- or 1-based is not visible here — TODO confirm.
    step_number: int
    total_steps: int


class ProcessingCompleteEvent(BaseModel):
    """Emitted when processing finishes successfully."""

    # Event name carried with the payload; defaults to "processing:complete".
    # The model has no other fields.
    event_type: str = "processing:complete"


class ProcessingErrorEvent(BaseModel):
    """Emitted when processing fails."""

    # Event name carried with the payload; defaults to "processing:error".
    event_type: str = "processing:error"
    # Error description (required). Presumably a message suitable for the
    # stream consumer — confirm what the publisher puts here.
    error: str


# ── EventBus ────────────────────────────────────────────────────


class EventBus:
    """In-process pub/sub hub that fans events out over asyncio queues.

    Every published event is also appended to the public ``events`` list,
    so the full history stays available for replay to subscribers that
    connect late. A queue returned by ``subscribe`` starts empty and only
    receives events published after the call; replaying ``events`` is left
    to the caller.
    """

    def __init__(self) -> None:
        # Live subscriber queues, in subscription order.
        self._subscribers: list[asyncio.Queue[BaseModel]] = []
        # Ordered history of everything published so far.
        self.events: list[BaseModel] = []

    def publish(self, event: BaseModel) -> None:
        """Record *event* in the history, then enqueue it for each subscriber."""
        self.events.append(event)
        for subscriber in self._subscribers:
            # Subscriber queues are created without a maxsize, so this
            # never raises QueueFull.
            subscriber.put_nowait(event)

    def subscribe(self) -> asyncio.Queue[BaseModel]:
        """Register a fresh, unbounded queue and hand it back to the caller."""
        subscriber: asyncio.Queue[BaseModel] = asyncio.Queue()
        self._subscribers.append(subscriber)
        return subscriber

    def unsubscribe(self, queue: asyncio.Queue[BaseModel]) -> None:
        """Stop delivering to *queue*; a queue that is not registered is ignored."""
        if queue in self._subscribers:
            self._subscribers.remove(queue)


# ── Global registry (job_id → EventBus) ─────────────────────────

# Module-level mapping of job_id to that job's EventBus; starts empty.
_event_registry: dict[str, EventBus] = {}


def register_event_bus(job_id: str, event_bus: EventBus) -> None:
    """Associate *event_bus* with *job_id*, replacing any existing entry."""
    _event_registry.update({job_id: event_bus})


def get_event_bus(job_id: str) -> EventBus | None:
    """Look up the event bus stored under *job_id*.

    Returns ``None`` when nothing is registered for that job.
    """
    try:
        return _event_registry[job_id]
    except KeyError:
        return None


def unregister_event_bus(job_id: str) -> None:
    """Drop the registry entry for *job_id*; does nothing when it is absent."""
    if job_id in _event_registry:
        del _event_registry[job_id]