1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Service for monitoring and processing approval timeouts.

This service detects jobs that have exceeded the approval deadline
and performs cleanup operations (status updates, temp file removal).
"""
import logging
from .cleanup_service import CleanupService
from .job_service import JobService
from .metrics_service import MetricsService
from .queue_service import QueueService
logger = logging.getLogger(__name__)
class TimeoutService:
    """Service for approval timeout monitoring and processing.

    Delegates to four collaborators: the queue service (timeout tracking),
    the job service (status reads/updates and audit logging), the cleanup
    service (temp file removal) and the metrics service (counters).
    """

    def __init__(
        self,
        queue_service: QueueService,
        job_service: JobService,
        cleanup_service: CleanupService,
        metrics_service: MetricsService,
    ):
        """Initialize timeout service.

        Args:
            queue_service: Redis queue operations service
            job_service: Job status management service
            cleanup_service: S3 cleanup service
            metrics_service: Metrics tracking service
        """
        self.queue_service = queue_service
        self.job_service = job_service
        self.cleanup_service = cleanup_service
        self.metrics_service = metrics_service

    async def process_expired_approvals(self) -> int:
        """Process all jobs that have exceeded the approval deadline.

        Each expired job is handled independently: a failure on one job is
        logged and does not stop the remaining jobs from being processed.

        Returns:
            Number of expired approvals processed

        Raises:
            Exception: Any error raised while fetching the expired entries or
                recording the success metric is logged, counted under
                ``approval_timeout_errors`` (best effort) and re-raised.
        """
        try:
            # Query the timeout tracking set for expired entries.
            # Expected shape: [(job_id, timestamp), ...] tuples.
            expired_timeouts = await self.queue_service.get_expired_timeouts()
            if not expired_timeouts:
                logger.debug("No expired approvals found")
                return 0

            logger.info(f"Found {len(expired_timeouts)} expired approvals to process")

            processed_count = 0
            # Only the job id is needed here; the timestamp is ignored.
            for job_id, _expiration_timestamp in expired_timeouts:
                try:
                    if await self._process_expired_job(job_id):
                        processed_count += 1
                except Exception as e:
                    # Defensive: keep going with the other jobs.
                    logger.error(
                        f"Failed to process expired approval for job {job_id}: {e}",
                        exc_info=True,
                    )

            if processed_count > 0:
                await self.metrics_service.increment_metric(
                    "approval_timeouts_processed",
                    processed_count,
                )

            logger.info(
                f"Processed {processed_count}/{len(expired_timeouts)} expired approvals"
            )
            return processed_count
        except Exception as e:
            logger.error(f"Error processing expired approvals: {e}", exc_info=True)
            # Recording the error metric must not replace the original
            # exception, otherwise callers lose the real root cause.
            try:
                await self.metrics_service.increment_metric(
                    "approval_timeout_errors", 1
                )
            except Exception as metric_error:
                logger.warning(
                    f"Failed to record approval_timeout_errors metric: {metric_error}",
                    exc_info=True,
                )
            raise

    async def _process_expired_job(self, job_id: str) -> bool:
        """Process a single expired approval job.

        Jobs that no longer exist, or whose status is no longer
        ``awaiting_approval``, are only removed from timeout tracking.
        Otherwise the job is audited, marked ``failed``, its temp files are
        cleaned up on a best-effort basis, and it is removed from tracking.

        Args:
            job_id: Job ID to process

        Returns:
            True if the job was timed out by this call, False if it was
            skipped or an error occurred (errors are logged, not raised).
        """
        try:
            # Check current job status (idempotency - might already be processed)
            job_data = await self.job_service.get_job_status(job_id)
            if not job_data:
                logger.warning(
                    f"Job {job_id} not found in Redis, removing from timeout tracking"
                )
                await self.queue_service.remove_from_timeout_tracking(job_id)
                return False

            current_status = job_data.get("status")
            # Only process jobs still awaiting approval
            if current_status != "awaiting_approval":
                logger.debug(
                    f"Job {job_id} status is {current_status}, "
                    f"skipping timeout processing"
                )
                await self.queue_service.remove_from_timeout_tracking(job_id)
                return False

            logger.info(f"Processing timeout for job {job_id}")

            # Emit audit log before forced timeout failure
            await self.job_service.emit_job_audit_log(job_id, "approval_timeout")

            # Update job status to failed with timeout reason
            await self.job_service.update_job_status(
                job_id,
                status="failed",
                error_message="Approval deadline exceeded - no response received",
            )

            # From here on the job is already failed, so cleanup problems are
            # reported but must not prevent removal from timeout tracking.
            s3_key = job_data.get("s3_key")
            if not s3_key:
                logger.warning(f"Job {job_id} missing s3_key, cannot cleanup files")
            elif not await self._cleanup_job_files(job_id, s3_key):
                logger.warning(
                    f"Cleanup failed for timed-out job {job_id}, "
                    f"but status updated to failed"
                )

            # Remove from timeout tracking sorted set
            await self.queue_service.remove_from_timeout_tracking(job_id)
            logger.info(f"Successfully processed timeout for job {job_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to process expired job {job_id}: {e}", exc_info=True)
            return False

    async def _cleanup_job_files(self, job_id: str, s3_key: str) -> bool:
        """Run temp file cleanup for a job, converting exceptions to False.

        Args:
            job_id: Job ID, used for logging only
            s3_key: Key handed to the cleanup service

        Returns:
            True if the cleanup service reported success, False if it
            reported failure or raised.
        """
        try:
            return bool(await self.cleanup_service.cleanup_job_files(s3_key))
        except Exception as e:
            logger.error(
                f"Cleanup raised for timed-out job {job_id}: {e}", exc_info=True
            )
            return False

    async def get_pending_approval_count(self) -> int:
        """Get count of jobs currently awaiting approval.

        Returns:
            Number of jobs in timeout tracking, or 0 if the count could not
            be retrieved (the error is logged, not raised).
        """
        try:
            count = await self.queue_service.get_timeout_count()
            logger.debug(f"Pending approval count: {count}")
            return count
        except Exception as e:
            logger.error(f"Failed to get pending approval count: {e}", exc_info=True)
            return 0