# EqualifyEverything / equalify-reflow — test_concurrent_requests.py
"""Integration tests for concurrent request handling and race conditions.

Tests scenarios using REAL Redis (via testcontainers):
- Multiple simultaneous PDF submissions
- Race conditions: double approval attempts
- Race conditions: duplicate processing attempts (queue-level dequeue checks now live in unit tests)
- Concurrent status updates
- Token validation under concurrent load

These tests validate system behavior under concurrent access patterns
to ensure data consistency and prevent race conditions.

NOTE: These tests require Docker resources and should be run with limited
parallelism to avoid testcontainers conflicts. Use: pytest -n 2 tests/integration/workflows/
(add --dist loadgroup so the xdist_group markers keep each group on a single worker).
"""

import asyncio
import uuid
from datetime import UTC, datetime, timedelta

import pytest
from src.shared.constants.queues import PROCESSING_QUEUE
from src.shared.constants.statuses import (
    STATUS_AWAITING_APPROVAL,
    STATUS_COMPLETED,
    STATUS_DENIED,
    STATUS_PII_SCANNING,
    STATUS_PROCESSING,
)


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.xdist_group(name="concurrent_integration")  # Limit parallelism for testcontainers
class TestConcurrentSubmissions:
    """Concurrent job-submission behaviour exercised against a REAL Redis instance."""

    @pytest.mark.asyncio
    async def test_concurrent_pdf_submissions_unique_job_ids(self, job_service, queue_service):
        """Ten jobs created at once must each be stored under their own ID in REAL Redis."""
        # Pair every fresh job ID with its temp S3 key up front.
        submissions = [(job_id, f"temp/{job_id}/test.pdf") for job_id in (str(uuid.uuid4()) for _ in range(10))]

        # Fire every create_job call at the same time.
        await asyncio.gather(
            *(job_service.create_job(job_id, s3_key, STATUS_PII_SCANNING) for job_id, s3_key in submissions)
        )

        # Every job must read back with its own ID and the initial status.
        for job_id, _ in submissions:
            stored = await job_service.get_job(job_id)
            assert stored is not None
            assert stored["job_id"] == job_id
            assert stored["status"] == STATUS_PII_SCANNING

    # DELETED: test_concurrent_queue_enqueue_operations
    # This test is incompatible with live workers that consume queue items.
    # Queue concurrency is tested at unit level instead.

    @pytest.mark.asyncio
    async def test_concurrent_job_status_reads(self, job_service, sample_job_id):
        """Fifty simultaneous reads of one job must all return the same intact record."""
        await job_service.create_job(sample_job_id, f"temp/{sample_job_id}/test.pdf", STATUS_PII_SCANNING)

        read_count = 50
        snapshots = await asyncio.gather(*(job_service.get_job(sample_job_id) for _ in range(read_count)))

        # No read may be lost, empty, or show a different job/status.
        assert len(snapshots) == read_count
        for snapshot in snapshots:
            assert snapshot is not None
            assert snapshot["job_id"] == sample_job_id
            assert snapshot["status"] == STATUS_PII_SCANNING


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.xdist_group(name="concurrent_integration")  # Limit parallelism for testcontainers
class TestRaceConditionDoubleApproval:
    """Tests for race conditions in approval workflow using REAL Redis."""

    @staticmethod
    async def _create_job_awaiting_approval(job_service, job_id, s3_key, approval_token, expires_in):
        """Create a job in ``STATUS_AWAITING_APPROVAL`` that carries an approval token.

        The token and its expiry (now + ``expires_in``, UTC ISO-8601) are attached through
        ``update_job_status`` after creation. The token -> job mapping is then stored so
        token lookups resolve to ``job_id``.
        """
        await job_service.create_job(job_id, s3_key, STATUS_AWAITING_APPROVAL)
        await job_service.update_job_status(
            job_id,
            STATUS_AWAITING_APPROVAL,
            approval_token=approval_token,
            approval_expires_at=(datetime.now(UTC) + expires_in).isoformat(),
        )
        await job_service.store_approval_token_mapping(approval_token, job_id)

    @pytest.mark.asyncio
    async def test_double_approval_attempt_handled_safely(
        self, approval_service, job_service, queue_service, sample_job_id, sample_s3_key
    ):
        """Test that two concurrent approval attempts don't cause duplicate processing in REAL Redis."""
        # Setup: Job awaiting approval in REAL Redis
        await self._create_job_awaiting_approval(
            job_service, sample_job_id, sample_s3_key, "test-approval-token-123", timedelta(hours=2)
        )

        # Simulate two concurrent approval attempts
        approval_tasks = [
            approval_service.process_approval_decision(
                sample_job_id, "approved", f"Approval attempt {i}", f"reviewer{i}@uic.edu"
            )
            for i in range(2)
        ]

        # The losing attempt may legitimately raise, so exceptions are collected rather than propagated...
        results = await asyncio.gather(*approval_tasks, return_exceptions=True)
        # ...but at least one attempt must succeed. If none did, re-raise the first error so the
        # real cause is reported instead of a misleading status mismatch below.
        if all(isinstance(result, BaseException) for result in results):
            raise results[0]

        # Verify: Check processing queue depth in REAL Redis
        # Should have at most 1 entry (idempotent approval)
        # NOTE(review): if live workers consume PROCESSING_QUEUE during the test, a duplicate
        # enqueue could go undetected by this bound.
        processing_queue_depth = await queue_service.queue_depth(PROCESSING_QUEUE)
        assert processing_queue_depth <= 1

        # Verify: Job status in REAL Redis
        final_job = await job_service.get_job(sample_job_id)
        assert final_job is not None
        assert final_job["status"] == STATUS_PROCESSING

    @pytest.mark.asyncio
    async def test_approval_and_timeout_race_condition(
        self, approval_service, job_service, queue_service, sample_job_id, sample_s3_key
    ):
        """Test race condition: approval granted while timeout occurs in REAL Redis."""
        # Setup: Job awaiting approval, close to timeout
        await self._create_job_awaiting_approval(
            job_service, sample_job_id, sample_s3_key, "timeout-race-token", timedelta(seconds=1)
        )

        # Simulate concurrent approval and timeout
        approval_task = approval_service.process_approval_decision(
            sample_job_id, "approved", "Last-minute approval", "reviewer@uic.edu"
        )

        # Simulate timeout worker (would normally set to failed/denied)
        timeout_task = job_service.update_job_status(sample_job_id, STATUS_DENIED, error_message="Approval timeout")

        # Run concurrently; either side may be rejected depending on which one lands first.
        await asyncio.gather(approval_task, timeout_task, return_exceptions=True)

        # Verify: One of the status updates won in REAL Redis (last write wins)
        final_job = await job_service.get_job(sample_job_id)
        assert final_job is not None
        assert final_job["status"] in [STATUS_PROCESSING, STATUS_DENIED]


