1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133"""OpenTelemetry configuration and initialization.
Provides distributed tracing for the application with:
- Automatic FastAPI instrumentation
- Custom spans for agent execution
- Optional OTLP export for Jaeger/etc.
"""
import logging
from typing import TYPE_CHECKING
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor
if TYPE_CHECKING:
from fastapi import FastAPI
logger = logging.getLogger(__name__)
# Global tracer instance
_tracer: trace.Tracer | None = None
_initialized: bool = False
def get_tracer(name: str = "equalify-reflow") -> trace.Tracer:
"""Get a tracer instance.
Args:
name: Tracer name for instrumentation scope
Returns:
OpenTelemetry Tracer instance
"""
return trace.get_tracer(name)
def init_telemetry(app: "FastAPI | None" = None) -> None:
"""Initialize OpenTelemetry with configured exporters.
Args:
app: Optional FastAPI app instance for automatic instrumentation
"""
global _initialized
if _initialized:
logger.debug("Telemetry already initialized")
return
from src.config import settings
if not settings.telemetry_enabled:
logger.info("Telemetry disabled")
_initialized = True
return
# Create resource with service info
resource = Resource.create({
SERVICE_NAME: "equalify-reflow",
"service.version": "1.0.0",
"deployment.environment": settings.environment,
})
# Set up trace provider
provider = TracerProvider(resource=resource)
# Add console exporter for development (uses SimpleSpanProcessor for immediate output)
if settings.telemetry_console_export:
provider.add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
logger.info("OpenTelemetry console exporter enabled (immediate export)")
# Add OTLP exporter if configured (for Jaeger, etc.)
if settings.telemetry_otlp_endpoint:
try:
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
provider.add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(
endpoint=settings.telemetry_otlp_endpoint,
insecure=True, # Required for Docker networking without TLS
)
)
)
logger.info(f"OpenTelemetry OTLP exporter enabled: {settings.telemetry_otlp_endpoint}")
except ImportError:
logger.warning(
"OTLP exporter not available. Install opentelemetry-exporter-otlp "
"to enable OTLP export."
)
trace.set_tracer_provider(provider)
# Instrument FastAPI automatically
if app:
try:
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# Check if already instrumented (e.g., from hot reload)
if not getattr(app, "_otel_instrumented", False):
FastAPIInstrumentor.instrument_app(app)
app._otel_instrumented = True # type: ignore[attr-defined]
logger.info("FastAPI instrumented with OpenTelemetry")
else:
logger.info("FastAPI already instrumented, skipping")
except ImportError:
logger.warning(
"FastAPI instrumentation not available. Install "
"opentelemetry-instrumentation-fastapi to enable."
)
_initialized = True
logger.info(
f"OpenTelemetry initialized "
f"(console={settings.telemetry_console_export}, "
f"otlp={settings.telemetry_otlp_endpoint or 'disabled'})"
)
def shutdown_telemetry() -> None:
"""Shutdown telemetry and flush pending spans."""
global _initialized
provider = trace.get_tracer_provider()
if hasattr(provider, "shutdown"):
provider.shutdown()
logger.info("OpenTelemetry shutdown complete")
_initialized = False