# EqualifyEverything / equalify-reflow
# test_redis_integration.py
"""Tests for Redis integration and key generation."""

from datetime import UTC, datetime

import pytest

from shared.models import (
    APPROVAL_QUEUE,
    APPROVAL_TIMEOUTS,
    DAILY_METRICS,
    JOB_STATUS_PREFIX,
    PII_QUEUE,
    PROCESSING_QUEUE,
    JobStatus,
    JobSubmission,
    job_status_key,
    metrics_key,
    queue_key,
    timeout_key,
)


# NOTE(review): the tests below only build and compare strings; confirm that
# the ``requires_redis`` marker is really intended for this class.
@pytest.mark.unit
@pytest.mark.requires_redis
class TestRedisKeyGeneration:
    """Exercise the helper functions that build namespaced Redis keys."""

    def test_job_status_key_format(self):
        """A job status key is ``eq-pdf:job:`` followed by the job ID."""
        job_id = "550e8400-e29b-41d4-a716-446655440000"
        generated = job_status_key(job_id)

        assert generated.startswith("eq-pdf:job:")
        assert generated == f"eq-pdf:job:{job_id}"

    def test_queue_key_formats(self):
        """Every known queue name maps to its ``eq-pdf:queue:<name>`` key."""
        expected_by_queue = {
            "pii": "eq-pdf:queue:pii",
            "approval": "eq-pdf:queue:approval",
            "processing": "eq-pdf:queue:processing",
        }

        for queue_name, expected in expected_by_queue.items():
            assert queue_key(queue_name) == expected

    def test_timeout_key_format(self):
        """The approval timeout key lives under ``eq-pdf:timeouts:``."""
        assert timeout_key("approval") == "eq-pdf:timeouts:approval"

    def test_metrics_key_format(self):
        """The daily metrics key lives under ``eq-pdf:metrics:``."""
        assert metrics_key("daily") == "eq-pdf:metrics:daily"

    def test_key_prefix_consistency(self):
        """Keys from every helper share the common ``eq-pdf:`` namespace."""
        job_id = "550e8400-e29b-41d4-a716-446655440000"
        generated = (
            job_status_key(job_id),
            queue_key("pii"),
            timeout_key("approval"),
            metrics_key("daily"),
        )

        # Collect offenders so a failure shows exactly which keys are wrong.
        unprefixed = [key for key in generated if not key.startswith("eq-pdf:")]
        assert not unprefixed


class TestRedisKeyConstants:
    """Pin the literal values of the predefined Redis key constants."""

    def test_queue_constants(self):
        """Each queue constant holds its ``eq-pdf:queue:<name>`` key."""
        assert (PII_QUEUE, APPROVAL_QUEUE, PROCESSING_QUEUE) == (
            "eq-pdf:queue:pii",
            "eq-pdf:queue:approval",
            "eq-pdf:queue:processing",
        )

    def test_timeout_constants(self):
        """The approval timeout constant holds the expected key."""
        expected = "eq-pdf:timeouts:approval"
        assert APPROVAL_TIMEOUTS == expected

    def test_metrics_constants(self):
        """The daily metrics constant holds the expected key."""
        expected = "eq-pdf:metrics:daily"
        assert DAILY_METRICS == expected

    def test_job_status_prefix(self):
        """The job status prefix matches what ``job_status_key`` prepends."""
        assert JOB_STATUS_PREFIX == "eq-pdf:job:"

        # Prefix + job ID must equal the key produced by the helper.
        job_id = "550e8400-e29b-41d4-a716-446655440000"
        assert job_status_key(job_id) == f"{JOB_STATUS_PREFIX}{job_id}"


