Skip to main content

subcog/observability/
tracing.rs

1//! Distributed tracing and OTLP logging.
2
3use crate::config::TracingSettings;
4use crate::{Error, Result};
5use opentelemetry::KeyValue;
6use opentelemetry::global;
7use opentelemetry::trace::TracerProvider as _;
8use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
9use opentelemetry_otlp::{LogExporter, Protocol, SpanExporter, WithExportConfig};
10use opentelemetry_sdk::Resource;
11use opentelemetry_sdk::logs::SdkLoggerProvider;
12use opentelemetry_sdk::propagation::TraceContextPropagator;
13use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracerProvider};
14use tracing_opentelemetry::OpenTelemetryLayer;
15use tracing_subscriber::Registry;
16
17use super::otlp::{OtlpConfig, OtlpProtocol, endpoint_from_env};
18
/// Sample ratio used by `TracingConfig::from_settings` when the settings do not
/// provide one; `1.0` is the upper end of the documented `0.0 - 1.0` range.
const DEFAULT_TRACE_SAMPLE_RATIO: f64 = 1.0;
20
21/// Tracing configuration.
22#[derive(Debug, Clone)]
23pub struct TracingConfig {
24    /// Whether tracing is enabled.
25    pub enabled: bool,
26    /// OTLP exporter configuration.
27    pub otlp: OtlpConfig,
28    /// Sample ratio for trace sampling (0.0 - 1.0).
29    pub sample_ratio: f64,
30    /// Service name for telemetry.
31    pub service_name: String,
32    /// Service version for telemetry.
33    pub service_version: String,
34    /// Additional resource attributes.
35    pub resource_attributes: Vec<KeyValue>,
36}
37
38impl TracingConfig {
39    /// Builds tracing configuration from environment variables.
40    #[must_use]
41    pub fn from_env() -> Self {
42        Self::from_settings(None)
43    }
44
45    /// Builds tracing configuration from config settings with env overrides.
46    #[must_use]
47    pub fn from_settings(settings: Option<&TracingSettings>) -> Self {
48        let otlp = OtlpConfig::from_settings(settings.and_then(|config| config.otlp.as_ref()));
49        let endpoint_present = otlp.endpoint.is_some();
50
51        let enabled = settings
52            .and_then(|config| config.enabled)
53            .unwrap_or(endpoint_present);
54        let sample_ratio = settings
55            .and_then(|config| config.sample_ratio)
56            .unwrap_or(DEFAULT_TRACE_SAMPLE_RATIO);
57        let service_name = settings
58            .and_then(|config| config.service_name.clone())
59            .unwrap_or_else(|| env!("CARGO_PKG_NAME").to_string());
60        let service_version = env!("CARGO_PKG_VERSION").to_string();
61        let resource_attributes = settings
62            .and_then(|config| config.resource_attributes.clone())
63            .map(parse_resource_attributes_from_settings)
64            .unwrap_or_default();
65
66        let mut config = Self {
67            enabled,
68            otlp,
69            sample_ratio,
70            service_name,
71            service_version,
72            resource_attributes,
73        };
74
75        apply_env_overrides(&mut config);
76        config
77    }
78}
79
/// Tracing initialization output.
///
/// Returned by `build_tracing` when tracing is enabled. It bundles the two
/// subscriber layers with the providers that back them so the caller can install
/// the layers and later shut the providers down.
pub struct TracingInit {
    /// `OpenTelemetry` layer for tracing subscriber.
    pub layer: OpenTelemetryLayer<Registry, opentelemetry_sdk::trace::Tracer>,
    /// Tracer provider for shutdown flushing.
    pub provider: SdkTracerProvider,
    /// OTLP logging layer for tracing subscriber.
    pub logs_layer:
        OpenTelemetryTracingBridge<SdkLoggerProvider, opentelemetry_sdk::logs::SdkLogger>,
    /// Logger provider for shutdown flushing.
    pub logger_provider: SdkLoggerProvider,
    /// Tokio runtime for gRPC exporters when no runtime exists.
    ///
    /// `build_tracing` fills this only when the protocol is gRPC and no Tokio
    /// runtime handle was current on the calling thread; otherwise it is `None`.
    /// NOTE(review): presumably this must be kept alive for as long as the
    /// exporters are in use — confirm against the exporter requirements.
    pub runtime: Option<tokio::runtime::Runtime>,
}
94
/// Tracer for distributed tracing.
///
/// A unit struct: it holds no state, and `Tracer::default()` is the same value
/// as `Tracer::new()`.
#[derive(Default)]
pub struct Tracer;

