Skip to main content

subcog/observability/
mod.rs

1//! Observability and telemetry.
2
3mod event_bus;
4mod logging;
5mod metrics;
6mod otlp;
7mod request_context;
8mod tracing;
9
10pub use event_bus::{EventBus, global_event_bus};
11use logging::RedactingJsonFields;
12pub use logging::{LogFormat, Logger, LoggingConfig};
13pub use metrics::{Metrics, MetricsConfig, flush_global as flush_metrics, set_instance_label};
14pub use otlp::{OtlpConfig, OtlpExporter, OtlpProtocol};
15pub use request_context::{
16    RequestContext, current_request_id, enter_request_context, scope_request_context,
17};
18pub use tracing::{Tracer, TracingConfig};
19
20use crate::config::ObservabilitySettings;
21use crate::{Error, Result};
22use std::fs::{File, OpenOptions};
23use std::io::{self, Write};
24use std::path::Path;
25use std::sync::{Arc, Mutex, OnceLock};
26use tracing_subscriber::layer::SubscriberExt;
27use tracing_subscriber::util::SubscriberInitExt;
28
/// Full observability configuration.
///
/// Bundles the per-subsystem settings consumed by [`init`]. Within this
/// module, instances are assembled by `build_config` from optional settings
/// plus [`InitOptions`].
#[derive(Debug, Clone)]
pub struct ObservabilityConfig {
    /// Logging configuration; [`init`] reads its `file`, `format`, and `filter` fields.
    pub logging: LoggingConfig,
    /// Tracing configuration, handed to `tracing::build_tracing` by [`init`].
    pub tracing: TracingConfig,
    /// Metrics configuration, handed to `metrics::install_prometheus` by [`init`].
    pub metrics: MetricsConfig,
    /// Whether to expose metrics via HTTP listener.
    pub metrics_expose: bool,
}
41
/// Options for environment-based initialization.
///
/// Accepted by [`init_from_env`] and [`init_from_config`]; both values are
/// folded into the resulting [`ObservabilityConfig`].
#[derive(Debug, Clone, Copy)]
pub struct InitOptions {
    /// Whether verbose output was requested via CLI (forwarded to
    /// `LoggingConfig::from_settings`).
    pub verbose: bool,
    /// Whether to expose metrics via HTTP listener (copied verbatim into
    /// `ObservabilityConfig::metrics_expose`).
    pub metrics_expose: bool,
}
50
/// Handle for observability runtime components.
///
/// Returned by [`init`]. Every field is an `Option` so that `shutdown` can
/// `take()` each component exactly once; the `Drop` impl calls `shutdown`
/// if a provider or the metrics handle is still present.
pub struct ObservabilityHandle {
    /// Span provider; `None` when `tracing::build_tracing` produced nothing.
    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
    /// Log provider; `None` when `tracing::build_tracing` produced nothing.
    logger_provider: Option<opentelemetry_sdk::logs::SdkLoggerProvider>,
    /// Handle returned by `metrics::install_prometheus`, flushed on shutdown.
    metrics_handle: Option<metrics::MetricsHandle>,
    /// Optional Tokio runtime supplied by `tracing::build_tracing`; stopped
    /// after the providers have been flushed.
    tracing_runtime: Option<tokio::runtime::Runtime>,
}
58
/// Process-wide marker set by `init` once the subscriber has been installed.
/// While it is populated, further `init` calls fail with
/// "observability already initialized".
static OBSERVABILITY_INIT: OnceLock<()> = OnceLock::new();
60
61impl ObservabilityHandle {
62    /// Explicitly shuts down observability components.
63    ///
64    /// This should be called before dropping the handle when running inside
65    /// an async context to avoid panics from blocking operations.
66    pub fn shutdown(&mut self) {
67        if let Some(handle) = self.metrics_handle.take() {
68            metrics::flush(&handle);
69        }
70
71        // Force flush and shutdown providers
72        let tracer = self.tracer_provider.take();
73        let logger = self.logger_provider.take();
74
75        if tracer.is_none() && logger.is_none() {
76            let _ = self.tracing_runtime.take();
77            return;
78        }
79
80        // Take the runtime regardless of context - we need to handle it properly
81        let tracing_runtime = self.tracing_runtime.take();
82
83        // Check if we're in an async context
84        if let Ok(handle) = tokio::runtime::Handle::try_current() {
85            // Use block_in_place to safely run blocking operations from async context
86            // This avoids "Cannot drop a runtime in a context where blocking is not allowed"
87            tokio::task::block_in_place(|| {
88                Self::wait_for_batch_export(&handle);
89                Self::flush_and_shutdown(tracer, logger);
90                Self::shutdown_runtime(tracing_runtime);
91            });
92        } else if let Some(rt) = tracing_runtime {
93            // Use block_on to ensure gRPC batch exporter can flush
94            rt.block_on(async {
95                // Give the runtime a moment to process pending exports
96                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
97            });
98            // Now shutdown synchronously within the runtime context
99            {
100                let _guard = rt.enter();
101                Self::flush_and_shutdown(tracer, logger);
102            }
103            // Let runtime finish any remaining work
104            rt.shutdown_timeout(std::time::Duration::from_secs(2));
105        } else {
106            Self::flush_and_shutdown(tracer, logger);
107        }
108    }
109
110    /// Flushes and shuts down tracer and logger providers.
111    fn flush_and_shutdown(
112        tracer: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
113        logger: Option<opentelemetry_sdk::logs::SdkLoggerProvider>,
114    ) {
115        if let Some(ref t) = tracer {
116            let _ = t.force_flush();
117        }
118        if let Some(ref l) = logger {
119            let _ = l.force_flush();
120        }
121        Self::shutdown_provider(tracer);
122        Self::shutdown_provider(logger);
123    }
124
125    /// Shuts down the tracing runtime with a timeout.
126    fn shutdown_runtime(runtime: Option<tokio::runtime::Runtime>) {
127        if let Some(rt) = runtime {
128            rt.shutdown_timeout(std::time::Duration::from_secs(2));
129        }
130    }
131
132    /// Waits briefly for the batch exporter to process pending exports.
133    fn wait_for_batch_export(handle: &tokio::runtime::Handle) {
134        handle.block_on(async {
135            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
136        });
137    }
138
139    /// Shuts down a tracer provider if present.
140    fn shutdown_provider<T: ShutdownProvider>(provider: Option<T>) {
141        if let Some(p) = provider {
142            let _ = p.shutdown();
143        }
144    }
145}
146
/// Trait for providers that can be shut down.
///
/// Exists so that `ObservabilityHandle::shutdown_provider` can treat the
/// tracer and logger providers through a single generic function.
trait ShutdownProvider {
    /// Shuts down the provider, consuming it.
    fn shutdown(self) -> opentelemetry_sdk::error::OTelSdkResult;
}
152
/// Adapts `SdkTracerProvider` to `ShutdownProvider`.
impl ShutdownProvider for opentelemetry_sdk::trace::SdkTracerProvider {
    fn shutdown(self) -> opentelemetry_sdk::error::OTelSdkResult {
        // Path-call with `&self` targets the provider's own by-reference
        // `shutdown`. NOTE(review): keep this explicit form — `self.shutdown()`
        // would presumably pick this by-value trait method and recurse.
        Self::shutdown(&self)
    }
}
158
/// Adapts `SdkLoggerProvider` to `ShutdownProvider`.
impl ShutdownProvider for opentelemetry_sdk::logs::SdkLoggerProvider {
    fn shutdown(self) -> opentelemetry_sdk::error::OTelSdkResult {
        // Path-call with `&self` targets the provider's own by-reference
        // `shutdown`. NOTE(review): keep this explicit form — `self.shutdown()`
        // would presumably pick this by-value trait method and recurse.
        Self::shutdown(&self)
    }
}
164
165impl Drop for ObservabilityHandle {
166    fn drop(&mut self) {
167        // If components weren't already shut down, do it now
168        // This handles the case where shutdown() wasn't called explicitly
169        if self.metrics_handle.is_some()
170            || self.tracer_provider.is_some()
171            || self.logger_provider.is_some()
172        {
173            self.shutdown();
174        }
175    }
176}
177
178/// Initializes observability using environment variables.
179///
180/// # Errors
181///
182/// Returns an error if observability has already been initialized or if any
183/// telemetry components fail to initialize.
184pub fn init_from_env(options: InitOptions) -> Result<ObservabilityHandle> {
185    let config = build_config(None, options);
186
187    init(config)
188}
189
190/// Initializes observability from config settings with env overrides.
191///
192/// # Errors
193///
194/// Returns an error if observability has already been initialized or if any
195/// telemetry components fail to initialize.
196pub fn init_from_config(
197    settings: &ObservabilitySettings,
198    options: InitOptions,
199) -> Result<ObservabilityHandle> {
200    let config = build_config(Some(settings), options);
201
202    init(config)
203}
204
205fn build_config(
206    settings: Option<&ObservabilitySettings>,
207    options: InitOptions,
208) -> ObservabilityConfig {
209    let logging = LoggingConfig::from_settings(
210        settings.and_then(|cfg| cfg.logging.as_ref()),
211        options.verbose,
212    );
213    let tracing = TracingConfig::from_settings(settings.and_then(|cfg| cfg.tracing.as_ref()));
214    let metrics = MetricsConfig::from_settings(settings.and_then(|cfg| cfg.metrics.as_ref()));
215
216    ObservabilityConfig {
217        logging,
218        tracing,
219        metrics,
220        metrics_expose: options.metrics_expose,
221    }
222}
223
/// Initializes logging, tracing, and metrics for the process.
///
/// Steps, in order:
///
/// 1. Reject the call if `OBSERVABILITY_INIT` is already set.
/// 2. Install the Prometheus metrics backend via `metrics::install_prometheus`.
/// 3. Build the optional tracing/log-export layers via `tracing::build_tracing`.
/// 4. Install a `tracing_subscriber` registry whose fmt layer depends on the
///    `(log file, format)` combination in `config.logging`.
/// 5. Mark observability as initialized and return the handle that owns the
///    providers, metrics handle, and tracing runtime.
///
/// # Errors
///
/// Returns an error if observability has already been initialized or if any
/// telemetry components fail to initialize.
#[allow(clippy::too_many_lines)]
pub fn init(config: ObservabilityConfig) -> Result<ObservabilityHandle> {
    // NOTE(review): this check and the `set` at the end are not one atomic
    // step; a concurrent second caller that passes this check fails later,
    // either at `try_init` or at `set`.
    if OBSERVABILITY_INIT.get().is_some() {
        return Err(Error::OperationFailed {
            operation: "observability_init".to_string(),
            cause: "observability already initialized".to_string(),
        });
    }

    let metrics_handle = metrics::install_prometheus(&config.metrics, config.metrics_expose)?;

    // Split the optional tracing bundle into independently optional parts so
    // that each can be handed to the registry / the returned handle.
    let tracing_init = tracing::build_tracing(&config.tracing)?;
    let (tracing_layer, tracer_provider, logs_layer, logger_provider, tracing_runtime) =
        match tracing_init {
            Some(init) => (
                Some(init.layer),
                Some(init.provider),
                Some(init.logs_layer),
                Some(init.logger_provider),
                init.runtime,
            ),
            None => (None, None, None, None, None),
        };

    // Initialize logging based on format and optional file output.
    // The four arms differ only in the fmt layer; each builds a distinct
    // layer type, which is why the registry setup is repeated per arm.
    match (&config.logging.file, config.logging.format) {
        // JSON to file: redacting field formatter, output appended to the log file.
        (Some(log_file), LogFormat::Json) => {
            let writer = open_log_file(log_file)?;
            let json_format = tracing_subscriber::fmt::format()
                .json()
                .with_current_span(true)
                .with_span_list(true);
            tracing_subscriber::registry()
                .with(tracing_layer)
                .with(logs_layer)
                .with(
                    tracing_subscriber::fmt::layer()
                        .event_format(json_format)
                        .fmt_fields(RedactingJsonFields::default())
                        .with_writer(writer)
                        .with_target(true)
                        .with_thread_ids(true)
                        .with_thread_names(true),
                )
                .with(config.logging.filter)
                .try_init()
                .map_err(init_error)?;
        },
        // Text to file: ANSI colour codes are disabled for file output.
        // NOTE(review): unlike the no-file Pretty arm, `.pretty()` is not
        // applied here, so the fmt layer's default event format is used.
        (Some(log_file), LogFormat::Pretty) => {
            let writer = open_log_file(log_file)?;
            tracing_subscriber::registry()
                .with(tracing_layer)
                .with(logs_layer)
                .with(
                    tracing_subscriber::fmt::layer()
                        .with_writer(writer)
                        .with_ansi(false)
                        .with_target(true)
                        .with_thread_ids(true)
                        .with_thread_names(true),
                )
                .with(config.logging.filter)
                .try_init()
                .map_err(init_error)?;
        },
        // JSON without a file: same formatter, fmt layer's default writer.
        (None, LogFormat::Json) => {
            let json_format = tracing_subscriber::fmt::format()
                .json()
                .with_current_span(true)
                .with_span_list(true);
            tracing_subscriber::registry()
                .with(tracing_layer)
                .with(logs_layer)
                .with(
                    tracing_subscriber::fmt::layer()
                        .event_format(json_format)
                        .fmt_fields(RedactingJsonFields::default())
                        .with_target(true)
                        .with_thread_ids(true)
                        .with_thread_names(true),
                )
                .with(config.logging.filter)
                .try_init()
                .map_err(init_error)?;
        },
        // Pretty without a file: human-readable format, fmt layer's default writer.
        (None, LogFormat::Pretty) => {
            tracing_subscriber::registry()
                .with(tracing_layer)
                .with(logs_layer)
                .with(
                    tracing_subscriber::fmt::layer()
                        .pretty()
                        .with_target(true)
                        .with_thread_ids(true)
                        .with_thread_names(true),
                )
                .with(config.logging.filter)
                .try_init()
                .map_err(init_error)?;
        },
    }

    // Only mark as initialized after the subscriber has been installed.
    OBSERVABILITY_INIT
        .set(())
        .map_err(|()| Error::OperationFailed {
            operation: "observability_init".to_string(),
            cause: "failed to mark observability initialized".to_string(),
        })?;

    Ok(ObservabilityHandle {
        tracer_provider,
        logger_provider,
        metrics_handle,
        tracing_runtime,
    })
}
346
/// Thread-safe file writer for logging.
///
/// All clones share the same underlying [`File`] behind an `Arc<Mutex<_>>`,
/// so writes issued through different clones are serialized by the mutex.
#[derive(Clone)]
struct LogFileWriter {
    /// Shared, lock-protected handle to the open log file.
    file: Arc<Mutex<File>>,
}

