1use super::config::{WebhookAuth, WebhookEndpoint};
22use super::payload::WebhookPayload;
23use crate::Result;
24use secrecy::ExposeSecret;
25use std::time::Duration;
26
/// Outcome of a webhook delivery, as reported by a delivery backend.
#[derive(Debug, Clone)]
pub struct DeliveryResult {
    /// `true` when the delivery was recorded as successful.
    pub success: bool,

    /// HTTP status code tied to this outcome, when one is known.
    pub status_code: Option<u16>,

    /// Attempt count supplied by whoever built this result.
    pub attempts: u32,

    /// Elapsed time supplied by whoever built this result, in milliseconds.
    pub duration_ms: u64,

    /// Error description; `None` for results built with [`DeliveryResult::success`].
    pub error: Option<String>,
}

impl DeliveryResult {
    /// Spells out the struct literal once so the public constructors only
    /// have to state what differs between them.
    const fn from_parts(
        ok: bool,
        status_code: Option<u16>,
        attempts: u32,
        duration_ms: u64,
        error: Option<String>,
    ) -> Self {
        Self {
            success: ok,
            status_code,
            attempts,
            duration_ms,
            error,
        }
    }

    /// Builds a successful result carrying the response status code.
    #[must_use]
    pub const fn success(status_code: u16, attempts: u32, duration_ms: u64) -> Self {
        Self::from_parts(true, Some(status_code), attempts, duration_ms, None)
    }

    /// Builds a failed result that has no status code attached.
    #[must_use]
    pub const fn failure(error: String, attempts: u32, duration_ms: u64) -> Self {
        Self::from_parts(false, None, attempts, duration_ms, Some(error))
    }

