📦 EqualifyEverything / equalify

📄 migrateStaleBlockers.ts · 288 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288import { db, event } from "#src/utils";

//
// One-time migration to denormalize counts and move historical blockers
// to the stale_blockers graveyard table.
//
// POST /internal/migrateStaleBlockers
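// body: {
//   phase: "ignored_hashes" | "scan_counts" | "move_stale"
//        | "move_orphan_blocker_messages" | "vacuum_full",
//   scansPerBatch?: number,        // for move_stale, default 5
//   rowsPerBatch?: number,         // for move_orphan_blocker_messages, default 200000
//   maxBatches?: number,           // caps work per invocation; default 20 (move_stale) or 50 (move_orphan_blocker_messages)
//   softDeadlineMs?: number,       // for the batched phases, default 100000; no new batch starts once this much time has elapsed
//   tableName?: string,            // required for vacuum_full; lowercase letters/underscores only
//   auditId?: string,              // optional scope to one audit (testing); honored by ignored_hashes, scan_counts, move_stale
//   dryRun?: boolean               // honored by ignored_hashes, scan_counts, move_stale
// }
//
// Run phases in order: ignored_hashes → scan_counts → move_stale (repeat until done),
// then move_orphan_blocker_messages (repeat until done). vacuum_full is a separate
// maintenance phase that takes a tableName.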
//

/**
 * Entry point for the stale-blocker migration route.
 *
 * Reads the request from `event.body`, opens a DB connection, dispatches on
 * `body.phase`, and returns a `{ statusCode, body }` response object:
 *   - 400 when `phase` is missing/unknown or a phase parameter is invalid,
 *   - 200 with phase-specific counters on success,
 *   - 500 with the error message when a query (or connection cleanup) throws.
 *
 * The function never throws once the connection is open; failures are logged
 * and reported in the response body.
 */
export const migrateStaleBlockers = async () => {
    // `event.body` is untyped at this boundary, hence the `any`.
    const body = (event.body as any) || {};
    const phase: string = body.phase;
    const auditId: string | undefined = body.auditId;
    const dryRun: boolean = !!body.dryRun;

    if (!phase) {
        return {
            statusCode: 400,
            body: { error: "phase is required: 'ignored_hashes' | 'scan_counts' | 'move_stale'" },
        };
    }

    await db.connect();
    const t0 = Date.now();
    const ctx: PhaseContext = { body, phase, auditId, dryRun, t0 };

    try {
        // `return await` (not a bare `return`) so a rejected phase is handled by the catch below.
        switch (phase) {
            case "ignored_hashes":
                return await backfillIgnoredHashes(ctx);
            case "scan_counts":
                return await backfillScanCounts(ctx);
            case "move_stale":
                return await moveStaleBlockers(ctx);
            case "move_orphan_blocker_messages":
                return await moveOrphanBlockerMessages(ctx);
            case "vacuum_full":
                return await vacuumFull(ctx);
            default:
                await db.clean();
                return {
                    statusCode: 400,
                    body: { error: `unknown phase '${phase}'` },
                };
        }
    } catch (err: any) {
        // Best-effort cleanup: a failure here must not mask the original error.
        try { await db.clean(); } catch {}
        console.error("migrateStaleBlockers error:", err);
        return {
            statusCode: 500,
            body: { error: err?.message || String(err), phase, ms: Date.now() - t0 },
        };
    }
};

/** Request fields shared by every phase, plus the invocation start time (`t0`, epoch ms). */
type PhaseContext = {
    body: any;
    phase: string;
    auditId: string | undefined;
    dryRun: boolean;
    t0: number;
};

/** Why a batched phase stopped issuing move statements. */
type StoppedReason = "max_batches_reached" | "soft_deadline" | "no_more_work";

/**
 * Validates a caller-supplied batch size.
 *
 * @param raw      value taken from the request body
 * @param fallback default used when `raw` is `null`/`undefined`
 * @returns the size as a positive safe integer, or `null` when `raw` is present
 *          but is not a positive integer (number or purely numeric string)
 */
function parseBatchSize(raw: unknown, fallback: number): number | null {
    if (raw === undefined || raw === null) return fallback;
    if (typeof raw !== "number" && typeof raw !== "string") return null;
    // Number("") is 0 and would be rejected anyway; the explicit check documents intent.
    if (typeof raw === "string" && raw.trim() === "") return null;
    const size = Number(raw);
    return Number.isSafeInteger(size) && size > 0 ? size : null;
}

/**
 * Repeatedly runs `moveBatch` until it moves zero rows, `maxBatches` batches
 * have run, or `softDeadlineMs` has elapsed since `t0` (checked only before
 * starting a batch, never mid-batch).
 *
 * @param moveBatch      runs one move statement and resolves to the rows moved
 * @param maxBatches     upper bound on batches per invocation
 * @param softDeadlineMs elapsed-time budget after which no new batch starts
 * @param t0             invocation start time (epoch ms)
 * @returns per-batch results, totals, and the reason the loop stopped
 */
async function runBatchedMove(
    moveBatch: () => Promise<number>,
    maxBatches: number,
    softDeadlineMs: number,
    t0: number,
) {
    const batches: { batch: number; rowsMoved: number }[] = [];
    let totalRowsMoved = 0;
    let drained = false;

    while (batches.length < maxBatches && Date.now() - t0 < softDeadlineMs) {
        const rowsMoved = await moveBatch();
        batches.push({ batch: batches.length + 1, rowsMoved });
        totalRowsMoved += rowsMoved;

        if (rowsMoved === 0) {
            // Nothing left. Recorded here so that a zero-row batch which also happens
            // to be the last allowed batch is not misreported as "max_batches_reached".
            drained = true;
            break;
        }
    }

    const batchesRun = batches.length;
    let stoppedReason: StoppedReason;
    if (drained) {
        stoppedReason = "no_more_work";
    } else if (batchesRun >= maxBatches) {
        stoppedReason = "max_batches_reached";
    } else if (Date.now() - t0 >= softDeadlineMs) {
        stoppedReason = "soft_deadline";
    } else {
        stoppedReason = "no_more_work";
    }

    return { batchesRun, totalRowsMoved, stoppedReason, batches };
}

/**
 * Phase "ignored_hashes": backfill content_hash_id on existing ignored_blockers
 * rows by joining back to blockers. Safe to re-run (NULL-only update).
 * With `dryRun`, only counts the rows that would be updated.
 */
async function backfillIgnoredHashes({ phase, auditId, dryRun, t0 }: PhaseContext) {
    const auditFilter = auditId ? `AND ib.audit_id = $1` : "";
    const values = auditId ? [auditId] : [];

    // The same statement skeleton serves both modes: a COUNT over the join for
    // dry runs, an UPDATE ... FROM for the real run.
    const result = await db.query({
        text: `
                    ${dryRun ? "SELECT COUNT(*) AS updated FROM" : "UPDATE"} ignored_blockers ib
                    ${dryRun ? "JOIN" : "SET content_hash_id = b.content_hash_id FROM"} blockers b
                    ${dryRun ? "ON" : "WHERE"} ib.blocker_id = b.id
                    AND ib.content_hash_id IS NULL
                    ${auditFilter}
                `,
        values,
    });

    const updated = dryRun ? result.rows[0]?.updated : result.rowCount;
    await db.clean();
    return {
        statusCode: 200,
        body: { phase, dryRun, updated, ms: Date.now() - t0 },
    };
}

/**
 * Phase "scan_counts": backfill blocker_count + equalified_count on every scan
 * row from the active blockers table. MUST run BEFORE any blockers are moved
 * to stale. With `dryRun`, only counts the scans in scope.
 */
async function backfillScanCounts({ phase, auditId, dryRun, t0 }: PhaseContext) {
    const auditFilter = auditId ? `WHERE s.audit_id = $1` : "";
    const values = auditId ? [auditId] : [];

    if (dryRun) {
        const result = await db.query({
            text: `SELECT COUNT(*) AS scan_count FROM scans s ${auditFilter}`,
            values,
        });
        await db.clean();
        return {
            statusCode: 200,
            body: { phase, dryRun, scansToUpdate: result.rows[0]?.scan_count, ms: Date.now() - t0 },
        };
    }

    const result = await db.query({
        text: `
                    UPDATE scans s SET
                        blocker_count = COALESCE((SELECT COUNT(*) FROM blockers WHERE scan_id = s.id), 0),
                        equalified_count = COALESCE((SELECT COUNT(*) FROM blockers WHERE scan_id = s.id AND equalified = true), 0)
                    ${auditFilter}
                `,
        values,
    });

    await db.clean();
    return {
        statusCode: 200,
        body: { phase, scansUpdated: result.rowCount, ms: Date.now() - t0 },
    };
}

/**
 * Phase "move_stale": move blockers for non-latest scans (per audit) from
 * blockers → stale_blockers. Batched: each statement moves the blockers of up
 * to `scansPerBatch` stale scans; the caller re-invokes until done.
 * With `dryRun`, only reports how many stale scans still have active blockers.
 */
async function moveStaleBlockers({ body, phase, auditId, dryRun, t0 }: PhaseContext) {
    const maxBatches: number = body.maxBatches ?? 20;
    // Stop starting new batches once this much time has elapsed.
    // Default 100s (fits 2-min Lambda). Bump for longer Lambdas via body.softDeadlineMs.
    const softDeadlineMs: number = body.softDeadlineMs ?? 100000;

    // Pre-check: how many stale scans remain to migrate (have blockers still in active table)
    const remainingResult = await db.query({
        text: `
                    WITH latest AS (
                        SELECT DISTINCT ON (audit_id) id
                        FROM scans
                        ${auditId ? "WHERE audit_id = $1" : ""}
                        ORDER BY audit_id, created_at DESC
                    )
                    SELECT COUNT(DISTINCT b.scan_id) AS remaining
                    FROM blockers b
                    WHERE b.scan_id IS NOT NULL
                    AND b.scan_id NOT IN (SELECT id FROM latest)
                    ${auditId ? "AND b.audit_id = $1" : ""}
                `,
        values: auditId ? [auditId] : [],
    });
    const remainingStaleScans = parseInt(remainingResult.rows[0]?.remaining ?? "0", 10);

    if (dryRun) {
        await db.clean();
        return {
            statusCode: 200,
            body: { phase, dryRun, remainingStaleScans, ms: Date.now() - t0 },
        };
    }

    // The batch size comes from the request body and ends up in a LIMIT clause:
    // reject anything that is not a positive integer and bind it as a parameter
    // rather than splicing it into the SQL text.
    const scansPerBatch = parseBatchSize(body.scansPerBatch, 5);
    if (scansPerBatch === null) {
        await db.clean();
        return { statusCode: 400, body: { error: "scansPerBatch must be a positive integer" } };
    }
    const limitPlaceholder = auditId ? "$2" : "$1";
    const values = auditId ? [auditId, scansPerBatch] : [scansPerBatch];

    const run = await runBatchedMove(
        async () => {
            // DELETE ... RETURNING feeding INSERT in one statement, so each batch
            // is moved by a single query.
            const moveResult = await db.query({
                text: `
                        WITH latest AS (
                            SELECT DISTINCT ON (audit_id) id
                            FROM scans
                            ${auditId ? "WHERE audit_id = $1" : ""}
                            ORDER BY audit_id, created_at DESC
                        ),
                        target_scans AS (
                            SELECT DISTINCT b.scan_id
                            FROM blockers b
                            WHERE b.scan_id IS NOT NULL
                            AND b.scan_id NOT IN (SELECT id FROM latest)
                            ${auditId ? "AND b.audit_id = $1" : ""}
                            LIMIT ${limitPlaceholder}
                        ),
                        moved AS (
                            DELETE FROM blockers
                            WHERE scan_id IN (SELECT scan_id FROM target_scans)
                            RETURNING id, created_at, updated_at, audit_id, content, content_normalized,
                                      content_hash_id, targets, equalified, url_id, scan_id, short_id
                        )
                        INSERT INTO stale_blockers
                            (id, created_at, updated_at, audit_id, content, content_normalized,
                             content_hash_id, targets, equalified, url_id, scan_id, short_id)
                        SELECT id, created_at, updated_at, audit_id, content, content_normalized,
                               content_hash_id, targets, equalified, url_id, scan_id, short_id
                        FROM moved
                    `,
                values,
            });
            return moveResult.rowCount ?? 0;
        },
        maxBatches,
        softDeadlineMs,
        t0,
    );

    await db.clean();
    return {
        statusCode: 200,
        body: {
            phase,
            batchesRun: run.batchesRun,
            totalRowsMoved: run.totalRowsMoved,
            remainingStaleScansBefore: remainingStaleScans,
            ms: Date.now() - t0,
            stoppedReason: run.stoppedReason,
            batches: run.batches,
        },
    };
}

/**
 * Phase "move_orphan_blocker_messages": after move_stale, blocker_messages rows
 * still reference the moved blockers. Move those orphans to
 * stale_blocker_messages so the link table shrinks too.
 * Batched: each statement moves up to `rowsPerBatch` rows.
 */
async function moveOrphanBlockerMessages({ body, phase, t0 }: PhaseContext) {
    const maxBatches: number = body.maxBatches ?? 50;
    const softDeadlineMs: number = body.softDeadlineMs ?? 100000;

    // Same treatment as scansPerBatch in move_stale: validate, then bind.
    const rowsPerBatch = parseBatchSize(body.rowsPerBatch, 200000);
    if (rowsPerBatch === null) {
        await db.clean();
        return { statusCode: 400, body: { error: "rowsPerBatch must be a positive integer" } };
    }

    const run = await runBatchedMove(
        async () => {
            const moveResult = await db.query({
                text: `
                        WITH targets AS (
                            SELECT bm.id
                            FROM blocker_messages bm
                            WHERE NOT EXISTS (
                                SELECT 1 FROM blockers b WHERE b.id = bm.blocker_id
                            )
                            LIMIT $1
                        ),
                        moved AS (
                            DELETE FROM blocker_messages
                            WHERE id IN (SELECT id FROM targets)
                            RETURNING id, created_at, updated_at, message_id, blocker_id
                        )
                        INSERT INTO stale_blocker_messages
                            (id, created_at, updated_at, message_id, blocker_id)
                        SELECT id, created_at, updated_at, message_id, blocker_id FROM moved
                    `,
                values: [rowsPerBatch],
            });
            return moveResult.rowCount ?? 0;
        },
        maxBatches,
        softDeadlineMs,
        t0,
    );

    await db.clean();
    return {
        statusCode: 200,
        body: {
            phase,
            batchesRun: run.batchesRun,
            totalRowsMoved: run.totalRowsMoved,
            ms: Date.now() - t0,
            stoppedReason: run.stoppedReason,
            batches: run.batches,
        },
    };
}

/**
 * Phase "vacuum_full": VACUUM FULL rewrites the heap to reclaim space from dead
 * tuples. Takes an ACCESS EXCLUSIVE lock for the duration — table is unavailable.
 * Hasura SQL console won't let you run this (TX wrapper); this route works
 * because serverless-postgres runs each query non-transactionally.
 */
async function vacuumFull({ body, phase, t0 }: PhaseContext) {
    const tableName: string = body.tableName;
    // An identifier cannot be sent as a bind parameter, so it is restricted to
    // lowercase letters and underscores before being placed in the statement.
    if (!tableName || !/^[a-z_]+$/.test(tableName)) {
        await db.clean();
        return { statusCode: 400, body: { error: "tableName required, lowercase letters/underscores only" } };
    }
    await db.query({ text: `VACUUM FULL public.${tableName}` });
    await db.clean();
    return {
        statusCode: 200,
        body: { phase, tableName, ms: Date.now() - t0 },
    };
}