📦 EqualifyEverything / equalify-reflow

📄 data_factories.py · 177 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Test data factories to eliminate hardcoded values.

Provides generators for:
- UUIDs (job_id, document_id)
- Job models (with realistic defaults)
- Document metadata
- Queue payloads
- S3 keys

All functions return unique values per call unless specified.
"""

import uuid
from datetime import UTC, datetime
from io import BytesIO

from src.shared.models.queue import PIIQueuePayload, ProcessingQueuePayload


def generate_job_id() -> str:
    """Produce a fresh identifier for a test job.

    Returns:
        str: Canonical string form of a random (version 4) UUID
    """
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)


def generate_document_id() -> str:
    """Produce a fresh identifier for a test document.

    Returns:
        str: Canonical string form of a random (version 4) UUID
    """
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)


def create_test_job_dict(
    job_id: str | None = None,
    status: str = "pending",
    confidence_score: float | None = None,
    created_at: datetime | None = None,
    **kwargs
) -> dict:
    """Create test job dictionary with realistic defaults.

    Args:
        job_id: Job UUID (generated if not provided)
        status: Job status string (default: "pending")
        confidence_score: Confidence score 0-100 (optional)
        created_at: Creation timestamp (defaults to now)
        **kwargs: Additional job fields to override

    Returns:
        dict: Test job data
    """
    job_id = job_id or generate_job_id()
    created_at = created_at or datetime.now(UTC)

    defaults = {
        "job_id": job_id,
        "status": status,
        "s3_temp_key": f"temp/{job_id}/document.pdf",
        "created_at": created_at.isoformat(),
        "updated_at": created_at.isoformat(),
    }

    if confidence_score is not None:
        defaults["confidence_score"] = confidence_score

    # Merge with any additional kwargs
    defaults.update(kwargs)

    return defaults


def create_pii_queue_payload(
    job_id: str | None = None,
    s3_key: str | None = None,
    created_at: datetime | None = None,
) -> PIIQueuePayload:
    """Build a PII queue payload for tests.

    Args:
        job_id: Job UUID; a new one is generated when omitted or empty
        s3_key: S3 key for the document; defaults to
            "temp/<job_id>/document.pdf"
        created_at: Creation timestamp (defaults to the current UTC time)

    Returns:
        PIIQueuePayload: Test payload
    """
    resolved_id = job_id if job_id else generate_job_id()

    # The fallback key is derived from the resolved id, so resolve the id first
    fields = {
        "job_id": resolved_id,
        "s3_key": s3_key if s3_key else f"temp/{resolved_id}/document.pdf",
        "created_at": created_at if created_at else datetime.now(UTC),
    }
    return PIIQueuePayload(**fields)


def create_processing_queue_payload(
    job_id: str | None = None,
    s3_key: str | None = None,
    pii_approved: bool = True,
    created_at: datetime | None = None,
) -> ProcessingQueuePayload:
    """Build a processing queue payload for tests.

    Args:
        job_id: Job UUID; a new one is generated when omitted or empty
        s3_key: S3 key for the document; defaults to
            "temp/<job_id>/document.pdf"
        pii_approved: Value passed through as the payload's pii_approved flag
        created_at: Creation timestamp (defaults to the current UTC time)

    Returns:
        ProcessingQueuePayload: Test payload
    """
    resolved_id = job_id if job_id else generate_job_id()

    # The fallback key is derived from the resolved id, so resolve the id first
    fields = {
        "job_id": resolved_id,
        "s3_key": s3_key if s3_key else f"temp/{resolved_id}/document.pdf",
        "pii_approved": pii_approved,
        "created_at": created_at if created_at else datetime.now(UTC),
    }
    return ProcessingQueuePayload(**fields)


def create_s3_key(job_id: str, stage: str = "temp", filename: str = "document.pdf") -> str:
    """Compose an S3 key of the form "<stage>/<job_id>/<filename>".

    Args:
        job_id: Job UUID
        stage: Stage prefix (temp/results)
        filename: Document filename

    Returns:
        str: S3 key (e.g., "temp/uuid/document.pdf")
    """
    key_template = "{}/{}/{}"
    return key_template.format(stage, job_id, filename)


def create_test_pdf_content() -> bytes:
    """Build a small PDF-like byte string for tests.

    The payload is a "%PDF-1.4" header line, ten identical comment lines,
    and a trailing "%%EOF" marker.

    Returns:
        bytes: The assembled content (> 100 bytes)
    """
    header = b"%PDF-1.4\n"
    filler_line = b"%Test PDF content line\n"
    trailer = b"%%EOF"
    # Ten filler lines pad the payload past the 100-byte mark
    return b"".join([header, *([filler_line] * 10), trailer])


def create_test_upload_file(mocker, filename: str = "test.pdf", content: bytes | None = None):
    """Create mock UploadFile for FastAPI endpoints.

    Args:
        mocker: pytest-mock mocker fixture
        filename: Filename for upload
        content: File content (uses create_test_pdf_content if not provided)

    Returns:
        Mock UploadFile instance
    """
    content = content or create_test_pdf_content()
    file_obj = BytesIO(content)

    upload_file = mocker.Mock()
    upload_file.filename = filename
    upload_file.file = file_obj
    upload_file.content_type = "application/pdf"

    return upload_file