# DELETED: TestRaceConditionDuplicateProcessing class
# This test class is incompatible with live workers that consume queue items.
# Queue dequeue atomicity is guaranteed by Redis BRPOP and tested at unit level.


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.xdist_group(name="concurrent_integration")  # Limit parallelism for testcontainers
class TestConcurrentStatusUpdates:
    """Concurrent writes to a single job's status and fields against REAL Redis."""

    @pytest.mark.asyncio
    async def test_concurrent_status_updates_same_job(self, job_service, sample_job_id, sample_s3_key):
        """Racing status writes to one job must leave one of the written statuses (last write wins)."""
        await job_service.create_job(sample_job_id, sample_s3_key, STATUS_PII_SCANNING)

        # Each of these values is written concurrently; any one of them may win.
        candidate_statuses = (
            STATUS_PII_SCANNING,
            STATUS_PROCESSING,
            STATUS_AWAITING_APPROVAL,
            STATUS_PROCESSING,
            STATUS_COMPLETED,
        )
        await asyncio.gather(
            *(job_service.update_job_status(sample_job_id, status) for status in candidate_statuses)
        )

        job_after = await job_service.get_job(sample_job_id)
        assert job_after["status"] in candidate_statuses

    @pytest.mark.asyncio
    async def test_concurrent_field_updates(self, job_service, sample_job_id, sample_s3_key):
        """Racing field writes must leave a value from one of the writers (last write wins)."""
        await job_service.create_job(sample_job_id, sample_s3_key, STATUS_PROCESSING)

        # Ten writers share confidence_score but each stamps its own attempt_number.
        await asyncio.gather(
            *(
                job_service.update_job_status(
                    sample_job_id, STATUS_PROCESSING, confidence_score=0.95, attempt_number=attempt
                )
                for attempt in range(10)
            )
        )

        job_after = await job_service.get_job(sample_job_id)
        assert "attempt_number" in job_after
        assert int(job_after["attempt_number"]) in range(10)
        # Stored fields come back as strings.
        assert job_after["confidence_score"] == "0.95"


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.xdist_group(name="concurrent_integration")  # Limit parallelism for testcontainers
class TestConcurrentTokenValidation:
    """Tests for concurrent approval token validation using REAL Redis."""

    @staticmethod
    async def _create_job_with_approval_token(job_service, job_id, s3_key, approval_token):
        """Create a job awaiting approval that holds ``approval_token``, valid for 2 hours.

        The token and its UTC ISO-8601 expiry are attached through ``update_job_status``
        after creation. The token -> job mapping is then stored so token lookups resolve
        to ``job_id``.
        """
        await job_service.create_job(job_id, s3_key, STATUS_AWAITING_APPROVAL)
        await job_service.update_job_status(
            job_id,
            STATUS_AWAITING_APPROVAL,
            approval_token=approval_token,
            approval_expires_at=(datetime.now(UTC) + timedelta(hours=2)).isoformat(),
        )
        await job_service.store_approval_token_mapping(approval_token, job_id)

    @pytest.mark.asyncio
    async def test_concurrent_token_validation_requests(
        self, approval_service, job_service, sample_job_id, sample_s3_key
    ):
        """Test that concurrent token validations work correctly with REAL Redis."""
        approval_token = "concurrent-test-token-789"
        await self._create_job_with_approval_token(job_service, sample_job_id, sample_s3_key, approval_token)

        # Validate token 100 times concurrently (simulating multiple users)
        results = await asyncio.gather(
            *[approval_service.validate_approval_token(approval_token) for _ in range(100)],
            return_exceptions=True,
        )

        # Report the actual exception rather than only a count mismatch. BaseException also
        # catches CancelledError, which is not an Exception subclass.
        errors = [r for r in results if isinstance(r, BaseException)]
        assert not errors, f"{len(errors)} of {len(results)} validations raised, first: {errors[0]!r}"

        # All validations should succeed from REAL Redis
        successful = [r for r in results if r is not None]
        assert len(successful) == 100
        assert all(r["job_id"] == sample_job_id for r in successful)

    @pytest.mark.asyncio
    async def test_token_validation_during_status_change(
        self, approval_service, job_service, sample_job_id, sample_s3_key
    ):
        """Test token validation while job status is changing in REAL Redis.

        50 validations run in total: 45 are fired immediately, and 5 are issued by the
        status changer just before it moves the job to ``STATUS_PROCESSING``.
        """
        approval_token = "status-change-token"
        await self._create_job_with_approval_token(job_service, sample_job_id, sample_s3_key, approval_token)

        async def change_status():
            # Run a few validations first, then change status mid-flight. Their results are
            # returned so they count toward the 50-validation total. return_exceptions keeps
            # one failing validation from skipping the status change.
            early_results = await asyncio.gather(
                *[approval_service.validate_approval_token(approval_token) for _ in range(5)],
                return_exceptions=True,
            )
            await job_service.update_job_status(sample_job_id, STATUS_PROCESSING)
            return early_results

        validation_tasks = [approval_service.validate_approval_token(approval_token) for _ in range(45)]

        # Run validations and status change concurrently; change_status() is the last result.
        *concurrent_results, status_change_outcome = await asyncio.gather(
            *validation_tasks, change_status(), return_exceptions=True
        )

        # If the status change itself failed, the scenario under test never happened.
        if isinstance(status_change_outcome, BaseException):
            raise status_change_outcome

        all_results = [*concurrent_results, *status_change_outcome]
        assert len(all_results) == 50

        # Most validations should succeed (some may see transitional state)
        successful_validations = [r for r in all_results if isinstance(r, dict)]
        assert len(successful_validations) >= 25  # At least half of the 50 should succeed


