📦 EqualifyEverything / equalify-reflow

📄 test_pii_service.py · 176 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Unit tests for PIIDetectionService.

Tests PII routing logic: documents with PII go to approval queue,
clean documents trigger processing directly.

These are Tier 1 tests - catching compliance violations (PII routing wrong).
"""

import inspect
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from src.services.pii_service import PIIDetectionService
from src.shared.constants.queues import APPROVAL_QUEUE
from src.shared.constants.statuses import (
    STATUS_AWAITING_APPROVAL,
    STATUS_PROCESSING,
)
from src.shared.models.pii import PIIFinding

from tests.conftest_fixtures.data_factories import create_pii_queue_payload

# Module-level pytest markers, applied to every test collected from this file:
# ``unit`` tags the tests for selection (e.g. ``-m unit``; presumably registered
# in the project's pytest configuration - confirm), and ``asyncio`` lets the
# ``async def`` tests below run as coroutines.
pytestmark = [pytest.mark.unit, pytest.mark.asyncio]


class TestPIIDetectionRouting:
    """Test PII detection routing logic - the core compliance-critical behavior.

    Contract exercised by these tests:

    * a document with PII findings is enqueued on ``APPROVAL_QUEUE``, its last
      status update is ``STATUS_AWAITING_APPROVAL`` and timeout tracking is
      registered;
    * a document without findings is never enqueued, gets exactly one
      ``STATUS_PROCESSING`` update and has processing started through
      ``asyncio.create_task``.

    Every collaborator handed to the service is a mock, and both the PDF text
    extraction and the analyzer are patched per test.
    """

    @pytest.fixture
    def mock_storage_service(self):
        """Mock StorageService whose ``download_temp_file`` returns fake PDF bytes."""
        storage = MagicMock()
        storage.download_temp_file = AsyncMock(return_value=b"fake_pdf_content")
        return storage

    @pytest.fixture
    def mock_queue_service(self):
        """Mock QueueService with awaitable ``enqueue`` / ``add_to_timeout_tracking``."""
        queue = MagicMock()
        queue.enqueue = AsyncMock()
        queue.add_to_timeout_tracking = AsyncMock()
        # Explicit ``redis`` attribute for the processing service.
        queue.redis = MagicMock()
        return queue

    @pytest.fixture
    def mock_job_service(self):
        """Mock JobService with awaitable status, token-mapping and lookup calls."""
        jobs = MagicMock()
        jobs.update_job_status = AsyncMock()
        jobs.store_approval_token_mapping = AsyncMock()
        # ``get_job`` is needed by the processing flow.
        jobs.get_job = AsyncMock(
            return_value={
                "job_id": "test-job-id",
                "original_filename": "test.pdf",
                "review_mode": "auto",
            }
        )
        return jobs

    @pytest.fixture
    def mock_s3_url_service(self):
        """Mock S3URLService with a fixed generated URL and bucket names."""
        urls = MagicMock()
        urls.generate_url = AsyncMock(return_value="http://example.com/test.pdf")
        urls.temp_bucket = "temp-bucket"
        urls.results_bucket = "results-bucket"
        return urls

    @pytest.fixture
    def pii_service(
        self,
        mock_storage_service,
        mock_queue_service,
        mock_job_service,
        mock_s3_url_service,
    ):
        """Create PIIDetectionService wired to the mocked dependencies above."""
        return PIIDetectionService(
            storage_service=mock_storage_service,
            queue_service=mock_queue_service,
            job_service=mock_job_service,
            s3_url_service=mock_s3_url_service,
        )

    @staticmethod
    def _close_unscheduled_coroutines(mock_create_task):
        """Close coroutine objects that were passed to a mocked ``create_task``.

        A mocked ``asyncio.create_task`` never schedules its argument, so any
        coroutine object given to it would be garbage-collected un-awaited and
        trigger ``RuntimeWarning: coroutine ... was never awaited``. Closing
        the coroutine explicitly avoids that. Non-coroutine arguments are
        ignored.
        """
        for recorded_call in mock_create_task.call_args_list:
            for argument in (*recorded_call.args, *recorded_call.kwargs.values()):
                if inspect.iscoroutine(argument):
                    argument.close()

    async def test_pii_found_routes_to_approval_queue(
        self, pii_service, mock_queue_service, mock_job_service
    ):
        """PDF with detected PII goes to approval queue, not processing.

        This test catches: compliance violations where PII documents
        are processed without human approval.
        """
        job = create_pii_queue_payload()
        pii_finding = PIIFinding(
            entity_type="EMAIL_ADDRESS",
            start=0,
            end=20,
            score=0.95,
            text="student@example.com",
        )

        # Analyzer reports one finding; PDF extraction returns matching text.
        analyzer_patch = patch.object(
            pii_service.pii_analyzer,
            "analyze_text",
            return_value=[pii_finding],
        )
        extraction_patch = patch(
            "src.services.pii_service.extract_pdf_text",
            return_value="Contact: student@example.com",
        )
        with analyzer_patch, extraction_patch:
            await pii_service.process_pii_job(job)

        # Verify: exactly one enqueue, and it targets APPROVAL_QUEUE.
        enqueue_calls = mock_queue_service.enqueue.call_args_list
        assert len(enqueue_calls) == 1
        # Two-name unpacking keeps the check that enqueue received exactly
        # (queue_name, payload) positionally; the payload itself is not inspected.
        queue_name, _payload = enqueue_calls[0].args
        assert queue_name == APPROVAL_QUEUE

        # Verify: the last status written is awaiting_approval.
        status_calls = mock_job_service.update_job_status.call_args_list
        assert status_calls, "update_job_status was never called"
        assert status_calls[-1].args[1] == STATUS_AWAITING_APPROVAL

        # Verify: timeout tracking added (for approval expiration)
        mock_queue_service.add_to_timeout_tracking.assert_called_once()

    async def test_clean_pdf_triggers_processing_directly(
        self, pii_service, mock_queue_service, mock_job_service
    ):
        """PDF without PII skips approval, triggers processing directly.

        This test catches: clean documents incorrectly going to approval
        queue, causing unnecessary delays.

        Note: Processing is now triggered directly via asyncio.create_task
        instead of queuing to a processing queue (which had no consumer).
        """
        job = create_pii_queue_payload()

        # Analyzer reports NO findings; PDF extraction returns harmless text.
        analyzer_patch = patch.object(
            pii_service.pii_analyzer,
            "analyze_text",
            return_value=[],
        )
        extraction_patch = patch(
            "src.services.pii_service.extract_pdf_text",
            return_value="Chapter 1: Introduction to Mathematics",
        )
        # DocumentProcessingService is imported inside the function under test,
        # so it is patched on its defining module.
        processing_patch = patch(
            "src.services.document_processing_service.DocumentProcessingService"
        )
        # Capture the create_task call instead of scheduling real work.
        task_patch = patch("asyncio.create_task")

        with analyzer_patch, extraction_patch:
            with processing_patch as processing_cls, task_patch as mock_create_task:
                processing_service = MagicMock()
                processing_service.process_document = AsyncMock()
                processing_cls.return_value = processing_service

                await pii_service.process_pii_job(job)

                # Clean up before asserting so a failing assertion cannot
                # leave an un-awaited coroutine behind.
                self._close_unscheduled_coroutines(mock_create_task)

                # Verify: processing service was instantiated
                processing_cls.assert_called_once()

                # Verify: create_task was called to trigger processing
                mock_create_task.assert_called_once()

        # Verify: NO enqueue to any queue (processing triggered directly)
        mock_queue_service.enqueue.assert_not_called()

        # Verify: exactly one status update to STATUS_PROCESSING.
        # (An earlier call sets STATUS_PII_SCANNING, hence the filter.)
        status_calls = mock_job_service.update_job_status.call_args_list
        processing_updates = [
            recorded for recorded in status_calls if recorded.args[1] == STATUS_PROCESSING
        ]
        assert len(processing_updates) == 1

        # Verify: NO timeout tracking (not needed for processing)
        mock_queue_service.add_to_timeout_tracking.assert_not_called()