subcog/observability/
tracing.rs1use crate::config::TracingSettings;
4use crate::{Error, Result};
5use opentelemetry::KeyValue;
6use opentelemetry::global;
7use opentelemetry::trace::TracerProvider as _;
8use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
9use opentelemetry_otlp::{LogExporter, Protocol, SpanExporter, WithExportConfig};
10use opentelemetry_sdk::Resource;
11use opentelemetry_sdk::logs::SdkLoggerProvider;
12use opentelemetry_sdk::propagation::TraceContextPropagator;
13use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracerProvider};
14use tracing_opentelemetry::OpenTelemetryLayer;
15use tracing_subscriber::Registry;
16
17use super::otlp::{OtlpConfig, OtlpProtocol, endpoint_from_env};
18
/// Sampling ratio used when the tracing settings do not specify one
/// (`1.0` = every root trace is eligible for sampling); environment
/// overrides applied afterwards may still replace it.
const DEFAULT_TRACE_SAMPLE_RATIO: f64 = 1.0_f64;
20
21#[derive(Debug, Clone)]
23pub struct TracingConfig {
24 pub enabled: bool,
26 pub otlp: OtlpConfig,
28 pub sample_ratio: f64,
30 pub service_name: String,
32 pub service_version: String,
34 pub resource_attributes: Vec<KeyValue>,
36}
37
38impl TracingConfig {
39 #[must_use]
41 pub fn from_env() -> Self {
42 Self::from_settings(None)
43 }
44
45 #[must_use]
47 pub fn from_settings(settings: Option<&TracingSettings>) -> Self {
48 let otlp = OtlpConfig::from_settings(settings.and_then(|config| config.otlp.as_ref()));
49 let endpoint_present = otlp.endpoint.is_some();
50
51 let enabled = settings
52 .and_then(|config| config.enabled)
53 .unwrap_or(endpoint_present);
54 let sample_ratio = settings
55 .and_then(|config| config.sample_ratio)
56 .unwrap_or(DEFAULT_TRACE_SAMPLE_RATIO);
57 let service_name = settings
58 .and_then(|config| config.service_name.clone())
59 .unwrap_or_else(|| env!("CARGO_PKG_NAME").to_string());
60 let service_version = env!("CARGO_PKG_VERSION").to_string();
61 let resource_attributes = settings
62 .and_then(|config| config.resource_attributes.clone())
63 .map(parse_resource_attributes_from_settings)
64 .unwrap_or_default();
65
66 let mut config = Self {
67 enabled,
68 otlp,
69 sample_ratio,
70 service_name,
71 service_version,
72 resource_attributes,
73 };
74
75 apply_env_overrides(&mut config);
76 config
77 }
78}
79
80pub struct TracingInit {
82 pub layer: OpenTelemetryLayer<Registry, opentelemetry_sdk::trace::Tracer>,
84 pub provider: SdkTracerProvider,
86 pub logs_layer:
88 OpenTelemetryTracingBridge<SdkLoggerProvider, opentelemetry_sdk::logs::SdkLogger>,
89 pub logger_provider: SdkLoggerProvider,
91 pub runtime: Option<tokio::runtime::Runtime>,
93}
94
/// Zero-sized tracer handle.
///
/// It carries no state; construct it with [`Tracer::new`] (usable in `const`
/// contexts) or [`Default::default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct Tracer;

