subcog/observability/
metrics.rs1use crate::config::{MetricsPushGatewaySettings, MetricsSettings};
4use crate::{Error, Result};
5use metrics_exporter_prometheus::PrometheusBuilder;
6use metrics_exporter_prometheus::PrometheusHandle;
7use metrics_exporter_prometheus::PrometheusRecorder;
8use reqwest::blocking::Client;
9use reqwest::header::CONTENT_TYPE;
10use std::net::{IpAddr, Ipv4Addr, SocketAddr};
11use std::sync::{Arc, OnceLock};
12use std::thread;
13
/// Connection settings for pushing metrics to a Prometheus push gateway.
///
/// `Debug` is implemented by hand (instead of derived) so that the basic-auth
/// password is never written out when a config is logged with `{:?}`.
#[derive(Clone)]
pub struct PushGatewayConfig {
    /// Push gateway URL that rendered metrics are sent to.
    pub endpoint: String,
    /// Optional username for HTTP basic authentication.
    pub username: Option<String>,
    /// Optional password for HTTP basic authentication (redacted in `Debug`).
    pub password: Option<String>,
    /// Push with HTTP `POST` when `true`, `PUT` when `false`.
    pub use_http_post: bool,
}

impl std::fmt::Debug for PushGatewayConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PushGatewayConfig")
            .field("endpoint", &self.endpoint)
            .field("username", &self.username)
            // Only reveal whether a password is present, never its value.
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .field("use_http_post", &self.use_http_post)
            .finish()
    }
}
26
27#[derive(Debug, Clone)]
29pub struct MetricsConfig {
30 pub enabled: bool,
32 pub listen_addr: SocketAddr,
34 pub push_gateway: Option<PushGatewayConfig>,
36}
37
38impl MetricsConfig {
39 #[must_use]
41 pub fn from_env() -> Self {
42 Self::from_settings(None)
43 }
44
45 #[must_use]
47 pub fn from_settings(settings: Option<&MetricsSettings>) -> Self {
48 let enabled = settings.and_then(|config| config.enabled).unwrap_or(false);
49 let port = settings.and_then(|config| config.port).unwrap_or(9090);
50 let push_gateway = settings
51 .and_then(|config| config.push_gateway.as_ref())
52 .and_then(parse_push_gateway_settings);
53
54 let mut config = Self {
55 enabled,
56 listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port),
57 push_gateway,
58 };
59
60 if let Some(enabled) = parse_bool_env("SUBCOG_METRICS_ENABLED") {
61 config.enabled = enabled;
62 }
63 if let Some(port) = parse_port_env("SUBCOG_METRICS_PORT") {
64 config.listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
65 }
66 apply_push_gateway_env_overrides(&mut config);
67
68 config
69 }
70}
71
/// Stateless unit type; it carries no data.
#[derive(Default)]
pub struct Metrics;

impl Metrics {
    /// Returns a `Metrics` value; usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Metrics
    }
}
88
89#[derive(Debug)]
91pub struct MetricsHandle {
92 prometheus: PrometheusHandle,
93 push_gateway: Option<PushGatewayConfig>,
94}
95
96static GLOBAL_METRICS: OnceLock<Arc<MetricsHandle>> = OnceLock::new();
98
/// Process-wide instance label used when building push gateway URLs; written at most once.
static METRICS_INSTANCE: OnceLock<String> = OnceLock::new();

/// Records `label` as the process-wide metrics instance label.
///
/// Only the first call takes effect; later calls are ignored.
pub fn set_instance_label(label: &str) {
    // `OnceLock::set` hands back the rejected value when a label is already stored;
    // discarding it is what makes later calls no-ops.
    let _already_set = METRICS_INSTANCE.set(label.to_owned());
}

