"""Event bus registry for job streaming.

Provides an EventBus for publishing pipeline events to SSE subscribers,
plus Pydantic event models for the document stream contract.
"""
from __future__ import annotations
import asyncio
from pydantic import BaseModel
# ── Event models ────────────────────────────────────────────────
class PipelinePhaseEvent(BaseModel):
    """Emitted when a pipeline phase starts.

    Attributes:
        event_type: Event discriminator; defaults to ``"pipeline:phase"``.
        phase: Name of the phase (presumably a machine-readable key —
            confirm against the publisher).
        display_name: Label for the phase (presumably the human-readable
            form shown to clients — confirm).
        step_number: Position of this phase within the run (whether it is
            0- or 1-based is not established here — confirm).
        total_steps: Number of steps in the run (presumably — confirm).
    """

    event_type: str = "pipeline:phase"
    phase: str
    display_name: str
    step_number: int
    total_steps: int
class ProcessingCompleteEvent(BaseModel):
    """Emitted when processing finishes successfully.

    Attributes:
        event_type: Event discriminator; defaults to
            ``"processing:complete"``. The event carries no other payload.
    """

    event_type: str = "processing:complete"
class ProcessingErrorEvent(BaseModel):
    """Emitted when processing fails.

    Attributes:
        event_type: Event discriminator; defaults to ``"processing:error"``.
        error: Required string describing the failure (exact format is
            decided by the publisher — not established here).
    """

    event_type: str = "processing:error"
    error: str
# ── EventBus ────────────────────────────────────────────────────
class EventBus:
    """Publish-subscribe event bus backed by asyncio queues.

    Every published event is appended to ``events`` (kept for replay to
    late-connecting subscribers) and is also pushed onto each subscriber
    queue that is registered at the time of publication.

    Attributes:
        events: All events published so far, in publication order.
    """

    def __init__(self) -> None:
        """Create a bus with an empty history and no subscribers."""
        self.events: list[BaseModel] = []
        self._subscribers: list[asyncio.Queue[BaseModel]] = []

    def publish(self, event: BaseModel) -> None:
        """Store *event* in the history and push it to all subscribers."""
        self.events.append(event)
        for queue in self._subscribers:
            # Subscriber queues are created without a maxsize in
            # subscribe(), so put_nowait() cannot hit a full queue here.
            queue.put_nowait(event)

    def subscribe(self) -> asyncio.Queue[BaseModel]:
        """Create, register and return a new subscriber queue.

        The returned queue starts empty: it only receives events published
        after this call. Earlier events are available through ``events``.
        """
        queue: asyncio.Queue[BaseModel] = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue[BaseModel]) -> None:
        """Remove a subscriber queue.

        Removing a queue that is not (or is no longer) registered is a
        no-op, so callers may unsubscribe unconditionally during cleanup.
        """
        try:
            self._subscribers.remove(queue)
        except ValueError:
            # Deliberate best-effort: the queue was never registered or
            # has already been removed.
            pass
# ── Global registry (job_id → EventBus) ─────────────────────────
# Module-level mapping from job_id to the EventBus registered for that job.
_event_registry: dict[str, EventBus] = {}


def register_event_bus(job_id: str, event_bus: EventBus) -> None:
    """Register an event bus for a job.

    If ``job_id`` already has a bus registered, it is replaced.
    """
    _event_registry[job_id] = event_bus


def get_event_bus(job_id: str) -> EventBus | None:
    """Get the event bus for a job (returns None if not registered)."""
    return _event_registry.get(job_id)


def unregister_event_bus(job_id: str) -> None:
    """Unregister an event bus for a job.

    Unregistering a ``job_id`` that is not registered is a no-op.
    """
    _event_registry.pop(job_id, None)