📦 EqualifyEverything / equalify-reflow

📄 test_resource_management.py · 373 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
"""Tests for resource management in dependency injection and workers.

This test suite verifies that:
1. Redis and S3 clients are properly created and cleaned up
2. Service dependency functions enforce proper client management
3. Workers initialize resources correctly without leaks
4. Multiple worker start/stop cycles don't leak connections
"""

import asyncio
from contextlib import aclosing
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from redis.asyncio import Redis
from redis.exceptions import RedisError
from src.dependencies import (
    get_redis_client,
    get_s3_client,
)
from src.services.job_service import JobService
from src.services.queue_service import QueueService
from src.services.s3_url_service import S3URLService
from src.services.storage_service import StorageService


class TestClientLifecycle:
    """Tests for Redis and S3 client lifecycle management.

    ``get_redis_client`` and ``get_s3_client`` are consumed here as async
    generators. Leaving an ``async for`` via ``break`` or an exception (or
    pulling one value with ``anext``) does NOT finalize the generator, so its
    cleanup code would only run whenever the event loop later finalizes the
    abandoned generator. Every test therefore wraps the generator in
    ``contextlib.aclosing`` so ``aclose()`` runs before the test finishes.
    """

    @pytest.mark.asyncio
    async def test_redis_client_cleanup(self):
        """Test that the Redis client generator is finalized after use."""
        client = None
        async with aclosing(get_redis_client()) as gen:
            async for c in gen:
                client = c
                assert isinstance(client, Redis)
                try:
                    await client.ping()
                except (RedisError, OSError):
                    # Redis might not be running in the test environment; the
                    # lifecycle checked here does not need a live server.
                    pass
                break

        assert client is not None
        # aclose() has run the generator's cleanup path; a closed async
        # generator produces no further values.
        with pytest.raises(StopAsyncIteration):
            await anext(gen)

    @pytest.mark.asyncio
    async def test_redis_client_cleanup_on_exception(self):
        """Test that the Redis client generator is finalized when its consumer raises."""
        body_entered = False

        with pytest.raises(RuntimeError, match="Simulated error"):
            async with aclosing(get_redis_client()) as gen:
                async for _client in gen:
                    # Simulate an exception while the client is in use
                    body_entered = True
                    raise RuntimeError("Simulated error")

        # The loop body ran, the error propagated, and the generator was closed.
        assert body_entered
        with pytest.raises(StopAsyncIteration):
            await anext(gen)

    @pytest.mark.asyncio
    async def test_s3_client_creation(self):
        """Test that the S3 client is created with the expected API and then released."""
        async with aclosing(get_s3_client()) as gen:
            client = await anext(gen)
            assert client is not None
            # S3 client should expose the expected object methods
            assert hasattr(client, 'put_object')
            assert hasattr(client, 'get_object')

    @pytest.mark.asyncio
    async def test_multiple_redis_clients_independent(self):
        """Test that each provider invocation yields an independent Redis client."""
        async with aclosing(get_redis_client()) as gen1, \
                aclosing(get_redis_client()) as gen2:
            client1 = await anext(gen1)
            client2 = await anext(gen2)

            # Should be different client instances
            assert client1 is not client2


class TestServiceDependencyInjection:
    """Tests for service dependency injection patterns."""

    @pytest.mark.asyncio
    async def test_services_use_dependency_injection(self):
        """Test that services can be built from clients yielded by the providers.

        This mirrors how workers construct services directly (outside FastAPI's
        ``Depends()`` resolution). Both provider generators are closed on exit,
        so their cleanup runs here instead of being left for the event loop to
        finalize later.
        """
        async with aclosing(get_redis_client()) as redis_gen, \
                aclosing(get_s3_client()) as s3_gen:
            redis_client = await anext(redis_gen)
            s3_client = await anext(s3_gen)

            # Create services directly (as workers do)
            queue_service = QueueService(redis_client=redis_client)
            job_service = JobService(redis_client=redis_client)
            storage_service = StorageService(
                s3_client=s3_client,
                temp_bucket="test-temp",
                results_bucket="test-results"
            )

            assert isinstance(queue_service, QueueService)
            assert isinstance(job_service, JobService)
            assert isinstance(storage_service, StorageService)