/// Returns the instance label recorded by [`set_instance_label`], if any.
fn get_instance_label() -> Option<&'static str> {
    METRICS_INSTANCE.get().map(|label| label.as_str())
}
115
116pub fn flush_global() {
121 if let Some(handle) = GLOBAL_METRICS.get() {
122 flush(handle);
123 }
124}
125
126pub fn install_prometheus(config: &MetricsConfig, expose: bool) -> Result<Option<MetricsHandle>> {
128 if !config.enabled {
129 return Ok(None);
130 }
131
132 let builder = PrometheusBuilder::new();
133 let prometheus_handle = if expose {
134 let builder = builder.with_http_listener(config.listen_addr);
135 install_listener(builder)?
136 } else {
137 builder
138 .install_recorder()
139 .map_err(|e| Error::OperationFailed {
140 operation: "metrics_recorder_install".to_string(),
141 cause: e.to_string(),
142 })?
143 };
144
145 let handle = MetricsHandle {
146 prometheus: prometheus_handle,
147 push_gateway: config.push_gateway.clone(),
148 };
149
150 let _ = GLOBAL_METRICS.set(Arc::new(MetricsHandle {
152 prometheus: handle.prometheus.clone(),
153 push_gateway: handle.push_gateway.clone(),
154 }));
155
156 Ok(Some(handle))
157}
158
159pub fn flush(handle: &MetricsHandle) {
164 let Some(push_gateway) = &handle.push_gateway else {
165 tracing::debug!("No push gateway configured, skipping flush");
166 return;
167 };
168
169 let mut payload = handle.prometheus.render();
170
171 if !payload.ends_with('\n') {
173 payload.push('\n');
174 }
175
176 let endpoint = get_instance_label().map_or_else(
179 || push_gateway.endpoint.clone(),
180 |instance| {
181 format!(
182 "{}/instance/{instance}",
183 push_gateway.endpoint.trim_end_matches('/')
184 )
185 },
186 );
187 let use_http_post = push_gateway.use_http_post;
188 let username = push_gateway.username.clone();
189 let password = push_gateway.password.clone();
190
191 tracing::debug!(
192 bytes = payload.len(),
193 endpoint = %endpoint,
194 instance = ?get_instance_label(),
195 "Pushing metrics to push gateway"
196 );
197
198 if tokio::runtime::Handle::try_current().is_ok() {
201 let handle = thread::spawn(move || {
203 flush_to_gateway(
204 &endpoint,
205 payload,
206 use_http_post,
207 username.as_deref(),
208 password.as_deref(),
209 );
210 });
211 let _ = handle.join();
213 } else {
214 flush_to_gateway(
215 &endpoint,
216 payload,
217 use_http_post,
218 username.as_deref(),
219 password.as_deref(),
220 );
221 }
222}
223
224fn flush_to_gateway(
226 endpoint: &str,
227 payload: String,
228 use_http_post: bool,
229 username: Option<&str>,
230 password: Option<&str>,
231) {
232 let client = Client::new();
233
234 let request = if use_http_post {
235 client.post(endpoint)
236 } else {
237 client.put(endpoint)
238 };
239
240 let request = if let Some(username) = username {
241 request.basic_auth(username, password)
242 } else {
243 request
244 };
245
246 let response = request
248 .header(CONTENT_TYPE, "text/plain; version=0.0.4")
249 .timeout(std::time::Duration::from_secs(5))
250 .body(payload)
251 .send();
252
253 match response {
254 Ok(resp) => {
255 if resp.status().is_success() {
256 tracing::debug!(status = %resp.status(), "Metrics pushed successfully");
257 } else {
258 tracing::warn!(status = %resp.status(), "Metrics push failed");
259 }
260 },
261 Err(err) => {
262 tracing::warn!("Failed to push metrics: {err}");
263 },
264 }
265}
266
267fn install_listener(builder: PrometheusBuilder) -> Result<PrometheusHandle> {
268 if let Ok(handle) = tokio::runtime::Handle::try_current() {
269 return install_with_runtime(builder, &handle);
270 }
271 let runtime = tokio::runtime::Builder::new_current_thread()
272 .enable_all()
273 .build()
274 .map_err(|e| Error::OperationFailed {
275 operation: "metrics_runtime_init".to_string(),
276 cause: e.to_string(),
277 })?;
278 let handle = runtime.handle().clone();
279 let prometheus = install_with_runtime(builder, &handle)?;
280 let thread_name = "metrics-exporter-prometheus-http".to_string();
281 thread::Builder::new()
282 .name(thread_name)
283 .spawn(move || runtime.block_on(async { std::future::pending::<()>().await }))
284 .map_err(|e| Error::OperationFailed {
285 operation: "metrics_runtime_thread".to_string(),
286 cause: e.to_string(),
287 })?;
288 Ok(prometheus)
289}
290
291fn install_with_runtime(
292 builder: PrometheusBuilder,
293 runtime_handle: &tokio::runtime::Handle,
294) -> Result<PrometheusHandle> {
295 let (recorder, exporter) = {
296 let _guard = runtime_handle.enter();
297 builder.build().map_err(|e| Error::OperationFailed {
298 operation: "metrics_exporter_build".to_string(),
299 cause: e.to_string(),
300 })?
301 };
302 let handle = recorder.handle();
303 set_global_recorder(recorder)?;
304 runtime_handle.spawn(exporter);
305 Ok(handle)
306}
307
308fn set_global_recorder(recorder: PrometheusRecorder) -> Result<()> {
309 metrics::set_global_recorder(recorder).map_err(|e| Error::OperationFailed {
310 operation: "metrics_recorder_install".to_string(),
311 cause: e.to_string(),
312 })
313}
314
/// Reads environment variable `key` as a boolean flag.
///
/// Returns `None` when the variable is unset, not valid Unicode, or blank, so such
/// values never override file settings (consistent with `parse_string_env`, which also
/// ignores blank values). Non-blank values are interpreted by [`parse_bool`].
fn parse_bool_env(key: &str) -> Option<bool> {
    std::env::var(key).ok().and_then(|value| parse_bool(&value))
}