impl Tracer {
    /// Creates a new tracer; callable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Tracer
    }
}
111
112/// Builds tracing layer and provider for the given configuration.
113pub fn build_tracing(config: &TracingConfig) -> Result<Option<TracingInit>> {
114    if !config.enabled {
115        return Ok(None);
116    }
117
118    let endpoint = config
119        .otlp
120        .endpoint
121        .clone()
122        .ok_or_else(|| Error::OperationFailed {
123            operation: "tracing_init".to_string(),
124            cause: "OTLP endpoint required when tracing is enabled".to_string(),
125        })?;
126
127    let runtime = match (config.otlp.protocol, tokio::runtime::Handle::try_current()) {
128        (OtlpProtocol::Grpc, Err(_)) => Some(
129            tokio::runtime::Builder::new_multi_thread()
130                .enable_all()
131                .build()
132                .map_err(|e| Error::OperationFailed {
133                    operation: "otlp_runtime_init".to_string(),
134                    cause: e.to_string(),
135                })?,
136        ),
137        _ => None,
138    };
139
140    let _guard = runtime.as_ref().map(tokio::runtime::Runtime::enter);
141
142    // Build trace exporter
143    let trace_exporter = match config.otlp.protocol {
144        OtlpProtocol::Grpc => SpanExporter::builder()
145            .with_tonic()
146            .with_endpoint(&endpoint)
147            .build()
148            .map_err(|e| Error::OperationFailed {
149                operation: "otlp_exporter_build".to_string(),
150                cause: e.to_string(),
151            })?,
152        OtlpProtocol::Http => SpanExporter::builder()
153            .with_http()
154            .with_protocol(Protocol::HttpBinary)
155            .with_endpoint(&endpoint)
156            .build()
157            .map_err(|e| Error::OperationFailed {
158                operation: "otlp_exporter_build".to_string(),
159                cause: e.to_string(),
160            })?,
161    };
162
163    // Build logs exporter
164    let log_exporter = match config.otlp.protocol {
165        OtlpProtocol::Grpc => LogExporter::builder()
166            .with_tonic()
167            .with_endpoint(&endpoint)
168            .build()
169            .map_err(|e| Error::OperationFailed {
170                operation: "otlp_log_exporter_build".to_string(),
171                cause: e.to_string(),
172            })?,
173        OtlpProtocol::Http => LogExporter::builder()
174            .with_http()
175            .with_protocol(Protocol::HttpBinary)
176            .with_endpoint(&endpoint)
177            .build()
178            .map_err(|e| Error::OperationFailed {
179                operation: "otlp_log_exporter_build".to_string(),
180                cause: e.to_string(),
181            })?,
182    };
183
184    let mut attributes = vec![
185        KeyValue::new("service.name", config.service_name.clone()),
186        KeyValue::new("service.version", config.service_version.clone()),
187    ];
188    attributes.extend(config.resource_attributes.clone());
189
190    let sampler = build_sampler(config.sample_ratio);
191    let resource = Resource::builder()
192        .with_attributes(attributes.clone())
193        .build();
194    let provider = SdkTracerProvider::builder()
195        .with_sampler(sampler)
196        .with_id_generator(RandomIdGenerator::default())
197        .with_resource(resource.clone())
198        .with_batch_exporter(trace_exporter)
199        .build();
200
201    // Build logger provider
202    let logger_provider = SdkLoggerProvider::builder()
203        .with_resource(resource)
204        .with_batch_exporter(log_exporter)
205        .build();
206
207    global::set_text_map_propagator(TraceContextPropagator::new());
208    global::set_tracer_provider(provider.clone());
209
210    let tracer = provider.tracer(config.service_name.clone());
211    let layer = OpenTelemetryLayer::new(tracer);
212    let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider);
213
214    Ok(Some(TracingInit {
215        layer,
216        provider,
217        logs_layer,
218        logger_provider,
219        runtime,
220    }))
221}
222
223fn parse_resource_attributes() -> Vec<KeyValue> {
224    let Ok(raw) = std::env::var("OTEL_RESOURCE_ATTRIBUTES") else {
225        return Vec::new();
226    };
227
228    raw.split(',')
229        .filter_map(|pair| {
230            let (key, value) = pair.split_once('=')?;
231            let key = key.trim();
232            let value = value.trim();
233            if key.is_empty() || value.is_empty() {
234                return None;
235            }
236            Some(KeyValue::new(key.to_string(), value.to_string()))
237        })
238        .collect()
239}
240
241fn parse_resource_attributes_from_settings(values: Vec<String>) -> Vec<KeyValue> {
242    values
243        .into_iter()
244        .filter_map(|pair| {
245            let (key, value) = pair.split_once('=')?;
246            let key = key.trim();
247            let value = value.trim();
248            if key.is_empty() || value.is_empty() {
249                return None;
250            }
251            Some(KeyValue::new(key.to_string(), value.to_string()))
252        })
253        .collect()
254}
255
/// Reads the trace sample ratio from the environment, clamped to `0.0..=1.0`.
///
/// `SUBCOG_TRACE_SAMPLE_RATIO` takes precedence; `OTEL_TRACES_SAMPLER_ARG` is
/// consulted only when the former cannot be read. Returns `None` when neither is
/// readable or when the chosen value does not parse as `f64` — an unparsable
/// `SUBCOG_TRACE_SAMPLE_RATIO` does not fall back to the OTEL variable.
fn parse_sample_ratio() -> Option<f64> {
    let raw = std::env::var("SUBCOG_TRACE_SAMPLE_RATIO")
        .or_else(|_| std::env::var("OTEL_TRACES_SAMPLER_ARG"))
        .ok()?;

    raw.parse::<f64>().ok().map(|ratio| ratio.clamp(0.0, 1.0))
}
267
268fn build_sampler(sample_ratio: f64) -> Sampler {
269    let sampler_env = std::env::var("SUBCOG_TRACING_SAMPLER")
270        .ok()
271        .or_else(|| std::env::var("OTEL_TRACES_SAMPLER").ok());
272    match sampler_env.as_deref() {
273        Some("always_on") => Sampler::AlwaysOn,
274        Some("always_off") => Sampler::AlwaysOff,
275        Some("traceidratio") => Sampler::TraceIdRatioBased(sample_ratio),
276        Some("parentbased_traceidratio") => {
277            Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(sample_ratio)))
278        },
279        Some("parentbased_always_on") => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
280        Some("parentbased_always_off") => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)),
281        _ => Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(sample_ratio))),
282    }
283}
284
/// Reads the environment variable `key` as a boolean flag.
///
/// Returns `None` when the variable cannot be read. Otherwise the value is
/// lower-cased and is `true` only for exactly `true`, `1` or `yes`; every other
/// value (no trimming is applied) is `false`.
fn parse_bool_env(key: &str) -> Option<bool> {
    let raw = std::env::var(key).ok()?;
    Some(matches!(raw.to_lowercase().as_str(), "true" | "1" | "yes"))
}
291
292fn apply_env_overrides(config: &mut TracingConfig) {
293    if let Some(enabled) = parse_bool_env("SUBCOG_TRACING_ENABLED") {
294        config.enabled = enabled;
295    } else if endpoint_from_env().is_some() {
296        config.enabled = true;
297    }
298
299    if let Some(sample_ratio) = parse_sample_ratio() {
300        config.sample_ratio = sample_ratio;
301    }
302
303    if let Ok(service_name) = std::env::var("OTEL_SERVICE_NAME") {
304        config.service_name = service_name;
305    }
306
307    if std::env::var("OTEL_RESOURCE_ATTRIBUTES").is_ok() {
308        config.resource_attributes = parse_resource_attributes();
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an enabled HTTP configuration must produce a `TracingInit`.
    /// Shutdown results are intentionally ignored.
    #[test]
    fn test_otlp_tracing_init_smoke() {
        let otlp = OtlpConfig {
            endpoint: Some(String::from("http://localhost:4318")),
            protocol: OtlpProtocol::Http,
        };
        let config = TracingConfig {
            enabled: true,
            otlp,
            sample_ratio: 1.0,
            service_name: String::from("subcog-test"),
            service_version: String::from("0.0.0"),
            resource_attributes: Vec::new(),
        };

        let built = build_tracing(&config).expect("build tracing");
        let init = built.expect("init tracing");

        let _ = init.provider.shutdown();
        let _ = init.logger_provider.shutdown();
    }
}
336}