class TestWorkerResourceManagement:
    """Tests for worker resource management patterns."""

    @pytest.mark.asyncio
    @pytest.mark.timeout(10)  # Worker startup and shutdown can take time
    async def test_pii_worker_initialization_pattern(self):
        """Test that start_pii_worker obtains its clients from the dependency providers.

        ``src.dependencies.get_redis_client`` / ``get_s3_client`` are patched to
        return single-item async generators of mock clients, and the test then
        asserts each patched provider was called exactly once.
        """
        from src.workers.pii_worker import get_pii_worker, start_pii_worker

        with patch('src.dependencies.get_redis_client') as mock_redis_gen, \
             patch('src.dependencies.get_s3_client') as mock_s3_gen:

            # Mock clients handed out by the patched providers
            mock_redis = AsyncMock()
            mock_s3 = MagicMock()

            # Mock async generators
            async def redis_gen():
                yield mock_redis

            async def s3_gen():
                yield mock_s3

            mock_redis_gen.return_value = redis_gen()
            mock_s3_gen.return_value = s3_gen()

            # Start worker in background (it runs until stopped)
            worker_task = asyncio.create_task(start_pii_worker())

            # Poll until the worker reports it is running
            worker = None
            for _ in range(50):  # 50 * 0.01s = 0.5s max wait
                worker = get_pii_worker()
                if worker and worker.running:
                    break
                await asyncio.sleep(0.01)

            # Stop worker
            if worker:
                worker.stop()

            # Wait for the worker task to finish. Catch asyncio.TimeoutError:
            # it is only the same class as the builtin TimeoutError on
            # Python 3.11+, so `except TimeoutError` misses it on 3.10.
            try:
                await asyncio.wait_for(worker_task, timeout=2.0)
            except asyncio.TimeoutError:
                # wait_for cancels the task on timeout; this is a harmless safeguard.
                worker_task.cancel()

            # Verify clients were created using anext pattern
            # (mock_redis_gen and mock_s3_gen were called)
            mock_redis_gen.assert_called_once()
            mock_s3_gen.assert_called_once()

    # Note: test_processing_worker_initialization_pattern was removed
    # because ProcessingWorker has been deleted as part of the agentic pipeline refactor.
    # Processing is now triggered directly via ProcessingService, not through a queue worker.


class TestConnectionPoolManagement:
    """Tests for connection pool tracking and leak detection."""

    @pytest.mark.asyncio
    async def test_redis_connection_pool_cleanup(self):
        """Test that Redis clients can be created and cleaned up multiple times.

        Each provider generator is closed before the next one is opened. Every
        iteration therefore exercises the generator's cleanup path instead of
        leaving an abandoned generator for the event loop to finalize later.
        """
        clients = []
        for _ in range(3):
            async with aclosing(get_redis_client()) as gen:
                clients.append(await anext(gen))
            # A closed async generator yields nothing more.
            with pytest.raises(StopAsyncIteration):
                await anext(gen)

        # Verify all clients were created and each is a Redis client instance
        assert len(clients) == 3
        assert all(isinstance(client, Redis) for client in clients)

    @pytest.mark.asyncio
    async def test_no_leaked_connections_after_operations(self):
        """Test that services can be created and used with a provider-managed client."""
        async with aclosing(get_redis_client()) as gen:
            redis_client = await anext(gen)
            queue_service = QueueService(redis_client=redis_client)
            job_service = JobService(redis_client=redis_client)

            assert isinstance(queue_service, QueueService)
            assert isinstance(job_service, JobService)

        # Leaving aclosing() ran the generator's cleanup before the test ends.
        with pytest.raises(StopAsyncIteration):
            await anext(gen)


