Skip to main content

subcog/observability/
metrics.rs

1//! Prometheus metrics.
2
3use crate::config::{MetricsPushGatewaySettings, MetricsSettings};
4use crate::{Error, Result};
5use metrics_exporter_prometheus::PrometheusBuilder;
6use metrics_exporter_prometheus::PrometheusHandle;
7use metrics_exporter_prometheus::PrometheusRecorder;
8use reqwest::blocking::Client;
9use reqwest::header::CONTENT_TYPE;
10use std::net::{IpAddr, Ipv4Addr, SocketAddr};
11use std::sync::{Arc, OnceLock};
12use std::thread;
13
/// Push gateway configuration.
///
/// `Debug` is implemented by hand rather than derived so that the basic-auth
/// password never ends up in logs or panic messages; only its presence is
/// reported.
#[derive(Clone)]
pub struct PushGatewayConfig {
    /// Push gateway endpoint URI.
    pub endpoint: String,
    /// Optional username for basic auth.
    pub username: Option<String>,
    /// Optional password for basic auth.
    pub password: Option<String>,
    /// Whether to use HTTP POST instead of PUT.
    pub use_http_post: bool,
}

impl std::fmt::Debug for PushGatewayConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the same shape as the derived output, but never print the secret.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("PushGatewayConfig")
            .field("endpoint", &self.endpoint)
            .field("username", &self.username)
            .field("password", &password)
            .field("use_http_post", &self.use_http_post)
            .finish()
    }
}
26
/// Metrics configuration.
///
/// Normally built through `MetricsConfig::from_settings` or
/// `MetricsConfig::from_env`, which layer environment overrides on top of
/// config-file values.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Whether metrics are enabled. When `false`, `install_prometheus`
    /// returns `Ok(None)` without installing a recorder.
    pub enabled: bool,
    /// Address to bind the metrics exporter. Only consulted when
    /// `install_prometheus` is asked to expose the HTTP listener.
    pub listen_addr: SocketAddr,
    /// Optional push gateway configuration. When `None`, `flush` skips the
    /// push entirely.
    pub push_gateway: Option<PushGatewayConfig>,
}
37
38impl MetricsConfig {
39    /// Builds metrics configuration from environment variables.
40    #[must_use]
41    pub fn from_env() -> Self {
42        Self::from_settings(None)
43    }
44
45    /// Builds metrics configuration from config settings with env overrides.
46    #[must_use]
47    pub fn from_settings(settings: Option<&MetricsSettings>) -> Self {
48        let enabled = settings.and_then(|config| config.enabled).unwrap_or(false);
49        let port = settings.and_then(|config| config.port).unwrap_or(9090);
50        let push_gateway = settings
51            .and_then(|config| config.push_gateway.as_ref())
52            .and_then(parse_push_gateway_settings);
53
54        let mut config = Self {
55            enabled,
56            listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port),
57            push_gateway,
58        };
59
60        if let Some(enabled) = parse_bool_env("SUBCOG_METRICS_ENABLED") {
61            config.enabled = enabled;
62        }
63        if let Some(port) = parse_port_env("SUBCOG_METRICS_PORT") {
64            config.listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
65        }
66        apply_push_gateway_env_overrides(&mut config);
67
68        config
69    }
70}
71
/// Metrics collector.
///
/// A unit struct: it holds no state of its own.
pub struct Metrics;

impl Metrics {
    /// Creates a new metrics collector.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Metrics
    }
}

impl Default for Metrics {
    /// Equivalent to [`Metrics::new`].
    fn default() -> Self {
        Metrics::new()
    }
}
88
/// Metrics handle for flushing on shutdown.
///
/// Returned by `install_prometheus`; pass it to `flush` to push the current
/// metrics snapshot to the configured push gateway.
#[derive(Debug)]
pub struct MetricsHandle {
    /// Handle to the installed Prometheus recorder; `flush` calls `render()`
    /// on it to build the push payload.
    prometheus: PrometheusHandle,
    /// Push gateway target. When `None`, `flush` skips the push.
    push_gateway: Option<PushGatewayConfig>,
}
95
/// Global metrics handle for flush-on-demand.
///
/// Written by `install_prometheus` and read by `flush_global`. Because this
/// is a `OnceLock`, only the first value stored is retained.
static GLOBAL_METRICS: OnceLock<Arc<MetricsHandle>> = OnceLock::new();