    /// Builds a failed result that also records the status code that was received.
    #[must_use]
    pub const fn failure_with_status(
        status_code: u16,
        error: String,
        attempts: u32,
        duration_ms: u64,
    ) -> Self {
        Self::from_parts(
            false,
            Some(status_code),
            attempts,
            duration_ms,
            Some(error),
        )
    }
}
88
/// Abstraction over "send this payload to this endpoint".
///
/// Implementors must be `Send + Sync`. This file provides an HTTP-backed
/// implementation ([`HttpDeliveryBackend`]) and a test-only mock.
pub trait WebhookDelivery: Send + Sync {
    /// Delivers `payload` to `endpoint` and reports the outcome.
    ///
    /// The outer `Result` is the crate-wide result type; the inner
    /// [`DeliveryResult`] describes whether the delivery itself succeeded.
    fn deliver(
        &self,
        endpoint: &WebhookEndpoint,
        payload: &WebhookPayload,
    ) -> Result<DeliveryResult>;
}
113
/// Delivery backend that POSTs payloads using a blocking `reqwest` client.
pub struct HttpDeliveryBackend {
    /// Blocking HTTP client reused for every request made by this backend.
    client: reqwest::blocking::Client,
}
119
120impl HttpDeliveryBackend {
121 #[must_use]
123 pub fn new() -> Self {
124 let client = reqwest::blocking::Client::builder()
125 .user_agent(format!("Subcog/{}", env!("CARGO_PKG_VERSION")))
126 .pool_max_idle_per_host(10)
127 .build()
128 .unwrap_or_else(|_| reqwest::blocking::Client::new());
129
130 Self { client }
131 }
132
133 #[must_use]
135 pub fn with_timeout(timeout: Duration) -> Self {
136 let client = reqwest::blocking::Client::builder()
137 .user_agent(format!("Subcog/{}", env!("CARGO_PKG_VERSION")))
138 .timeout(timeout)
139 .pool_max_idle_per_host(10)
140 .build()
141 .unwrap_or_else(|_| reqwest::blocking::Client::new());
142
143 Self { client }
144 }
145
146 fn attempt_delivery(
148 &self,
149 endpoint: &WebhookEndpoint,
150 payload: &WebhookPayload,
151 ) -> std::result::Result<u16, String> {
152 let payload_json = payload.to_format_json(endpoint.format);
154
155 let mut request = self
156 .client
157 .post(&endpoint.url)
158 .header("Content-Type", "application/json")
159 .header("X-Subcog-Event", &payload.event_type)
160 .header("X-Subcog-Delivery-Id", &payload.event_id)
161 .timeout(Duration::from_secs(endpoint.retry.timeout_secs));
162
163 let signature_json = payload.to_json();
165 request = Self::add_auth_headers(request, &endpoint.auth, &signature_json);
166
167 let response = request
169 .body(payload_json)
170 .send()
171 .map_err(|e| format!("HTTP request failed: {e}"))?;
172
173 let status = response.status().as_u16();
174
175 if response.status().is_success() {
176 Ok(status)
177 } else {
178 Err(format!("HTTP {status} response"))
179 }
180 }
181
182 fn add_auth_headers(
184 mut request: reqwest::blocking::RequestBuilder,
185 auth: &WebhookAuth,
186 payload_json: &str,
187 ) -> reqwest::blocking::RequestBuilder {
188 if let Some(token) = auth.bearer_token() {
190 request = request.header("Authorization", format!("Bearer {}", token.expose_secret()));
191 }
192
193 if let Some(secret) = auth.hmac_secret() {
195 let signature =
196 super::payload::compute_hmac_signature(secret.expose_secret(), payload_json);
197 request = request.header("X-Subcog-Signature", signature);
198 }
199
200 request
201 }
202
203 fn deliver_with_retry(
205 &self,
206 endpoint: &WebhookEndpoint,
207 payload: &WebhookPayload,
208 ) -> DeliveryResult {
209 let start = std::time::Instant::now();
210 let retry_config = &endpoint.retry;
211 let max_attempts = retry_config.max_retries + 1;
212
213 for attempt in 1..=max_attempts {
214 if let Some(result) = self.try_single_delivery(
215 endpoint,
216 payload,
217 attempt,
218 max_attempts,
219 start,
220 retry_config,
221 ) {
222 return result;
223 }
224 }
225
226 let duration_ms = Self::elapsed_ms(start);
228 DeliveryResult::failure(
229 "Max retries exceeded".to_string(),
230 max_attempts,
231 duration_ms,
232 )
233 }
234
235 fn try_single_delivery(
237 &self,
238 endpoint: &WebhookEndpoint,
239 payload: &WebhookPayload,
240 attempt: u32,
241 max_attempts: u32,
242 start: std::time::Instant,
243 retry_config: &super::config::RetryConfig,
244 ) -> Option<DeliveryResult> {
245 match self.attempt_delivery(endpoint, payload) {
246 Ok(status_code) => {
247 let duration_ms = Self::elapsed_ms(start);
248 Some(DeliveryResult::success(status_code, attempt, duration_ms))
249 },
250 Err(ref error) => {
251 Self::handle_delivery_error(error, attempt, max_attempts, start, retry_config)
252 },
253 }
254 }
255
/// Whole milliseconds elapsed since `start`.
///
/// `Duration::as_millis` yields a `u128`; the conversion is checked and
/// saturates at `u64::MAX` rather than silently truncating, so no lint
/// suppression is needed.
fn elapsed_ms(start: std::time::Instant) -> u64 {
    u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX)
}
262
263 fn handle_delivery_error(
265 error: &str,
266 attempt: u32,
267 max_attempts: u32,
268 start: std::time::Instant,
269 retry_config: &super::config::RetryConfig,
270 ) -> Option<DeliveryResult> {
271 if error.contains("HTTP 4") {
273 let duration_ms = Self::elapsed_ms(start);
274 let status_code = Self::extract_status_code(error);
275 return Some(DeliveryResult::failure_with_status(
276 status_code.unwrap_or(400),
277 error.to_string(),
278 attempt,
279 duration_ms,
280 ));
281 }
282
283 if attempt >= max_attempts {
285 let duration_ms = Self::elapsed_ms(start);
286 return Some(DeliveryResult::failure(
287 error.to_string(),
288 attempt,
289 duration_ms,
290 ));
291 }
292
293 let delay_ms = retry_config.delay_for_attempt(attempt);
295 std::thread::sleep(Duration::from_millis(delay_ms));
296
297 None
298 }
299
/// Parses the three-digit status code that follows the first `"HTTP "` in
/// `error` (for example `"HTTP 404 response"` yields `Some(404)`).
///
/// Returns `None` when the marker is absent or is not immediately followed by
/// exactly three ASCII digits.
fn extract_status_code(error: &str) -> Option<u16> {
    let (_, after_marker) = error.split_once("HTTP ")?;
    // `get` is a checked slice: it yields `None` both when fewer than three
    // bytes remain and when byte 3 falls inside a multi-byte character, where
    // plain `[..3]` indexing would panic.
    let candidate = after_marker.get(..3)?;
    // `str::parse` alone would accept a leading '+', so require digits only.
    if candidate.bytes().all(|b| b.is_ascii_digit()) {
        candidate.parse().ok()
    } else {
        None
    }
}
311}
312
/// `Default` simply delegates to [`HttpDeliveryBackend::new`].
impl Default for HttpDeliveryBackend {
    fn default() -> Self {
        Self::new()
    }
}
318
/// HTTP implementation of [`WebhookDelivery`].
impl WebhookDelivery for HttpDeliveryBackend {
    /// Delivers via `deliver_with_retry` and wraps its outcome in `Ok`.
    ///
    /// This implementation never returns `Err`: delivery failures are reported
    /// through the returned [`DeliveryResult`] instead.
    fn deliver(
        &self,
        endpoint: &WebhookEndpoint,
        payload: &WebhookPayload,
    ) -> Result<DeliveryResult> {
        Ok(self.deliver_with_retry(endpoint, payload))
    }
}
328
/// Test-only [`WebhookDelivery`] implementation that records calls and
/// replays pre-queued results instead of doing any network I/O.
#[cfg(test)]
pub struct MockDeliveryBackend {
    /// Results queued for upcoming `deliver` calls.
    responses: std::sync::Mutex<Vec<Result<DeliveryResult>>>,
    /// One `(endpoint name, payload)` entry per `deliver` call, in call order.
    pub delivered: std::sync::Mutex<Vec<(String, WebhookPayload)>>,
}
337
#[cfg(test)]
impl MockDeliveryBackend {
    /// Creates a mock with no queued responses and no recorded deliveries.
    pub fn new() -> Self {
        Self {
            responses: std::sync::Mutex::new(Vec::new()),
            delivered: std::sync::Mutex::new(Vec::new()),
        }
    }