class TestMultipleWorkerCycles:
    """Tests for multiple worker start/stop cycles."""

    @pytest.mark.asyncio
    async def test_worker_can_restart_multiple_times(self):
        """Test that a worker can be started and stopped repeatedly without leaking its task."""
        # JobService / QueueService / StorageService / S3URLService are already
        # imported at module level; only the worker class is imported lazily.
        from src.workers.pii_worker import PIIWorker

        for _cycle in range(3):
            # Fresh mock clients and services for each cycle
            mock_redis = AsyncMock()
            mock_s3 = MagicMock()

            storage_service = StorageService(
                s3_client=mock_s3,
                temp_bucket="test-temp",
                results_bucket="test-results"
            )
            queue_service = QueueService(redis_client=mock_redis)
            job_service = JobService(redis_client=mock_redis)
            s3_url_service = S3URLService(
                s3_client=mock_s3,
                temp_bucket="test-temp",
                results_bucket="test-results"
            )

            worker = PIIWorker(
                storage_service=storage_service,
                queue_service=queue_service,
                job_service=job_service,
                s3_url_service=s3_url_service
            )

            worker_task = asyncio.create_task(worker.start())

            # Wait for worker to be running (with timeout)
            for _ in range(50):  # 50 * 0.01s = 0.5s max wait
                if worker.running:
                    break
                await asyncio.sleep(0.01)

            assert worker.running is True

            worker.stop()

            # Wait for worker to report it has stopped
            for _ in range(100):  # 100 * 0.01s = 1s max wait
                if not worker.running:
                    break
                await asyncio.sleep(0.01)

            # Reap the task: cancel it if still running, then always await it.
            # Awaiting a finished task makes any exception raised by
            # worker.start() fail this test instead of being dropped with a
            # "Task exception was never retrieved" warning.
            if not worker_task.done():
                worker_task.cancel()
            try:
                await worker_task
            except asyncio.CancelledError:
                pass

            assert worker.running is False

    @pytest.mark.asyncio
    async def test_exception_during_init_doesnt_leak(self):
        """Test that an exception during initialization still closes the client generator."""
        with pytest.raises(RuntimeError, match="Initialization failed"):
            async with aclosing(get_redis_client()) as gen:
                async for _redis_client in gen:
                    # Simulate exception during service initialization
                    raise RuntimeError("Initialization failed")

        # aclosing() closed the generator despite the exception.
        with pytest.raises(StopAsyncIteration):
            await anext(gen)


class TestResourceCleanupOnExceptions:
    """Tests for resource cleanup when exceptions occur."""

    @pytest.mark.asyncio
    async def test_cleanup_on_service_exception(self):
        """Test that generator cleanup happens when service raises exception.

        The generator is wrapped in ``aclosing`` because an exception raised in
        an ``async for`` body does not itself finalize the generator.
        """
        with pytest.raises(ValueError, match="Service error"):
            async with aclosing(get_redis_client()) as gen:
                async for redis_client in gen:
                    QueueService(redis_client=redis_client)
                    # Simulate exception during usage
                    raise ValueError("Service error")

        # The generator was closed even though the consumer raised.
        with pytest.raises(StopAsyncIteration):
            await anext(gen)

    @pytest.mark.asyncio
    @pytest.mark.timeout(15)  # Extend timeout for this test (worker sleeps 5s on error)
    async def test_cleanup_on_worker_exception(self, mocker):
        """Test that the worker can be stopped cleanly while dequeue keeps raising."""
        from src.workers.pii_worker import PIIWorker

        # Mock settings.worker_error_sleep_seconds to avoid long waits
        mocker.patch("src.config.settings.worker_error_sleep_seconds", 0.1)

        mock_redis = AsyncMock()
        mock_s3 = MagicMock()

        storage_service = StorageService(
            s3_client=mock_s3,
            temp_bucket="test",
            results_bucket="test"
        )
        queue_service = QueueService(redis_client=mock_redis)
        job_service = JobService(redis_client=mock_redis)
        s3_url_service = S3URLService(
            s3_client=mock_s3,
            temp_bucket="test",
            results_bucket="test"
        )

        # Every dequeue attempt fails
        queue_service.dequeue = AsyncMock(side_effect=RuntimeError("Queue error"))

        worker = PIIWorker(
            storage_service=storage_service,
            queue_service=queue_service,
            job_service=job_service,
            s3_url_service=s3_url_service
        )

        worker_task = asyncio.create_task(worker.start())

        # Wait for worker to start and hit the error at least once.
        # With the mocked 0.1s error sleep, this should complete quickly.
        for _ in range(50):  # 50 * 0.05s = 2.5s max wait
            await asyncio.sleep(0.05)
            if queue_service.dequeue.call_count >= 1:
                break

        # Stop worker (it keeps running despite errors until told to stop)
        worker.stop()

        # Catch asyncio.TimeoutError: it is only the same class as the builtin
        # TimeoutError on Python 3.11+, so `except TimeoutError` misses it on 3.10.
        try:
            await asyncio.wait_for(worker_task, timeout=2.0)
        except asyncio.TimeoutError:
            worker_task.cancel()
            try:
                await worker_task
            except asyncio.CancelledError:
                pass

        # Worker should have stopped despite errors
        assert worker.running is False