1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Unit tests for PIIDetectionService.

Tests PII routing logic: documents with PII go to the approval queue,
while clean documents trigger processing directly.

These are Tier 1 tests: they catch compliance violations caused by
incorrect PII routing.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.services.pii_service import PIIDetectionService
from src.shared.constants.queues import APPROVAL_QUEUE
from src.shared.constants.statuses import (
STATUS_AWAITING_APPROVAL,
STATUS_PROCESSING,
)
from src.shared.models.pii import PIIFinding
from tests.conftest_fixtures.data_factories import create_pii_queue_payload
pytestmark = [pytest.mark.unit, pytest.mark.asyncio]
class TestPIIDetectionRouting:
    """Test PII detection routing logic - the core compliance-critical behavior.

    Every collaborator handed to ``PIIDetectionService`` is a mock, so these
    tests only observe which queue, status and timeout-tracking calls the
    service makes for a given PII analysis result.
    """

    @pytest.fixture
    def mock_storage_service(self):
        """Mock StorageService whose ``download_temp_file`` returns fake PDF bytes."""
        mock = MagicMock()
        mock.download_temp_file = AsyncMock(return_value=b"fake_pdf_content")
        return mock

    @pytest.fixture
    def mock_queue_service(self):
        """Mock QueueService with awaitable enqueue and timeout-tracking methods."""
        mock = MagicMock()
        mock.enqueue = AsyncMock()
        mock.add_to_timeout_tracking = AsyncMock()
        # Add redis attribute for processing service
        mock.redis = MagicMock()
        return mock

    @pytest.fixture
    def mock_job_service(self):
        """Mock JobService for status updates and job lookups."""
        mock = MagicMock()
        mock.update_job_status = AsyncMock()
        mock.store_approval_token_mapping = AsyncMock()
        # Add get_job for processing flow
        mock.get_job = AsyncMock(
            return_value={
                "job_id": "test-job-id",
                "original_filename": "test.pdf",
                "review_mode": "auto",
            }
        )
        return mock

    @pytest.fixture
    def mock_s3_url_service(self):
        """Mock S3URLService with a fixed generated URL and bucket names."""
        mock = MagicMock()
        mock.generate_url = AsyncMock(return_value="http://example.com/test.pdf")
        mock.temp_bucket = "temp-bucket"
        mock.results_bucket = "results-bucket"
        return mock

    @pytest.fixture
    def pii_service(
        self,
        mock_storage_service,
        mock_queue_service,
        mock_job_service,
        mock_s3_url_service,
    ):
        """Create PIIDetectionService with mocked dependencies."""
        return PIIDetectionService(
            storage_service=mock_storage_service,
            queue_service=mock_queue_service,
            job_service=mock_job_service,
            s3_url_service=mock_s3_url_service,
        )

    @staticmethod
    def _close_unscheduled_coroutines(mock_create_task):
        """Close coroutines that were passed to a mocked ``asyncio.create_task``.

        A mocked ``create_task`` never schedules its argument, so a coroutine
        handed to it would otherwise be garbage-collected un-awaited and emit
        a "coroutine ... was never awaited" RuntimeWarning. Non-coroutine
        arguments are ignored, and zero recorded calls is a no-op.
        """
        import inspect

        for recorded in mock_create_task.call_args_list:
            for argument in (*recorded.args, *recorded.kwargs.values()):
                if inspect.iscoroutine(argument):
                    argument.close()

    async def test_pii_found_routes_to_approval_queue(
        self, pii_service, mock_queue_service, mock_job_service
    ):
        """PDF with detected PII goes to approval queue, not processing.

        This test catches: compliance violations where PII documents
        are processed without human approval.
        """
        job = create_pii_queue_payload()
        pii_finding = PIIFinding(
            entity_type="EMAIL_ADDRESS",
            start=0,
            end=20,
            score=0.95,
            text="student@example.com",
        )

        # Patch the analyzer to report one finding and the PDF extractor to
        # return canned text, so no real PDF parsing or analysis runs.
        with patch.object(
            pii_service.pii_analyzer,
            "analyze_text",
            return_value=[pii_finding],
        ), patch(
            "src.services.pii_service.extract_pdf_text",
            return_value="Contact: student@example.com",
        ):
            await pii_service.process_pii_job(job)

        # Verify: job queued to APPROVAL_QUEUE (not PROCESSING_QUEUE)
        enqueue_calls = mock_queue_service.enqueue.call_args_list
        assert len(enqueue_calls) == 1
        # Unpacking also pins the call shape: exactly (queue_name, payload).
        queue_name, _payload = enqueue_calls[0].args
        assert queue_name == APPROVAL_QUEUE

        # Verify: job status set to awaiting_approval
        status_calls = mock_job_service.update_job_status.call_args_list
        # Fail with a readable message instead of IndexError on an empty list.
        assert status_calls, "update_job_status was never called"
        assert status_calls[-1].args[1] == STATUS_AWAITING_APPROVAL

        # Verify: timeout tracking added (for approval expiration)
        mock_queue_service.add_to_timeout_tracking.assert_called_once()

    async def test_clean_pdf_triggers_processing_directly(
        self, pii_service, mock_queue_service, mock_job_service
    ):
        """PDF without PII skips approval, triggers processing directly.

        This test catches: clean documents incorrectly going to approval
        queue, causing unnecessary delays.

        Note: Processing is now triggered directly via asyncio.create_task
        instead of queuing to a processing queue (which had no consumer).
        """
        job = create_pii_queue_payload()

        # Stand-in instance returned when the service instantiates
        # DocumentProcessingService (imported inside the function under test).
        mock_processing_service = MagicMock()
        mock_processing_service.process_document = AsyncMock()

        with patch.object(
            pii_service.pii_analyzer,
            "analyze_text",
            return_value=[],  # No PII findings
        ), patch(
            "src.services.pii_service.extract_pdf_text",
            return_value="Chapter 1: Introduction to Mathematics",
        ), patch(
            "src.services.document_processing_service.DocumentProcessingService"
        ) as mock_processing_service_class, patch(
            "asyncio.create_task"
        ) as mock_create_task:
            mock_processing_service_class.return_value = mock_processing_service
            await pii_service.process_pii_job(job)

        # create_task was mocked, so nothing it received was ever scheduled.
        self._close_unscheduled_coroutines(mock_create_task)

        # Verify: processing service was instantiated
        mock_processing_service_class.assert_called_once()

        # Verify: create_task was called to trigger processing
        mock_create_task.assert_called_once()

        # Verify: NO enqueue to any queue (processing triggered directly)
        mock_queue_service.enqueue.assert_not_called()

        # Verify: job status set to processing. Other statuses may be recorded
        # as well (the original author notes a PII-scanning status comes
        # first), so filter by value rather than relying on call position.
        status_calls = mock_job_service.update_job_status.call_args_list
        processing_status_calls = [
            recorded
            for recorded in status_calls
            if recorded.args[1] == STATUS_PROCESSING
        ]
        assert len(processing_status_calls) == 1

        # Verify: NO timeout tracking (not needed for processing)
        mock_queue_service.add_to_timeout_tracking.assert_not_called()