    /// Appends `result` to the list of responses to hand out.
    ///
    /// NOTE(review): `deliver` takes responses with `Vec::pop`, i.e. from the
    /// end, so when several are queued the most recently queued one is
    /// returned first — confirm that order is what tests expect.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn queue_response(&self, result: Result<DeliveryResult>) {
        self.responses.lock().expect("lock").push(result);
    }

    /// Number of `deliver` calls recorded so far.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn delivery_count(&self) -> usize {
        self.delivered.lock().expect("lock").len()
    }
}
358
#[cfg(test)]
impl WebhookDelivery for MockDeliveryBackend {
    /// Records the call, then replays a queued response.
    ///
    /// The endpoint name and a clone of the payload are appended to
    /// `delivered`. The response is popped from the end of the queued list;
    /// when nothing is queued, a default success (status 200, 1 attempt,
    /// 10 ms) is returned. Panics if either mutex is poisoned.
    fn deliver(
        &self,
        endpoint: &WebhookEndpoint,
        payload: &WebhookPayload,
    ) -> Result<DeliveryResult> {
        let record = (endpoint.name.clone(), payload.clone());
        self.delivered.lock().expect("lock").push(record);

        let queued = self.responses.lock().expect("lock").pop();
        match queued {
            Some(response) => response,
            None => Ok(DeliveryResult::success(200, 1, 10)),
        }
    }
}
378
#[cfg(test)]
mod tests {
    use super::super::config::{PayloadFormat, RetryConfig};
    use super::*;

    /// `success` marks the result successful and keeps the status code.
    #[test]
    fn test_delivery_result_success() {
        let DeliveryResult {
            success,
            status_code,
            attempts,
            error,
            ..
        } = DeliveryResult::success(200, 1, 100);

        assert!(success);
        assert_eq!(status_code, Some(200));
        assert_eq!(attempts, 1);
        assert!(error.is_none());
    }

    /// `failure` marks the result failed, with an error and no status code.
    #[test]
    fn test_delivery_result_failure() {
        let DeliveryResult {
            success,
            status_code,
            attempts,
            error,
            ..
        } = DeliveryResult::failure("Connection refused".to_string(), 3, 5000);

        assert!(!success);
        assert!(status_code.is_none());
        assert_eq!(attempts, 3);
        assert!(error.is_some());
    }

    /// Status codes are parsed out of `HTTP <code> response` messages only.
    #[test]
    fn test_extract_status_code() {
        let cases: [(&str, Option<u16>); 3] = [
            ("HTTP 404 response", Some(404)),
            ("HTTP 500 response", Some(500)),
            ("Connection refused", None),
        ];

        for (message, expected) in cases {
            assert_eq!(HttpDeliveryBackend::extract_status_code(message), expected);
        }
    }

    /// The mock replays a queued response and records the delivery.
    #[test]
    fn test_mock_delivery_backend() {
        let backend = MockDeliveryBackend::new();
        backend.queue_response(Ok(DeliveryResult::success(201, 1, 50)));

        let endpoint = WebhookEndpoint {
            name: "test".to_string(),
            url: "https://example.com".to_string(),
            auth: WebhookAuth::None,
            events: vec![],
            scopes: vec![],
            enabled: true,
            retry: RetryConfig::default(),
            format: PayloadFormat::Default,
        };
        let payload = WebhookPayload::test_event();

        let outcome = backend.deliver(&endpoint, &payload).expect("delivery");

        assert!(outcome.success);
        assert_eq!(outcome.status_code, Some(201));
        assert_eq!(backend.delivery_count(), 1);
    }
}