1mod event_bus;
4mod logging;
5mod metrics;
6mod otlp;
7mod request_context;
8mod tracing;
9
10pub use event_bus::{EventBus, global_event_bus};
11use logging::RedactingJsonFields;
12pub use logging::{LogFormat, Logger, LoggingConfig};
13pub use metrics::{Metrics, MetricsConfig, flush_global as flush_metrics, set_instance_label};
14pub use otlp::{OtlpConfig, OtlpExporter, OtlpProtocol};
15pub use request_context::{
16 RequestContext, current_request_id, enter_request_context, scope_request_context,
17};
18pub use tracing::{Tracer, TracingConfig};
19
20use crate::config::ObservabilitySettings;
21use crate::{Error, Result};
22use std::fs::{File, OpenOptions};
23use std::io::{self, Write};
24use std::path::Path;
25use std::sync::{Arc, Mutex, OnceLock};
26use tracing_subscriber::layer::SubscriberExt;
27use tracing_subscriber::util::SubscriberInitExt;
28
/// Fully resolved observability configuration consumed by [`init`].
#[derive(Debug, Clone)]
pub struct ObservabilityConfig {
    /// Logging settings; `init` reads its `file`, `format` and `filter` fields.
    pub logging: LoggingConfig,
    /// Tracing settings, handed to `tracing::build_tracing`.
    pub tracing: TracingConfig,
    /// Metrics settings, handed to `metrics::install_prometheus`.
    pub metrics: MetricsConfig,
    /// Forwarded as the second argument of `metrics::install_prometheus`.
    /// NOTE(review): presumably toggles exposing a metrics endpoint — confirm in `metrics`.
    pub metrics_expose: bool,
}
41
/// Caller-supplied switches accepted by [`init_from_env`] and [`init_from_config`].
#[derive(Debug, Clone, Copy)]
pub struct InitOptions {
    /// Passed to `LoggingConfig::from_settings` when the logging configuration is built.
    pub verbose: bool,
    /// Copied verbatim into `ObservabilityConfig::metrics_expose`.
    pub metrics_expose: bool,
}
50
/// Owns the resources created by [`init`] and releases them in `shutdown`.
///
/// Dropping the handle calls `shutdown` if a metrics handle or either provider is still
/// held, so an explicit `shutdown` call is optional.
pub struct ObservabilityHandle {
    /// Tracer provider returned by `tracing::build_tracing`; `None` when tracing is not built.
    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
    /// Logger provider returned by `tracing::build_tracing`; `None` when tracing is not built.
    logger_provider: Option<opentelemetry_sdk::logs::SdkLoggerProvider>,
    /// Handle returned by `metrics::install_prometheus`; flushed first during shutdown.
    metrics_handle: Option<metrics::MetricsHandle>,
    /// Runtime optionally returned by `tracing::build_tracing`; shut down last.
    tracing_runtime: Option<tokio::runtime::Runtime>,
}
58
/// Set at the end of a successful [`init`]; a later `init` call sees it and returns an error.
static OBSERVABILITY_INIT: OnceLock<()> = OnceLock::new();
60
61impl ObservabilityHandle {
62 pub fn shutdown(&mut self) {
67 if let Some(handle) = self.metrics_handle.take() {
68 metrics::flush(&handle);
69 }
70
71 let tracer = self.tracer_provider.take();
73 let logger = self.logger_provider.take();
74
75 if tracer.is_none() && logger.is_none() {
76 let _ = self.tracing_runtime.take();
77 return;
78 }
79
80 let tracing_runtime = self.tracing_runtime.take();
82
83 if let Ok(handle) = tokio::runtime::Handle::try_current() {
85 tokio::task::block_in_place(|| {
88 Self::wait_for_batch_export(&handle);
89 Self::flush_and_shutdown(tracer, logger);
90 Self::shutdown_runtime(tracing_runtime);
91 });
92 } else if let Some(rt) = tracing_runtime {
93 rt.block_on(async {
95 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
97 });
98 {
100 let _guard = rt.enter();
101 Self::flush_and_shutdown(tracer, logger);
102 }
103 rt.shutdown_timeout(std::time::Duration::from_secs(2));
105 } else {
106 Self::flush_and_shutdown(tracer, logger);
107 }
108 }
109
110 fn flush_and_shutdown(
112 tracer: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
113 logger: Option<opentelemetry_sdk::logs::SdkLoggerProvider>,
114 ) {
115 if let Some(ref t) = tracer {
116 let _ = t.force_flush();
117 }
118 if let Some(ref l) = logger {
119 let _ = l.force_flush();
120 }
121 Self::shutdown_provider(tracer);
122 Self::shutdown_provider(logger);
123 }
124
125 fn shutdown_runtime(runtime: Option<tokio::runtime::Runtime>) {
127 if let Some(rt) = runtime {
128 rt.shutdown_timeout(std::time::Duration::from_secs(2));
129 }
130 }
131
132 fn wait_for_batch_export(handle: &tokio::runtime::Handle) {
134 handle.block_on(async {
135 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
136 });
137 }
138
139 fn shutdown_provider<T: ShutdownProvider>(provider: Option<T>) {
141 if let Some(p) = provider {
142 let _ = p.shutdown();
143 }
144 }
145}
146
/// Common shutdown interface so `shutdown_provider` can handle the tracer and logger
/// providers through one generic function.
trait ShutdownProvider {
    /// Consumes the provider, shuts it down and returns the SDK result.
    fn shutdown(self) -> opentelemetry_sdk::error::OTelSdkResult;
}
152
153impl ShutdownProvider for opentelemetry_sdk::trace::SdkTracerProvider {
154 fn shutdown(self) -> opentelemetry_sdk::error::OTelSdkResult {
155 Self::shutdown(&self)
156 }
157}
158
159impl ShutdownProvider for opentelemetry_sdk::logs::SdkLoggerProvider {
160 fn shutdown(self) -> opentelemetry_sdk::error::OTelSdkResult {
161 Self::shutdown(&self)
162 }
163}
164
165impl Drop for ObservabilityHandle {
166 fn drop(&mut self) {
167 if self.metrics_handle.is_some()
170 || self.tracer_provider.is_some()
171 || self.logger_provider.is_some()
172 {
173 self.shutdown();
174 }
175 }
176}
177
178pub fn init_from_env(options: InitOptions) -> Result<ObservabilityHandle> {
185 let config = build_config(None, options);
186
187 init(config)
188}
189
190pub fn init_from_config(
197 settings: &ObservabilitySettings,
198 options: InitOptions,
199) -> Result<ObservabilityHandle> {
200 let config = build_config(Some(settings), options);
201
202 init(config)
203}
204
205fn build_config(
206 settings: Option<&ObservabilitySettings>,
207 options: InitOptions,
208) -> ObservabilityConfig {
209 let logging = LoggingConfig::from_settings(
210 settings.and_then(|cfg| cfg.logging.as_ref()),
211 options.verbose,
212 );
213 let tracing = TracingConfig::from_settings(settings.and_then(|cfg| cfg.tracing.as_ref()));
214 let metrics = MetricsConfig::from_settings(settings.and_then(|cfg| cfg.metrics.as_ref()));
215
216 ObservabilityConfig {
217 logging,
218 tracing,
219 metrics,
220 metrics_expose: options.metrics_expose,
221 }
222}
223
224#[allow(clippy::too_many_lines)]
231pub fn init(config: ObservabilityConfig) -> Result<ObservabilityHandle> {
232 if OBSERVABILITY_INIT.get().is_some() {
233 return Err(Error::OperationFailed {
234 operation: "observability_init".to_string(),
235 cause: "observability already initialized".to_string(),
236 });
237 }
238
239 let metrics_handle = metrics::install_prometheus(&config.metrics, config.metrics_expose)?;
240
241 let tracing_init = tracing::build_tracing(&config.tracing)?;
242 let (tracing_layer, tracer_provider, logs_layer, logger_provider, tracing_runtime) =
243 match tracing_init {
244 Some(init) => (
245 Some(init.layer),
246 Some(init.provider),
247 Some(init.logs_layer),
248 Some(init.logger_provider),
249 init.runtime,
250 ),
251 None => (None, None, None, None, None),
252 };
253
254 match (&config.logging.file, config.logging.format) {
256 (Some(log_file), LogFormat::Json) => {
257 let writer = open_log_file(log_file)?;
258 let json_format = tracing_subscriber::fmt::format()
259 .json()
260 .with_current_span(true)
261 .with_span_list(true);
262 tracing_subscriber::registry()
263 .with(tracing_layer)
264 .with(logs_layer)
265 .with(
266 tracing_subscriber::fmt::layer()
267 .event_format(json_format)
268 .fmt_fields(RedactingJsonFields::default())
269 .with_writer(writer)
270 .with_target(true)
271 .with_thread_ids(true)
272 .with_thread_names(true),
273 )
274 .with(config.logging.filter)
275 .try_init()
276 .map_err(init_error)?;
277 },
278 (Some(log_file), LogFormat::Pretty) => {
279 let writer = open_log_file(log_file)?;
280 tracing_subscriber::registry()
281 .with(tracing_layer)
282 .with(logs_layer)
283 .with(
284 tracing_subscriber::fmt::layer()
285 .with_writer(writer)
286 .with_ansi(false)
287 .with_target(true)
288 .with_thread_ids(true)
289 .with_thread_names(true),
290 )
291 .with(config.logging.filter)
292 .try_init()
293 .map_err(init_error)?;
294 },
295 (None, LogFormat::Json) => {
296 let json_format = tracing_subscriber::fmt::format()
297 .json()
298 .with_current_span(true)
299 .with_span_list(true);
300 tracing_subscriber::registry()
301 .with(tracing_layer)
302 .with(logs_layer)
303 .with(
304 tracing_subscriber::fmt::layer()
305 .event_format(json_format)
306 .fmt_fields(RedactingJsonFields::default())
307 .with_target(true)
308 .with_thread_ids(true)
309 .with_thread_names(true),
310 )
311 .with(config.logging.filter)
312 .try_init()
313 .map_err(init_error)?;
314 },
315 (None, LogFormat::Pretty) => {
316 tracing_subscriber::registry()
317 .with(tracing_layer)
318 .with(logs_layer)
319 .with(
320 tracing_subscriber::fmt::layer()
321 .pretty()
322 .with_target(true)
323 .with_thread_ids(true)
324 .with_thread_names(true),
325 )
326 .with(config.logging.filter)
327 .try_init()
328 .map_err(init_error)?;
329 },
330 }
331
332 OBSERVABILITY_INIT
333 .set(())
334 .map_err(|()| Error::OperationFailed {
335 operation: "observability_init".to_string(),
336 cause: "failed to mark observability initialized".to_string(),
337 })?;
338
339 Ok(ObservabilityHandle {
340 tracer_provider,
341 logger_provider,
342 metrics_handle,
343 tracing_runtime,
344 })
345}
346
/// Cloneable writer over one shared log file.
///
/// All clones share the same `File` behind a mutex, so writes from different clones are
/// serialized.
#[derive(Clone)]
struct LogFileWriter {
    file: Arc<Mutex<File>>,
}