impl Tracer {
    /// Creates a new [`Tracer`].
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
111
112pub fn build_tracing(config: &TracingConfig) -> Result<Option<TracingInit>> {
114 if !config.enabled {
115 return Ok(None);
116 }
117
118 let endpoint = config
119 .otlp
120 .endpoint
121 .clone()
122 .ok_or_else(|| Error::OperationFailed {
123 operation: "tracing_init".to_string(),
124 cause: "OTLP endpoint required when tracing is enabled".to_string(),
125 })?;
126
127 let runtime = match (config.otlp.protocol, tokio::runtime::Handle::try_current()) {
128 (OtlpProtocol::Grpc, Err(_)) => Some(
129 tokio::runtime::Builder::new_multi_thread()
130 .enable_all()
131 .build()
132 .map_err(|e| Error::OperationFailed {
133 operation: "otlp_runtime_init".to_string(),
134 cause: e.to_string(),
135 })?,
136 ),
137 _ => None,
138 };
139
140 let _guard = runtime.as_ref().map(tokio::runtime::Runtime::enter);
141
142 let trace_exporter = match config.otlp.protocol {
144 OtlpProtocol::Grpc => SpanExporter::builder()
145 .with_tonic()
146 .with_endpoint(&endpoint)
147 .build()
148 .map_err(|e| Error::OperationFailed {
149 operation: "otlp_exporter_build".to_string(),
150 cause: e.to_string(),
151 })?,
152 OtlpProtocol::Http => SpanExporter::builder()
153 .with_http()
154 .with_protocol(Protocol::HttpBinary)
155 .with_endpoint(&endpoint)
156 .build()
157 .map_err(|e| Error::OperationFailed {
158 operation: "otlp_exporter_build".to_string(),
159 cause: e.to_string(),
160 })?,
161 };
162
163 let log_exporter = match config.otlp.protocol {
165 OtlpProtocol::Grpc => LogExporter::builder()
166 .with_tonic()
167 .with_endpoint(&endpoint)
168 .build()
169 .map_err(|e| Error::OperationFailed {
170 operation: "otlp_log_exporter_build".to_string(),
171 cause: e.to_string(),
172 })?,
173 OtlpProtocol::Http => LogExporter::builder()
174 .with_http()
175 .with_protocol(Protocol::HttpBinary)
176 .with_endpoint(&endpoint)
177 .build()
178 .map_err(|e| Error::OperationFailed {
179 operation: "otlp_log_exporter_build".to_string(),
180 cause: e.to_string(),
181 })?,
182 };
183
184 let mut attributes = vec![
185 KeyValue::new("service.name", config.service_name.clone()),
186 KeyValue::new("service.version", config.service_version.clone()),
187 ];
188 attributes.extend(config.resource_attributes.clone());
189
190 let sampler = build_sampler(config.sample_ratio);
191 let resource = Resource::builder()
192 .with_attributes(attributes.clone())
193 .build();
194 let provider = SdkTracerProvider::builder()
195 .with_sampler(sampler)
196 .with_id_generator(RandomIdGenerator::default())
197 .with_resource(resource.clone())
198 .with_batch_exporter(trace_exporter)
199 .build();
200
201 let logger_provider = SdkLoggerProvider::builder()
203 .with_resource(resource)
204 .with_batch_exporter(log_exporter)
205 .build();
206
207 global::set_text_map_propagator(TraceContextPropagator::new());
208 global::set_tracer_provider(provider.clone());
209
210 let tracer = provider.tracer(config.service_name.clone());
211 let layer = OpenTelemetryLayer::new(tracer);
212 let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider);
213
214 Ok(Some(TracingInit {
215 layer,
216 provider,
217 logs_layer,
218 logger_provider,
219 runtime,
220 }))
221}
222
223fn parse_resource_attributes() -> Vec<KeyValue> {
224 let Ok(raw) = std::env::var("OTEL_RESOURCE_ATTRIBUTES") else {
225 return Vec::new();
226 };
227
228 raw.split(',')
229 .filter_map(|pair| {
230 let (key, value) = pair.split_once('=')?;
231 let key = key.trim();
232 let value = value.trim();
233 if key.is_empty() || value.is_empty() {
234 return None;
235 }
236 Some(KeyValue::new(key.to_string(), value.to_string()))
237 })
238 .collect()
239}
240
241fn parse_resource_attributes_from_settings(values: Vec<String>) -> Vec<KeyValue> {
242 values
243 .into_iter()
244 .filter_map(|pair| {
245 let (key, value) = pair.split_once('=')?;
246 let key = key.trim();
247 let value = value.trim();
248 if key.is_empty() || value.is_empty() {
249 return None;
250 }
251 Some(KeyValue::new(key.to_string(), value.to_string()))
252 })
253 .collect()
254}
255
/// Parses one sample-ratio string, clamped to `[0.0, 1.0]`.
///
/// Surrounding whitespace is ignored. Returns `None` for empty, unparsable, or NaN
/// input, since NaN cannot be clamped into a meaningful ratio.
fn parse_ratio(raw: &str) -> Option<f64> {
    raw.trim()
        .parse::<f64>()
        .ok()
        .filter(|ratio| !ratio.is_nan())
        .map(|ratio| ratio.clamp(0.0, 1.0))
}