class TestModelRedisCompatibility:
    """Test that models serialize properly for Redis storage.

    Each test builds a model, dumps it with ``model_dump_json()`` (the string
    form that would be written to Redis), restores it with
    ``model_validate_json()``, and checks the restored fields.
    """

    def test_job_submission_redis_serialization(self):
        """Test JobSubmission round-trips through its JSON representation."""
        submission = JobSubmission(
            job_id="550e8400-e29b-41d4-a716-446655440000",
            s3_key="temp/550e8400-e29b-41d4-a716-446655440000/test.pdf",
            created_at=datetime.now(UTC),
            file_size_bytes=1024000,
            original_filename="test.pdf",
        )

        # Serialize to JSON for Redis storage
        json_data = submission.model_dump_json()
        assert isinstance(json_data, str)

        # Deserialize from Redis
        restored = JobSubmission.model_validate_json(json_data)
        assert restored.job_id == submission.job_id
        assert restored.s3_key == submission.s3_key

    def test_job_status_redis_serialization(self):
        """Test JobStatus round-trips through its JSON representation."""
        now = datetime.now(UTC)
        status = JobStatus(
            job_id="550e8400-e29b-41d4-a716-446655440000",
            status="processing",
            created_at=now,
            updated_at=now,
            markdown_url="https://s3.amazonaws.com/output.md",
            confidence_score=0.87,
        )

        # Serialize to JSON for Redis storage
        json_data = status.model_dump_json()
        assert isinstance(json_data, str)

        # Deserialize from Redis
        restored = JobStatus.model_validate_json(json_data)
        assert restored.status == "processing"
        assert restored.confidence_score == 0.87

    def test_datetime_serialization(self):
        """Test datetime fields survive a JSON round-trip unchanged."""
        # Use a fixed, timezone-aware timestamp: every other test in this
        # class feeds the models UTC-aware datetimes, and naive and aware
        # values must not be mixed.
        now = datetime(2024, 1, 15, 10, 30, 0, tzinfo=UTC)
        status = JobStatus(
            job_id="550e8400-e29b-41d4-a716-446655440000",
            status="completed",
            created_at=now,
            updated_at=now,
        )

        json_data = status.model_dump_json()
        restored = JobStatus.model_validate_json(json_data)

        # Datetime should round-trip correctly
        assert restored.created_at == now
        assert restored.updated_at == now

    def test_optional_fields_serialization(self):
        """Test optional fields that were never set come back as None."""
        now = datetime.now(UTC)
        status = JobStatus(
            job_id="550e8400-e29b-41d4-a716-446655440000",
            status="pii_scanning",
            created_at=now,
            updated_at=now,
        )

        json_data = status.model_dump_json()
        restored = JobStatus.model_validate_json(json_data)

        # Optional fields should be None
        assert restored.pii_findings is None
        assert restored.approval_token is None
        assert restored.error_message is None

    def test_nested_model_serialization(self):
        """Test nested PIIFinding models survive a JSON round-trip."""
        from shared.models import PIIFinding

        now = datetime.now(UTC)
        findings = [
            PIIFinding(
                entity_type="PERSON",
                start=10,
                end=20,
                score=0.95,
                text="John Doe",
            )
        ]

        status = JobStatus(
            job_id="550e8400-e29b-41d4-a716-446655440000",
            status="awaiting_approval",
            created_at=now,
            updated_at=now,
            pii_findings=findings,
        )

        json_data = status.model_dump_json()
        restored = JobStatus.model_validate_json(json_data)

        # Guard first so a dropped field fails as a clear assertion rather
        # than a TypeError from len(None).
        assert restored.pii_findings is not None
        assert len(restored.pii_findings) == 1
        assert restored.pii_findings[0].text == "John Doe"


class TestRedisKeyNamespacing:
    """Test Redis key namespacing prevents collisions."""

    def test_different_job_ids_unique_keys(self):
        """Test different job IDs generate unique keys."""
        job_id_1 = "550e8400-e29b-41d4-a716-446655440000"
        job_id_2 = "660e8400-e29b-41d4-a716-446655440001"

        key_1 = job_status_key(job_id_1)
        key_2 = job_status_key(job_id_2)

        assert key_1 != key_2
        assert job_id_1 in key_1
        assert job_id_2 in key_2

    def test_queue_keys_unique(self):
        """Test queue keys are unique."""
        queues = [
            queue_key("pii"),
            queue_key("approval"),
            queue_key("processing"),
        ]

        # All keys should be unique
        assert len(queues) == len(set(queues))

    def test_key_patterns_for_scanning(self):
        """Test key patterns work for Redis SCAN operations.

        No Redis server is contacted. The glob pattern that would be passed
        to ``SCAN ... MATCH`` is evaluated locally with ``fnmatchcase``.
        """
        from fnmatch import fnmatchcase

        # Pattern to find all job status keys. Only a trailing ``*`` is used,
        # which is a plain "match anything" wildcard in fnmatch.
        pattern = f"{JOB_STATUS_PREFIX}*"

        # Generate some example keys
        job_ids = [
            "550e8400-e29b-41d4-a716-446655440000",
            "660e8400-e29b-41d4-a716-446655440001",
            "770e8400-e29b-41d4-a716-446655440002",
        ]

        keys = [job_status_key(job_id) for job_id in job_ids]

        # All keys should match the pattern
        for key in keys:
            assert key.startswith(JOB_STATUS_PREFIX)
            assert fnmatchcase(key, pattern)

        # Keys from another namespace must not be picked up by the job scan.
        assert not fnmatchcase(queue_key("pii"), pattern)