impl LogFileWriter {
    /// Locks the shared file, reporting a poisoned mutex as an `io::Error`.
    fn lock(&self) -> io::Result<std::sync::MutexGuard<'_, File>> {
        self.file
            .lock()
            .map_err(|e| io::Error::other(e.to_string()))
    }
}

impl Write for LogFileWriter {
    /// Writes as much of `buf` as a single `File::write` call accepts.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut guard = self.lock()?;
        guard.write(buf)
    }

    /// Writes the whole buffer while holding the lock once.
    ///
    /// The default `write_all` would call `write` in a loop and re-acquire the mutex on
    /// each iteration, so a short write could let another clone interleave bytes in the
    /// middle of a record.
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        let mut guard = self.lock()?;
        guard.write_all(buf)
    }

    /// Flushes the underlying file.
    fn flush(&mut self) -> io::Result<()> {
        let mut guard = self.lock()?;
        guard.flush()
    }
}
370
371impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for LogFileWriter {
372 type Writer = Self;
373
374 fn make_writer(&'a self) -> Self::Writer {
375 self.clone()
376 }
377}
378
379fn open_log_file(path: &Path) -> Result<LogFileWriter> {
381 if let Some(parent) = path.parent() {
383 std::fs::create_dir_all(parent).map_err(|e| Error::OperationFailed {
384 operation: "create_log_dir".to_string(),
385 cause: e.to_string(),
386 })?;
387 }
388
389 let file = OpenOptions::new()
390 .create(true)
391 .append(true)
392 .open(path)
393 .map_err(|e| Error::OperationFailed {
394 operation: "open_log_file".to_string(),
395 cause: format!("{}: {}", path.display(), e),
396 })?;
397
398 Ok(LogFileWriter {
399 file: Arc::new(Mutex::new(file)),
400 })
401}
402
403#[allow(clippy::needless_pass_by_value)]
405fn init_error(e: tracing_subscriber::util::TryInitError) -> Error {
406 Error::OperationFailed {
407 operation: "observability_init".to_string(),
408 cause: e.to_string(),
409 }
410}