/// Reads the trace sample ratio from the environment.
///
/// `SUBCOG_TRACE_SAMPLE_RATIO` is consulted first, then `OTEL_TRACES_SAMPLER_ARG`.
/// A variable that is unset, empty, unparsable, or NaN is skipped, so the next
/// variable can still apply. Values are clamped to `[0.0, 1.0]`.
fn parse_sample_ratio() -> Option<f64> {
    ["SUBCOG_TRACE_SAMPLE_RATIO", "OTEL_TRACES_SAMPLER_ARG"]
        .iter()
        .find_map(|key| std::env::var(key).ok().as_deref().and_then(parse_ratio))
}
267
268fn build_sampler(sample_ratio: f64) -> Sampler {
269 let sampler_env = std::env::var("SUBCOG_TRACING_SAMPLER")
270 .ok()
271 .or_else(|| std::env::var("OTEL_TRACES_SAMPLER").ok());
272 match sampler_env.as_deref() {
273 Some("always_on") => Sampler::AlwaysOn,
274 Some("always_off") => Sampler::AlwaysOff,
275 Some("traceidratio") => Sampler::TraceIdRatioBased(sample_ratio),
276 Some("parentbased_traceidratio") => {
277 Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(sample_ratio)))
278 },
279 Some("parentbased_always_on") => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
280 Some("parentbased_always_off") => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)),
281 _ => Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(sample_ratio))),
282 }
283}
284
/// Interprets a boolean flag value.
///
/// Surrounding whitespace is ignored, and an empty value returns `None` (treated as
/// unset). `true`, `1`, `yes` and `on` (ASCII case-insensitive) are `Some(true)`;
/// any other non-empty value is `Some(false)`.
fn parse_bool_value(raw: &str) -> Option<bool> {
    let value = raw.trim();
    if value.is_empty() {
        return None;
    }
    let truthy = ["true", "1", "yes", "on"]
        .iter()
        .any(|candidate| value.eq_ignore_ascii_case(candidate));
    Some(truthy)
}

/// Reads environment variable `key` as a boolean flag (see `parse_bool_value`).
///
/// Returns `None` when the variable is unset, not valid Unicode, or empty.
fn parse_bool_env(key: &str) -> Option<bool> {
    std::env::var(key).ok().as_deref().and_then(parse_bool_value)
}
291
292fn apply_env_overrides(config: &mut TracingConfig) {
293 if let Some(enabled) = parse_bool_env("SUBCOG_TRACING_ENABLED") {
294 config.enabled = enabled;
295 } else if endpoint_from_env().is_some() {
296 config.enabled = true;
297 }
298
299 if let Some(sample_ratio) = parse_sample_ratio() {
300 config.sample_ratio = sample_ratio;
301 }
302
303 if let Ok(service_name) = std::env::var("OTEL_SERVICE_NAME") {
304 config.service_name = service_name;
305 }
306
307 if std::env::var("OTEL_RESOURCE_ATTRIBUTES").is_ok() {
308 config.resource_attributes = parse_resource_attributes();
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tracing config that targets an OTLP/HTTP collector at `endpoint`.
    fn http_config(enabled: bool, endpoint: Option<&str>) -> TracingConfig {
        TracingConfig {
            enabled,
            otlp: OtlpConfig {
                endpoint: endpoint.map(str::to_string),
                protocol: OtlpProtocol::Http,
            },
            sample_ratio: 1.0,
            service_name: "subcog-test".to_string(),
            service_version: "0.0.0".to_string(),
            resource_attributes: Vec::new(),
        }
    }

    #[test]
    fn test_otlp_tracing_init_smoke() {
        let config = http_config(true, Some("http://localhost:4318"));

        let init = build_tracing(&config)
            .expect("build tracing")
            .expect("init tracing");
        // Shutdown results are ignored; a collector may not be running at this endpoint.
        let _ = init.provider.shutdown();
        let _ = init.logger_provider.shutdown();
    }

    #[test]
    fn test_disabled_tracing_returns_none() {
        let config = http_config(false, Some("http://localhost:4318"));

        assert!(matches!(build_tracing(&config), Ok(None)));
    }

    #[test]
    fn test_enabled_without_endpoint_is_error() {
        let config = http_config(true, None);

        assert!(build_tracing(&config).is_err());
    }

    #[test]
    fn test_settings_resource_attributes_skip_malformed_entries() {
        let parsed = parse_resource_attributes_from_settings(vec![
            " env = prod ".to_string(),
            "missing-separator".to_string(),
            "=no-key".to_string(),
            "no-value=".to_string(),
            "url=a=b".to_string(),
        ]);

        assert_eq!(
            parsed,
            vec![KeyValue::new("env", "prod"), KeyValue::new("url", "a=b")]
        );
    }
}