1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93# How to add a new S3 operation
Use this recipe when adding a `StorageService` method that calls S3. For the design rationale (why circuit breakers, why `S3CleanupService` is excluded), see [S3 resilience](../explanation/s3-resilience.md).
## 1. Pick the right circuit breaker
| Operation class | Use | File |
|---|---|---|
| Uploads (`put_object`, `upload_fileobj`) | `self.upload_circuit` | `src/services/storage_service.py` |
| Reads (`get_object`, `head_object`, `list_objects`) | `self.download_circuit` | same |
| Deletes | Put it in `S3CleanupService` | `src/services/s3_cleanup_service.py` |
`S3CleanupService` has **no** circuit breakers on purpose — cleanup is best-effort and non-blocking. If you need a delete operation that's critical-path, that's a design-review conversation, not a code change.
## 2. Check the circuit before the operation
```python
self.upload_circuit.check_state()
if self.upload_circuit.is_open:
raise CircuitBreakerOpen("S3 upload circuit breaker is open")
```
## 3. Wrap the S3 call with retry
```python
from src.utils.retry_helpers import retry_with_backoff_for_sync_func
try:
result = await retry_with_backoff_for_sync_func(
lambda: self.s3_client.put_object(...),
max_attempts=3,
base_delay=1.0,
operation_name="upload result",
)
self.upload_circuit.record_success()
return result
except CircuitBreakerOpen:
raise
except Exception:
self.upload_circuit.record_failure()
raise
```
Defaults: 3 attempts with 1s / 2s / 4s backoff. Boto3 adaptive retry is already enabled in the client factory.
## 4. Add metrics (recommended)
```python
from ..services.metrics_service import (
s3_operations_total,
s3_operation_duration_seconds,
)
import time
start = time.time()
try:
# ... S3 operation ...
s3_operations_total.labels(operation='put_object', bucket=bucket, result='success').inc()
s3_operation_duration_seconds.labels(operation='put_object', bucket=bucket).observe(time.time() - start)
except CircuitBreakerOpen:
s3_operations_total.labels(operation='put_object', bucket=bucket, result='circuit_open').inc()
raise
except Exception:
s3_operations_total.labels(operation='put_object', bucket=bucket, result='error').inc()
raise
```
These metrics are exposed to Prometheus at `/metrics` (scraped every 15s) and are queryable in Grafana's Explore view. Add a panel to an existing dashboard if the question you care about recurs often enough to want a chart.
## 5. Test retry and circuit behaviour
Unit tests mock S3 error responses:
```python
from botocore.exceptions import ClientError
from src.utils.circuit_breaker import CircuitBreakerOpen
# Retry: fails twice, succeeds on third attempt
mock_s3.put_object.side_effect = [
ClientError({'Error': {'Code': 'SlowDown'}}, 'put_object'),
ClientError({'Error': {'Code': 'SlowDown'}}, 'put_object'),
{'ETag': 'success'},
]
# Circuit opens after 5 consecutive failures
for _ in range(5):
storage.upload_circuit.record_failure()
with pytest.raises(CircuitBreakerOpen):
await storage.upload_result(job_id, content, 'md')
```
Integration tests use Floci to exercise the real wire protocol — see `tests/unit/utils/test_circuit_breaker.py` and `tests/unit/services/test_storage_service.py` for examples.