impl Write for LogFileWriter {
    /// Writes `buf` to the shared file while holding the lock.
    ///
    /// A poisoned lock is reported as an `io::Error` carrying the poison
    /// message rather than panicking.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self.file.lock() {
            Ok(mut file) => file.write(buf),
            Err(poisoned) => Err(io::Error::other(poisoned.to_string())),
        }
    }

    /// Flushes the shared file while holding the lock.
    ///
    /// A poisoned lock is reported as an `io::Error` carrying the poison
    /// message rather than panicking.
    fn flush(&mut self) -> io::Result<()> {
        match self.file.lock() {
            Ok(mut file) => file.flush(),
            Err(poisoned) => Err(io::Error::other(poisoned.to_string())),
        }
    }
}
370
371impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for LogFileWriter {
372    type Writer = Self;
373
374    fn make_writer(&'a self) -> Self::Writer {
375        self.clone()
376    }
377}
378
379/// Opens a log file for appending.
380fn open_log_file(path: &Path) -> Result<LogFileWriter> {
381    // Create parent directories if they don't exist
382    if let Some(parent) = path.parent() {
383        std::fs::create_dir_all(parent).map_err(|e| Error::OperationFailed {
384            operation: "create_log_dir".to_string(),
385            cause: e.to_string(),
386        })?;
387    }
388
389    let file = OpenOptions::new()
390        .create(true)
391        .append(true)
392        .open(path)
393        .map_err(|e| Error::OperationFailed {
394            operation: "open_log_file".to_string(),
395            cause: format!("{}: {}", path.display(), e),
396        })?;
397
398    Ok(LogFileWriter {
399        file: Arc::new(Mutex::new(file)),
400    })
401}
402
403/// Helper to convert init errors.
404#[allow(clippy::needless_pass_by_value)]
405fn init_error(e: tracing_subscriber::util::TryInitError) -> Error {
406    Error::OperationFailed {
407        operation: "observability_init".to_string(),
408        cause: e.to_string(),
409    }
410}