/// Interprets a flag value, ignoring surrounding whitespace and ASCII case.
///
/// `true`, `1`, `yes` and `on` mean `true`; any other non-blank value means `false`;
/// a blank value yields `None`.
fn parse_bool(value: &str) -> Option<bool> {
    let value = value.trim();
    if value.is_empty() {
        return None;
    }
    Some(
        ["true", "1", "yes", "on"]
            .iter()
            .any(|truthy| value.eq_ignore_ascii_case(truthy)),
    )
}
321
/// Reads environment variable `key` as a TCP port.
///
/// Returns `None` when the variable is unset or does not hold a valid `u16`
/// (see [`parse_port`]).
fn parse_port_env(key: &str) -> Option<u16> {
    std::env::var(key).ok().and_then(|value| parse_port(&value))
}

/// Parses a port number, ignoring surrounding whitespace (e.g. a stray newline
/// from a `.env` file), which previously made the value silently ignored.
fn parse_port(value: &str) -> Option<u16> {
    value.trim().parse::<u16>().ok()
}
327
/// Reads environment variable `key` as a trimmed string.
///
/// Returns `None` when the variable is unset, not valid Unicode, or blank after trimming.
fn parse_string_env(key: &str) -> Option<String> {
    let raw = std::env::var(key).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
334
335fn parse_push_gateway_settings(settings: &MetricsPushGatewaySettings) -> Option<PushGatewayConfig> {
336 let endpoint = settings
337 .endpoint
338 .as_ref()
339 .map(|value| value.trim().to_string())
340 .filter(|value| !value.is_empty())?;
341
342 let username = settings
343 .username
344 .as_ref()
345 .map(|value| value.trim().to_string())
346 .filter(|value| !value.is_empty());
347 let password = settings
348 .password
349 .as_ref()
350 .map(|value| value.trim().to_string())
351 .filter(|value| !value.is_empty());
352 let use_http_post = settings.use_http_post.unwrap_or(true);
355
356 Some(PushGatewayConfig {
357 endpoint,
358 username,
359 password,
360 use_http_post,
361 })
362}
363
364fn apply_push_gateway_env_overrides(config: &mut MetricsConfig) {
365 let endpoint = parse_string_env("SUBCOG_METRICS_PUSH_GATEWAY_ENDPOINT");
366 let username = parse_string_env("SUBCOG_METRICS_PUSH_GATEWAY_USERNAME");
367 let password = parse_string_env("SUBCOG_METRICS_PUSH_GATEWAY_PASSWORD");
368 let use_http_post = parse_bool_env("SUBCOG_METRICS_PUSH_GATEWAY_USE_POST");
369
370 if endpoint.is_none() && username.is_none() && password.is_none() && use_http_post.is_none() {
371 return;
372 }
373
374 let mut current = config.push_gateway.clone().unwrap_or(PushGatewayConfig {
375 endpoint: String::new(),
376 username: None,
377 password: None,
378 use_http_post: false,
379 });
380
381 if let Some(endpoint) = endpoint {
382 current.endpoint = endpoint;
383 }
384 if let Some(username) = username {
385 current.username = Some(username);
386 }
387 if let Some(password) = password {
388 current.password = Some(password);
389 }
390 if let Some(use_http_post) = use_http_post {
391 current.use_http_post = use_http_post;
392 }
393
394 if current.endpoint.trim().is_empty() {
395 return;
396 }
397
398 config.push_gateway = Some(current);
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_registry_smoke() {
        // A thread-local recorder keeps this test independent of any global recorder
        // installed elsewhere in the test binary. The previous version returned early
        // (passing without asserting) whenever a global recorder already existed.
        let recorder = PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();

        metrics::with_local_recorder(&recorder, || {
            metrics::counter!("test_metrics_registry_total").increment(1);
        });

        let rendered = handle.render();
        assert!(rendered.contains("test_metrics_registry_total"));
    }
}
417}