@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.xdist_group(name="concurrent_integration")  # Limit parallelism for testcontainers
class TestHighConcurrency:
    """Tests for high concurrency scenarios using REAL Redis."""

    @pytest.mark.asyncio
    async def test_high_concurrency_job_creation(self, job_service, queue_service):
        """Test system handles high concurrency (100 simultaneous operations) in REAL Redis."""
        # Create 100 jobs concurrently; key layout matches the other tests (temp/<job_id>/...)
        job_ids = [str(uuid.uuid4()) for _ in range(100)]
        tasks = [
            job_service.create_job(job_id, f"temp/{job_id}/test{i}.pdf", STATUS_PII_SCANNING)
            for i, job_id in enumerate(job_ids)
        ]

        # Should complete without errors; collect them so they can be reported.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        errors = [r for r in results if isinstance(r, BaseException)]
        assert not errors, f"{len(errors)} of {len(results)} creations failed, first: {errors[0]!r}"

        # Verify all jobs exist in REAL Redis
        for job_id in job_ids:
            job = await job_service.get_job(job_id)
            assert job is not None
            assert job["job_id"] == job_id

    @pytest.mark.asyncio
    async def test_sustained_concurrent_load(self, job_service, queue_service):
        """Test system stability under sustained concurrent load in REAL Redis.

        Runs 5 sequential batches of 50 concurrent ``create_job`` calls (250 total). The
        test fails on the first batch that reports an error, then checks every job is readable.
        """
        batch_count = 5
        batch_size = 50
        all_job_ids = []

        for batch in range(batch_count):
            job_ids = [str(uuid.uuid4()) for _ in range(batch_size)]
            all_job_ids.extend(job_ids)

            results = await asyncio.gather(
                *[
                    job_service.create_job(job_id, f"temp/{job_id}/batch{batch}.pdf", STATUS_PII_SCANNING)
                    for job_id in job_ids
                ],
                return_exceptions=True,
            )
            # Surface per-batch failures instead of silently discarding them.
            errors = [r for r in results if isinstance(r, BaseException)]
            assert not errors, f"batch {batch}: {len(errors)} of {batch_size} creations failed, first: {errors[0]!r}"

        # Verify all jobs exist in REAL Redis
        for job_id in all_job_ids:
            job = await job_service.get_job(job_id)
            assert job is not None