Skip to main content

subcog/webhooks/
delivery.rs

1//! Webhook delivery backend trait and HTTP implementation.
2//!
3//! This module defines the `WebhookDelivery` trait for abstracting webhook
4//! delivery, and provides an HTTP implementation using `reqwest`.
5//!
6//! # Retry Strategy
7//!
8//! The HTTP delivery backend implements exponential backoff with the following
9//! default configuration:
10//! - Max retries: 3
11//! - Base delay: 1 second
12//! - Delays: 1s, 2s, 4s (exponential backoff)
13//!
14//! # Error Handling
15//!
16//! - Network errors: Retry with backoff
17//! - 4xx client errors: No retry, log failure
18//! - 5xx server errors: Retry with backoff
19//! - Timeout: Retry with backoff
20
21use super::config::{WebhookAuth, WebhookEndpoint};
22use super::payload::WebhookPayload;
23use crate::Result;
24use secrecy::ExposeSecret;
25use std::time::Duration;
26
27/// Result of a webhook delivery attempt.
28#[derive(Debug, Clone)]
29pub struct DeliveryResult {
30    /// Whether the delivery was successful.
31    pub success: bool,
32
33    /// HTTP status code (if available).
34    pub status_code: Option<u16>,
35
36    /// Number of attempts made.
37    pub attempts: u32,
38
39    /// Total duration in milliseconds.
40    pub duration_ms: u64,
41
42    /// Error message (if failed).
43    pub error: Option<String>,
44}
45
46impl DeliveryResult {
47    /// Creates a successful delivery result.
48    #[must_use]
49    pub const fn success(status_code: u16, attempts: u32, duration_ms: u64) -> Self {
50        Self {
51            success: true,
52            status_code: Some(status_code),
53            attempts,
54            duration_ms,
55            error: None,
56        }
57    }
58
59    /// Creates a failed delivery result.
60    #[must_use]
61    pub const fn failure(error: String, attempts: u32, duration_ms: u64) -> Self {
62        Self {
63            success: false,
64            status_code: None,
65            attempts,
66            duration_ms,
67            error: Some(error),
68        }
69    }
70
71    /// Creates a failed delivery result with status code.
72    #[must_use]
73    pub const fn failure_with_status(
74        status_code: u16,
75        error: String,
76        attempts: u32,
77        duration_ms: u64,
78    ) -> Self {
79        Self {
80            success: false,
81            status_code: Some(status_code),
82            attempts,
83            duration_ms,
84            error: Some(error),
85        }
86    }
87}
88
89/// Trait for webhook delivery backends.
90///
91/// This trait allows for different delivery implementations (HTTP, mock for testing).
92pub trait WebhookDelivery: Send + Sync {
93    /// Delivers a webhook payload to the configured endpoint.
94    ///
95    /// # Arguments
96    ///
97    /// * `endpoint` - The webhook endpoint configuration
98    /// * `payload` - The payload to deliver
99    ///
100    /// # Returns
101    ///
102    /// A `DeliveryResult` containing success/failure status and metadata.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if delivery fails after all retries.
107    fn deliver(
108        &self,
109        endpoint: &WebhookEndpoint,
110        payload: &WebhookPayload,
111    ) -> Result<DeliveryResult>;
112}
113
114/// HTTP webhook delivery backend using reqwest.
115pub struct HttpDeliveryBackend {
116    /// HTTP client with connection pooling.
117    client: reqwest::blocking::Client,
118}
119
120impl HttpDeliveryBackend {
121    /// Creates a new HTTP delivery backend.
122    #[must_use]
123    pub fn new() -> Self {
124        let client = reqwest::blocking::Client::builder()
125            .user_agent(format!("Subcog/{}", env!("CARGO_PKG_VERSION")))
126            .pool_max_idle_per_host(10)
127            .build()
128            .unwrap_or_else(|_| reqwest::blocking::Client::new());
129
130        Self { client }
131    }
132
133    /// Creates a new HTTP delivery backend with custom timeout.
134    #[must_use]
135    pub fn with_timeout(timeout: Duration) -> Self {
136        let client = reqwest::blocking::Client::builder()
137            .user_agent(format!("Subcog/{}", env!("CARGO_PKG_VERSION")))
138            .timeout(timeout)
139            .pool_max_idle_per_host(10)
140            .build()
141            .unwrap_or_else(|_| reqwest::blocking::Client::new());
142
143        Self { client }
144    }
145
146    /// Attempts a single delivery without retries.
147    fn attempt_delivery(
148        &self,
149        endpoint: &WebhookEndpoint,
150        payload: &WebhookPayload,
151    ) -> std::result::Result<u16, String> {
152        // Use format-specific JSON based on endpoint configuration
153        let payload_json = payload.to_format_json(endpoint.format);
154
155        let mut request = self
156            .client
157            .post(&endpoint.url)
158            .header("Content-Type", "application/json")
159            .header("X-Subcog-Event", &payload.event_type)
160            .header("X-Subcog-Delivery-Id", &payload.event_id)
161            .timeout(Duration::from_secs(endpoint.retry.timeout_secs));
162
163        // Add authentication headers (use default format for HMAC signature)
164        let signature_json = payload.to_json();
165        request = Self::add_auth_headers(request, &endpoint.auth, &signature_json);
166
167        // Send request
168        let response = request
169            .body(payload_json)
170            .send()
171            .map_err(|e| format!("HTTP request failed: {e}"))?;
172
173        let status = response.status().as_u16();
174
175        if response.status().is_success() {
176            Ok(status)
177        } else {
178            Err(format!("HTTP {status} response"))
179        }
180    }
181
182    /// Adds authentication headers to a request.
183    fn add_auth_headers(
184        mut request: reqwest::blocking::RequestBuilder,
185        auth: &WebhookAuth,
186        payload_json: &str,
187    ) -> reqwest::blocking::RequestBuilder {
188        // Add Bearer token if configured
189        if let Some(token) = auth.bearer_token() {
190            request = request.header("Authorization", format!("Bearer {}", token.expose_secret()));
191        }
192
193        // Add HMAC signature if configured
194        if let Some(secret) = auth.hmac_secret() {
195            let signature =
196                super::payload::compute_hmac_signature(secret.expose_secret(), payload_json);
197            request = request.header("X-Subcog-Signature", signature);
198        }
199
200        request
201    }
202
203    /// Delivers with retry logic.
204    fn deliver_with_retry(
205        &self,
206        endpoint: &WebhookEndpoint,
207        payload: &WebhookPayload,
208    ) -> DeliveryResult {
209        let start = std::time::Instant::now();
210        let retry_config = &endpoint.retry;
211        let max_attempts = retry_config.max_retries + 1;
212
213        for attempt in 1..=max_attempts {
214            if let Some(result) = self.try_single_delivery(
215                endpoint,
216                payload,
217                attempt,
218                max_attempts,
219                start,
220                retry_config,
221            ) {
222                return result;
223            }
224        }
225
226        // Should not reach here
227        let duration_ms = Self::elapsed_ms(start);
228        DeliveryResult::failure(
229            "Max retries exceeded".to_string(),
230            max_attempts,
231            duration_ms,
232        )
233    }
234
235    /// Attempts a single delivery, returning Some if done (success or final failure).
236    fn try_single_delivery(
237        &self,
238        endpoint: &WebhookEndpoint,
239        payload: &WebhookPayload,
240        attempt: u32,
241        max_attempts: u32,
242        start: std::time::Instant,
243        retry_config: &super::config::RetryConfig,
244    ) -> Option<DeliveryResult> {
245        match self.attempt_delivery(endpoint, payload) {
246            Ok(status_code) => {
247                let duration_ms = Self::elapsed_ms(start);
248                Some(DeliveryResult::success(status_code, attempt, duration_ms))
249            },
250            Err(ref error) => {
251                Self::handle_delivery_error(error, attempt, max_attempts, start, retry_config)
252            },
253        }
254    }
255
256    /// Gets elapsed milliseconds, saturating at `u64::MAX`.
257    #[allow(clippy::cast_possible_truncation)]
258    fn elapsed_ms(start: std::time::Instant) -> u64 {
259        // Duration in ms will not realistically exceed u64::MAX
260        start.elapsed().as_millis() as u64
261    }
262
263    /// Handles delivery errors, returning Some if we should stop retrying.
264    fn handle_delivery_error(
265        error: &str,
266        attempt: u32,
267        max_attempts: u32,
268        start: std::time::Instant,
269        retry_config: &super::config::RetryConfig,
270    ) -> Option<DeliveryResult> {
271        // Check if this is a client error (4xx) - don't retry
272        if error.contains("HTTP 4") {
273            let duration_ms = Self::elapsed_ms(start);
274            let status_code = Self::extract_status_code(error);
275            return Some(DeliveryResult::failure_with_status(
276                status_code.unwrap_or(400),
277                error.to_string(),
278                attempt,
279                duration_ms,
280            ));
281        }
282
283        // Last attempt - return failure
284        if attempt >= max_attempts {
285            let duration_ms = Self::elapsed_ms(start);
286            return Some(DeliveryResult::failure(
287                error.to_string(),
288                attempt,
289                duration_ms,
290            ));
291        }
292
293        // Sleep before retry
294        let delay_ms = retry_config.delay_for_attempt(attempt);
295        std::thread::sleep(Duration::from_millis(delay_ms));
296
297        None
298    }
299
300    /// Extracts status code from error message.
301    fn extract_status_code(error: &str) -> Option<u16> {
302        // Pattern: "HTTP 4xx" or "HTTP 5xx"
303        if let Some(start) = error.find("HTTP ") {
304            let code_str = &error[start + 5..];
305            if code_str.len() >= 3 {
306                return code_str[..3].parse().ok();
307            }
308        }
309        None
310    }
311}
312
313impl Default for HttpDeliveryBackend {
314    fn default() -> Self {
315        Self::new()
316    }
317}
318
319impl WebhookDelivery for HttpDeliveryBackend {
320    fn deliver(
321        &self,
322        endpoint: &WebhookEndpoint,
323        payload: &WebhookPayload,
324    ) -> Result<DeliveryResult> {
325        Ok(self.deliver_with_retry(endpoint, payload))
326    }
327}
328
329/// Mock delivery backend for testing.
330#[cfg(test)]
331pub struct MockDeliveryBackend {
332    /// Responses to return for each delivery.
333    responses: std::sync::Mutex<Vec<Result<DeliveryResult>>>,
334    /// Payloads that were delivered.
335    pub delivered: std::sync::Mutex<Vec<(String, WebhookPayload)>>,
336}
337
338#[cfg(test)]
339impl MockDeliveryBackend {
340    /// Creates a new mock backend.
341    pub fn new() -> Self {
342        Self {
343            responses: std::sync::Mutex::new(Vec::new()),
344            delivered: std::sync::Mutex::new(Vec::new()),
345        }
346    }
347
348    /// Queues a response for the next delivery.
349    pub fn queue_response(&self, result: Result<DeliveryResult>) {
350        self.responses.lock().expect("lock").push(result);
351    }
352
353    /// Returns the number of deliveries made.
354    pub fn delivery_count(&self) -> usize {
355        self.delivered.lock().expect("lock").len()
356    }
357}
358
359#[cfg(test)]
360impl WebhookDelivery for MockDeliveryBackend {
361    fn deliver(
362        &self,
363        endpoint: &WebhookEndpoint,
364        payload: &WebhookPayload,
365    ) -> Result<DeliveryResult> {
366        self.delivered
367            .lock()
368            .expect("lock")
369            .push((endpoint.name.clone(), payload.clone()));
370
371        self.responses
372            .lock()
373            .expect("lock")
374            .pop()
375            .unwrap_or_else(|| Ok(DeliveryResult::success(200, 1, 10)))
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use super::super::config::{PayloadFormat, RetryConfig};
382    use super::*;
383
384    #[test]
385    fn test_delivery_result_success() {
386        let result = DeliveryResult::success(200, 1, 100);
387
388        assert!(result.success);
389        assert_eq!(result.status_code, Some(200));
390        assert_eq!(result.attempts, 1);
391        assert!(result.error.is_none());
392    }
393
394    #[test]
395    fn test_delivery_result_failure() {
396        let result = DeliveryResult::failure("Connection refused".to_string(), 3, 5000);
397
398        assert!(!result.success);
399        assert!(result.status_code.is_none());
400        assert_eq!(result.attempts, 3);
401        assert!(result.error.is_some());
402    }
403
404    #[test]
405    fn test_extract_status_code() {
406        assert_eq!(
407            HttpDeliveryBackend::extract_status_code("HTTP 404 response"),
408            Some(404)
409        );
410        assert_eq!(
411            HttpDeliveryBackend::extract_status_code("HTTP 500 response"),
412            Some(500)
413        );
414        assert_eq!(
415            HttpDeliveryBackend::extract_status_code("Connection refused"),
416            None
417        );
418    }
419
420    #[test]
421    fn test_mock_delivery_backend() {
422        let mock = MockDeliveryBackend::new();
423        mock.queue_response(Ok(DeliveryResult::success(201, 1, 50)));
424
425        let endpoint = WebhookEndpoint {
426            name: "test".to_string(),
427            url: "https://example.com".to_string(),
428            auth: WebhookAuth::None,
429            events: vec![],
430            scopes: vec![],
431            enabled: true,
432            retry: RetryConfig::default(),
433            format: PayloadFormat::Default,
434        };
435
436        let payload = WebhookPayload::test_event();
437        let result = mock.deliver(&endpoint, &payload).expect("delivery");
438
439        assert!(result.success);
440        assert_eq!(result.status_code, Some(201));
441        assert_eq!(mock.delivery_count(), 1);
442    }
443}