1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249"""Tests for Redis integration and key generation."""
from datetime import UTC, datetime
import pytest
from shared.models import (
APPROVAL_QUEUE,
APPROVAL_TIMEOUTS,
DAILY_METRICS,
JOB_STATUS_PREFIX,
PII_QUEUE,
PROCESSING_QUEUE,
JobStatus,
JobSubmission,
job_status_key,
metrics_key,
queue_key,
timeout_key,
)
@pytest.mark.unit
@pytest.mark.requires_redis
class TestRedisKeyGeneration:
"""Test Redis key generation functions."""
def test_job_status_key_format(self):
"""Test job status key follows correct format."""
job_id = "550e8400-e29b-41d4-a716-446655440000"
key = job_status_key(job_id)
assert key == f"eq-pdf:job:{job_id}"
assert key.startswith("eq-pdf:job:")
def test_queue_key_formats(self):
"""Test queue key generation for all queue types."""
pii_key = queue_key("pii")
approval_key = queue_key("approval")
processing_key = queue_key("processing")
assert pii_key == "eq-pdf:queue:pii"
assert approval_key == "eq-pdf:queue:approval"
assert processing_key == "eq-pdf:queue:processing"
def test_timeout_key_format(self):
"""Test timeout key generation."""
key = timeout_key("approval")
assert key == "eq-pdf:timeouts:approval"
def test_metrics_key_format(self):
"""Test metrics key generation."""
key = metrics_key("daily")
assert key == "eq-pdf:metrics:daily"
def test_key_prefix_consistency(self):
"""Test all keys use eq-pdf prefix."""
job_id = "550e8400-e29b-41d4-a716-446655440000"
keys = [
job_status_key(job_id),
queue_key("pii"),
timeout_key("approval"),
metrics_key("daily")
]
for key in keys:
assert key.startswith("eq-pdf:")
class TestRedisKeyConstants:
"""Test predefined Redis key constants."""
def test_queue_constants(self):
"""Test queue name constants."""
assert PII_QUEUE == "eq-pdf:queue:pii"
assert APPROVAL_QUEUE == "eq-pdf:queue:approval"
assert PROCESSING_QUEUE == "eq-pdf:queue:processing"
def test_timeout_constants(self):
"""Test timeout key constants."""
assert APPROVAL_TIMEOUTS == "eq-pdf:timeouts:approval"
def test_metrics_constants(self):
"""Test metrics key constants."""
assert DAILY_METRICS == "eq-pdf:metrics:daily"
def test_job_status_prefix(self):
"""Test job status key prefix."""
assert JOB_STATUS_PREFIX == "eq-pdf:job:"
# Verify prefix works with job IDs
job_id = "550e8400-e29b-41d4-a716-446655440000"
full_key = f"{JOB_STATUS_PREFIX}{job_id}"
assert full_key == job_status_key(job_id)
class TestModelRedisCompatibility:
"""Test that models serialize properly for Redis storage."""
def test_job_submission_redis_serialization(self):
"""Test JobSubmission model serializes for Redis."""
submission = JobSubmission(
job_id="550e8400-e29b-41d4-a716-446655440000",
s3_key="temp/550e8400-e29b-41d4-a716-446655440000/test.pdf",
created_at=datetime.now(UTC),
file_size_bytes=1024000,
original_filename="test.pdf"
)
# Serialize to JSON for Redis storage
json_data = submission.model_dump_json()
assert isinstance(json_data, str)
# Deserialize from Redis
restored = JobSubmission.model_validate_json(json_data)
assert restored.job_id == submission.job_id
assert restored.s3_key == submission.s3_key
def test_job_status_redis_serialization(self):
"""Test JobStatus model serializes for Redis."""
now = datetime.now(UTC)
status = JobStatus(
job_id="550e8400-e29b-41d4-a716-446655440000",
status="processing",
created_at=now,
updated_at=now,
markdown_url="https://s3.amazonaws.com/output.md",
confidence_score=0.87
)
# Serialize to JSON for Redis storage
json_data = status.model_dump_json()
assert isinstance(json_data, str)
# Deserialize from Redis
restored = JobStatus.model_validate_json(json_data)
assert restored.status == "processing"
assert restored.confidence_score == 0.87
def test_datetime_serialization(self):
"""Test datetime fields serialize correctly."""
now = datetime(2024, 1, 15, 10, 30, 0)
status = JobStatus(
job_id="550e8400-e29b-41d4-a716-446655440000",
status="completed",
created_at=now,
updated_at=now
)
json_data = status.model_dump_json()
restored = JobStatus.model_validate_json(json_data)
# Datetime should round-trip correctly
assert restored.created_at == now
assert restored.updated_at == now
def test_optional_fields_serialization(self):
"""Test optional fields serialize correctly as None."""
now = datetime.now(UTC)
status = JobStatus(
job_id="550e8400-e29b-41d4-a716-446655440000",
status="pii_scanning",
created_at=now,
updated_at=now
)
json_data = status.model_dump_json()
restored = JobStatus.model_validate_json(json_data)
# Optional fields should be None
assert restored.pii_findings is None
assert restored.approval_token is None
assert restored.error_message is None
def test_nested_model_serialization(self):
"""Test nested models serialize correctly."""
from shared.models import PIIFinding
now = datetime.now(UTC)
findings = [
PIIFinding(
entity_type="PERSON",
start=10,
end=20,
score=0.95,
text="John Doe"
)
]
status = JobStatus(
job_id="550e8400-e29b-41d4-a716-446655440000",
status="awaiting_approval",
created_at=now,
updated_at=now,
pii_findings=findings
)
json_data = status.model_dump_json()
restored = JobStatus.model_validate_json(json_data)
assert len(restored.pii_findings) == 1
assert restored.pii_findings[0].text == "John Doe"
class TestRedisKeyNamespacing:
"""Test Redis key namespacing prevents collisions."""
def test_different_job_ids_unique_keys(self):
"""Test different job IDs generate unique keys."""
job_id_1 = "550e8400-e29b-41d4-a716-446655440000"
job_id_2 = "660e8400-e29b-41d4-a716-446655440001"
key_1 = job_status_key(job_id_1)
key_2 = job_status_key(job_id_2)
assert key_1 != key_2
assert job_id_1 in key_1
assert job_id_2 in key_2
def test_queue_keys_unique(self):
"""Test queue keys are unique."""
queues = [
queue_key("pii"),
queue_key("approval"),
queue_key("processing")
]
# All keys should be unique
assert len(queues) == len(set(queues))
def test_key_patterns_for_scanning(self):
"""Test key patterns work for Redis SCAN operations."""
# Pattern to find all job status keys
# Generate some example keys
job_ids = [
"550e8400-e29b-41d4-a716-446655440000",
"660e8400-e29b-41d4-a716-446655440001",
"770e8400-e29b-41d4-a716-446655440002"
]
keys = [job_status_key(job_id) for job_id in job_ids]
# All keys should match the pattern
for key in keys:
assert key.startswith(JOB_STATUS_PREFIX)