/// Global metrics instance label for push gateway grouping.
///
/// Written by `set_instance_label`; `flush` appends it to the push gateway
/// endpoint as an `/instance/<label>` path segment when present.
static METRICS_INSTANCE: OnceLock<String> = OnceLock::new();
101
102/// Sets the metrics instance label for push gateway grouping.
103///
104/// This allows hooks and MCP server to push to separate metric groups.
105/// Must be called before any metrics are flushed. Only the first call
106/// takes effect (subsequent calls are ignored).
107pub fn set_instance_label(label: &str) {
108    let _ = METRICS_INSTANCE.set(label.to_string());
109}
110
111/// Gets the configured instance label, if any.
112fn get_instance_label() -> Option<&'static str> {
113    METRICS_INSTANCE.get().map(String::as_str)
114}
115
116/// Flushes metrics to the push gateway if configured.
117///
118/// This can be called from anywhere to push metrics immediately,
119/// useful for short-lived processes like MCP server requests.
120pub fn flush_global() {
121    if let Some(handle) = GLOBAL_METRICS.get() {
122        flush(handle);
123    }
124}
125
126/// Installs the Prometheus metrics recorder and HTTP listener.
127pub fn install_prometheus(config: &MetricsConfig, expose: bool) -> Result<Option<MetricsHandle>> {
128    if !config.enabled {
129        return Ok(None);
130    }
131
132    let builder = PrometheusBuilder::new();
133    let prometheus_handle = if expose {
134        let builder = builder.with_http_listener(config.listen_addr);
135        install_listener(builder)?
136    } else {
137        builder
138            .install_recorder()
139            .map_err(|e| Error::OperationFailed {
140                operation: "metrics_recorder_install".to_string(),
141                cause: e.to_string(),
142            })?
143    };
144
145    let handle = MetricsHandle {
146        prometheus: prometheus_handle,
147        push_gateway: config.push_gateway.clone(),
148    };
149
150    // Store globally for flush_global() access
151    let _ = GLOBAL_METRICS.set(Arc::new(MetricsHandle {
152        prometheus: handle.prometheus.clone(),
153        push_gateway: handle.push_gateway.clone(),
154    }));
155
156    Ok(Some(handle))
157}
158
159/// Flushes metrics to the push gateway if configured.
160///
161/// When called from within a tokio runtime, this spawns a separate thread
162/// to avoid runtime nesting issues with `reqwest::blocking::Client`.
163pub fn flush(handle: &MetricsHandle) {
164    let Some(push_gateway) = &handle.push_gateway else {
165        tracing::debug!("No push gateway configured, skipping flush");
166        return;
167    };
168
169    let mut payload = handle.prometheus.render();
170
171    // Ensure payload ends with newline (required by push gateway)
172    if !payload.ends_with('\n') {
173        payload.push('\n');
174    }
175
176    // Support instance label for push gateway grouping
177    // This allows hooks and MCP server to push to separate metric groups
178    let endpoint = get_instance_label().map_or_else(
179        || push_gateway.endpoint.clone(),
180        |instance| {
181            format!(
182                "{}/instance/{instance}",
183                push_gateway.endpoint.trim_end_matches('/')
184            )
185        },
186    );
187    let use_http_post = push_gateway.use_http_post;
188    let username = push_gateway.username.clone();
189    let password = push_gateway.password.clone();
190
191    tracing::debug!(
192        bytes = payload.len(),
193        endpoint = %endpoint,
194        instance = ?get_instance_label(),
195        "Pushing metrics to push gateway"
196    );
197
198    // Check if we're in a tokio runtime - if so, spawn a thread to avoid
199    // runtime nesting issues with reqwest::blocking::Client
200    if tokio::runtime::Handle::try_current().is_ok() {
201        // Spawn thread and wait for completion to ensure metrics are pushed
202        let handle = thread::spawn(move || {
203            flush_to_gateway(
204                &endpoint,
205                payload,
206                use_http_post,
207                username.as_deref(),
208                password.as_deref(),
209            );
210        });
211        // Wait for the flush to complete (with timeout)
212        let _ = handle.join();
213    } else {
214        flush_to_gateway(
215            &endpoint,
216            payload,
217            use_http_post,
218            username.as_deref(),
219            password.as_deref(),
220        );
221    }
222}
223
224/// Internal function to push metrics to the gateway.
225fn flush_to_gateway(
226    endpoint: &str,
227    payload: String,
228    use_http_post: bool,
229    username: Option<&str>,
230    password: Option<&str>,
231) {
232    let client = Client::new();
233
234    let request = if use_http_post {
235        client.post(endpoint)
236    } else {
237        client.put(endpoint)
238    };
239
240    let request = if let Some(username) = username {
241        request.basic_auth(username, password)
242    } else {
243        request
244    };
245
246    // Use timeout to ensure connection is properly established
247    let response = request
248        .header(CONTENT_TYPE, "text/plain; version=0.0.4")
249        .timeout(std::time::Duration::from_secs(5))
250        .body(payload)
251        .send();
252
253    match response {
254        Ok(resp) => {
255            if resp.status().is_success() {
256                tracing::debug!(status = %resp.status(), "Metrics pushed successfully");
257            } else {
258                tracing::warn!(status = %resp.status(), "Metrics push failed");
259            }
260        },
261        Err(err) => {
262            tracing::warn!("Failed to push metrics: {err}");
263        },
264    }
265}
266
267fn install_listener(builder: PrometheusBuilder) -> Result<PrometheusHandle> {
268    if let Ok(handle) = tokio::runtime::Handle::try_current() {
269        return install_with_runtime(builder, &handle);
270    }
271    let runtime = tokio::runtime::Builder::new_current_thread()
272        .enable_all()
273        .build()
274        .map_err(|e| Error::OperationFailed {
275            operation: "metrics_runtime_init".to_string(),
276            cause: e.to_string(),
277        })?;
278    let handle = runtime.handle().clone();
279    let prometheus = install_with_runtime(builder, &handle)?;
280    let thread_name = "metrics-exporter-prometheus-http".to_string();
281    thread::Builder::new()
282        .name(thread_name)
283        .spawn(move || runtime.block_on(async { std::future::pending::<()>().await }))
284        .map_err(|e| Error::OperationFailed {
285            operation: "metrics_runtime_thread".to_string(),
286            cause: e.to_string(),
287        })?;
288    Ok(prometheus)
289}
290
291fn install_with_runtime(
292    builder: PrometheusBuilder,
293    runtime_handle: &tokio::runtime::Handle,
294) -> Result<PrometheusHandle> {
295    let (recorder, exporter) = {
296        let _guard = runtime_handle.enter();
297        builder.build().map_err(|e| Error::OperationFailed {
298            operation: "metrics_exporter_build".to_string(),
299            cause: e.to_string(),
300        })?
301    };
302    let handle = recorder.handle();
303    set_global_recorder(recorder)?;
304    runtime_handle.spawn(exporter);
305    Ok(handle)
306}
307
308fn set_global_recorder(recorder: PrometheusRecorder) -> Result<()> {
309    metrics::set_global_recorder(recorder).map_err(|e| Error::OperationFailed {
310        operation: "metrics_recorder_install".to_string(),
311        cause: e.to_string(),
312    })
313}
314
/// Reads a boolean flag from the environment variable `key`.
///
/// Returns `None` when the variable is unset (or not valid Unicode).
/// Otherwise surrounding whitespace is ignored and the value is compared
/// case-insensitively: `true`, `1` and `yes` yield `Some(true)`; any other
/// value yields `Some(false)`.
fn parse_bool_env(key: &str) -> Option<bool> {
    let raw = std::env::var(key).ok()?;
    // Trim like `parse_string_env` does, so values such as " true " coming
    // from shell or .env files are still honoured.
    let value = raw.trim();
    let truthy = ["true", "1", "yes"]
        .iter()
        .any(|candidate| value.eq_ignore_ascii_case(candidate));
    Some(truthy)
}
321
/// Reads a TCP port from the environment variable `key`.
///
/// Surrounding whitespace is ignored. Returns `None` when the variable is
/// unset or its value does not parse as a `u16`.
fn parse_port_env(key: &str) -> Option<u16> {
    // Trim first: `u16::from_str` rejects padded input such as "9090 ", which
    // would otherwise silently discard the override.
    std::env::var(key).ok()?.trim().parse::<u16>().ok()
}
327
/// Reads the environment variable `key` as a trimmed, non-empty string.
///
/// Returns `None` when the variable is unset (or not valid Unicode) or when
/// nothing remains after trimming whitespace.
fn parse_string_env(key: &str) -> Option<String> {
    let raw = std::env::var(key).ok()?;
    match raw.trim() {
        "" => None,
        trimmed => Some(trimmed.to_owned()),
    }
}
334
335fn parse_push_gateway_settings(settings: &MetricsPushGatewaySettings) -> Option<PushGatewayConfig> {
336    let endpoint = settings
337        .endpoint
338        .as_ref()
339        .map(|value| value.trim().to_string())
340        .filter(|value| !value.is_empty())?;
341
342    let username = settings
343        .username
344        .as_ref()
345        .map(|value| value.trim().to_string())
346        .filter(|value| !value.is_empty());
347    let password = settings
348        .password
349        .as_ref()
350        .map(|value| value.trim().to_string())
351        .filter(|value| !value.is_empty());
352    // Default to POST (accumulates metrics) instead of PUT (replaces all metrics)
353    // POST is better for multi-hook scenarios where each hook pushes independently
354    let use_http_post = settings.use_http_post.unwrap_or(true);
355
356    Some(PushGatewayConfig {
357        endpoint,
358        username,
359        password,
360        use_http_post,
361    })
362}
363
364fn apply_push_gateway_env_overrides(config: &mut MetricsConfig) {
365    let endpoint = parse_string_env("SUBCOG_METRICS_PUSH_GATEWAY_ENDPOINT");
366    let username = parse_string_env("SUBCOG_METRICS_PUSH_GATEWAY_USERNAME");
367    let password = parse_string_env("SUBCOG_METRICS_PUSH_GATEWAY_PASSWORD");
368    let use_http_post = parse_bool_env("SUBCOG_METRICS_PUSH_GATEWAY_USE_POST");
369
370    if endpoint.is_none() && username.is_none() && password.is_none() && use_http_post.is_none() {
371        return;
372    }
373
374    let mut current = config.push_gateway.clone().unwrap_or(PushGatewayConfig {
375        endpoint: String::new(),
376        username: None,
377        password: None,
378        use_http_post: false,
379    });
380
381    if let Some(endpoint) = endpoint {
382        current.endpoint = endpoint;
383    }
384    if let Some(username) = username {
385        current.username = Some(username);
386    }
387    if let Some(password) = password {
388        current.password = Some(password);
389    }
390    if let Some(use_http_post) = use_http_post {
391        current.use_http_post = use_http_post;
392    }
393
394    if current.endpoint.trim().is_empty() {
395        return;
396    }
397
398    config.push_gateway = Some(current);
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a counter recorded through the global recorder shows up in
    /// the rendered Prometheus output.
    #[test]
    fn test_metrics_registry_smoke() {
        let recorder = PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();

        // If a global recorder is already installed there is nothing to
        // check with this one, so the test exits early.
        let Ok(()) = metrics::set_global_recorder(recorder) else {
            return;
        };

        metrics::counter!("test_metrics_registry_total").increment(1);

        assert!(handle.render().contains("test_metrics_registry_total"));
    }
}