1use crate::{Error, Result};
29use rusqlite::{Connection, params};
30use serde::{Deserialize, Serialize};
31use std::path::Path;
32use std::sync::Mutex;
33
/// Outcome recorded for a single webhook delivery.
///
/// With `rename_all = "lowercase"`, serde serializes each variant as its
/// lowercase name (`"success"`, `"failed"`, `"timeout"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DeliveryStatus {
    /// The delivery succeeded.
    Success,
    /// The delivery failed.
    Failed,
    /// The delivery timed out.
    // NOTE(review): nothing visible in this file constructs this variant except
    // string parsing — confirm which caller is expected to produce it.
    Timeout,
}
45
46impl DeliveryStatus {
47 #[must_use]
49 pub const fn as_str(&self) -> &'static str {
50 match self {
51 Self::Success => "success",
52 Self::Failed => "failed",
53 Self::Timeout => "timeout",
54 }
55 }
56}
57
58impl std::fmt::Display for DeliveryStatus {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.as_str())
61 }
62}
63
64impl std::str::FromStr for DeliveryStatus {
65 type Err = Error;
66
67 fn from_str(s: &str) -> Result<Self> {
68 match s {
69 "success" => Ok(Self::Success),
70 "failed" => Ok(Self::Failed),
71 "timeout" => Ok(Self::Timeout),
72 _ => Err(Error::InvalidInput(format!("Invalid delivery status: {s}"))),
73 }
74 }
75}
76
/// One row of the webhook delivery audit log.
///
/// The fields map one-to-one onto the columns of the `webhook_deliveries`
/// table created by `WebhookAuditLogger`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeliveryRecord {
    /// Unique identifier of this record (the table's primary key).
    /// `DeliveryRecord::new` fills it with a freshly generated UUID v4 string.
    pub id: String,
    /// Name of the webhook the delivery was made for.
    pub webhook_name: String,
    /// Type of the event that was delivered.
    pub event_type: String,
    /// Identifier of the event that was delivered.
    pub event_id: String,
    /// Domain the event belongs to; used as the key for export and deletion.
    pub domain: String,
    /// URL the delivery was sent to.
    pub url: String,
    /// Final outcome of the delivery.
    pub status: DeliveryStatus,
    /// Status code of the response, if any.
    // NOTE(review): presumably the HTTP response status — confirm against
    // `DeliveryResult`.
    pub status_code: Option<i32>,
    /// Number of attempts that were made.
    pub attempts: i32,
    /// Duration of the delivery in milliseconds.
    pub duration_ms: i64,
    /// Error description, if one was reported.
    pub error: Option<String>,
    /// Creation time of the record; `DeliveryRecord::new` stores
    /// `chrono::Utc::now().timestamp()` here (Unix time in seconds).
    pub timestamp: i64,
}
105
106impl DeliveryRecord {
107 #[must_use]
109 pub fn new(
110 webhook_name: &str,
111 event_type: &str,
112 event_id: &str,
113 domain: &str,
114 url: &str,
115 result: &super::delivery::DeliveryResult,
116 ) -> Self {
117 Self {
118 id: uuid::Uuid::new_v4().to_string(),
119 webhook_name: webhook_name.to_string(),
120 event_type: event_type.to_string(),
121 event_id: event_id.to_string(),
122 domain: domain.to_string(),
123 url: url.to_string(),
124 status: if result.success {
125 DeliveryStatus::Success
126 } else {
127 DeliveryStatus::Failed
128 },
129 status_code: result.status_code.map(i32::from),
130 attempts: i32::try_from(result.attempts).unwrap_or(0),
131 duration_ms: i64::try_from(result.duration_ms).unwrap_or(0),
132 error: result.error.clone(),
133 timestamp: chrono::Utc::now().timestamp(),
134 }
135 }
136}
137
/// Storage backend for webhook delivery audit records.
///
/// Implementations must be `Send + Sync`, so a backend can be shared between
/// threads.
pub trait WebhookAuditBackend: Send + Sync {
    /// Persists a single delivery record.
    fn store(&self, record: &DeliveryRecord) -> Result<()>;

    /// Returns at most `limit` records stored for `webhook_name`.
    fn get_history(&self, webhook_name: &str, limit: usize) -> Result<Vec<DeliveryRecord>>;

    /// Returns every record whose `domain` equals `domain`.
    fn export_domain_logs(&self, domain: &str) -> Result<Vec<DeliveryRecord>>;

    /// Deletes every record whose `domain` equals `domain` and returns how many
    /// records were removed.
    fn delete_domain_logs(&self, domain: &str) -> Result<usize>;

    /// Returns aggregate delivery statistics for `webhook_name`.
    fn count_by_status(&self, webhook_name: &str) -> Result<WebhookStats>;
}
192
/// Aggregate delivery statistics for one webhook.
///
/// The derived `Default` yields all-zero statistics.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct WebhookStats {
    /// Total number of recorded deliveries.
    pub total: usize,
    /// Number of deliveries recorded with status `success`.
    pub success: usize,
    /// Number of deliveries recorded with any status other than `success`
    /// (this is how the SQLite backend in this file computes it).
    pub failed: usize,
    /// Mean `duration_ms` over the counted deliveries; the SQLite backend in
    /// this file reports `0.0` when there are none.
    pub avg_duration_ms: f64,
}
205
/// SQLite-backed audit log of webhook deliveries.
///
/// All database access goes through a single connection guarded by a mutex.
pub struct WebhookAuditLogger {
    /// The SQLite connection; locked for the duration of each operation.
    conn: Mutex<Connection>,
}
211
212#[allow(clippy::significant_drop_tightening)]
214impl WebhookAuditLogger {
215 pub fn new(db_path: &Path) -> Result<Self> {
225 if let Some(parent) = db_path.parent() {
227 std::fs::create_dir_all(parent).map_err(|e| Error::OperationFailed {
228 operation: "create_audit_dir".to_string(),
229 cause: e.to_string(),
230 })?;
231 }
232
233 let conn = Connection::open(db_path).map_err(|e| Error::OperationFailed {
234 operation: "open_audit_db".to_string(),
235 cause: e.to_string(),
236 })?;
237
238 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")
240 .map_err(|e| Error::OperationFailed {
241 operation: "configure_audit_db".to_string(),
242 cause: e.to_string(),
243 })?;
244
245 let logger = Self {
246 conn: Mutex::new(conn),
247 };
248
249 logger.create_schema()?;
250
251 Ok(logger)
252 }
253
254 #[cfg(test)]
260 pub fn in_memory() -> Result<Self> {
261 let conn = Connection::open_in_memory().map_err(|e| Error::OperationFailed {
262 operation: "open_memory_db".to_string(),
263 cause: e.to_string(),
264 })?;
265
266 let logger = Self {
267 conn: Mutex::new(conn),
268 };
269
270 logger.create_schema()?;
271
272 Ok(logger)
273 }
274
275 fn create_schema(&self) -> Result<()> {
277 let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
278 operation: "lock_audit_db".to_string(),
279 cause: e.to_string(),
280 })?;
281
282 conn.execute_batch(
283 r"
284 CREATE TABLE IF NOT EXISTS webhook_deliveries (
285 id TEXT PRIMARY KEY,
286 webhook_name TEXT NOT NULL,
287 event_type TEXT NOT NULL,
288 event_id TEXT NOT NULL,
289 domain TEXT NOT NULL,
290 url TEXT NOT NULL,
291 status TEXT NOT NULL,
292 status_code INTEGER,
293 attempts INTEGER NOT NULL,
294 duration_ms INTEGER NOT NULL,
295 error TEXT,
296 timestamp INTEGER NOT NULL
297 );
298
299 CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_webhook_name
300 ON webhook_deliveries(webhook_name);
301 CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_domain
302 ON webhook_deliveries(domain);
303 CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_timestamp
304 ON webhook_deliveries(timestamp);
305 CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_event_id
306 ON webhook_deliveries(event_id);
307 ",
308 )
309 .map_err(|e| Error::OperationFailed {
310 operation: "create_audit_schema".to_string(),
311 cause: e.to_string(),
312 })?;
313
314 Ok(())
315 }
316
317 pub fn log_delivery(
332 &self,
333 webhook_name: &str,
334 event_type: &str,
335 event_id: &str,
336 domain: &str,
337 url: &str,
338 result: &super::delivery::DeliveryResult,
339 ) -> Result<()> {
340 let record = DeliveryRecord::new(webhook_name, event_type, event_id, domain, url, result);
341 self.store(&record)
342 }
343}
344
345#[allow(clippy::significant_drop_tightening)]
347impl WebhookAuditBackend for WebhookAuditLogger {
348 fn store(&self, record: &DeliveryRecord) -> Result<()> {
349 let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
350 operation: "lock_audit_db".to_string(),
351 cause: e.to_string(),
352 })?;
353
354 conn.execute(
355 r"
356 INSERT INTO webhook_deliveries
357 (id, webhook_name, event_type, event_id, domain, url, status,
358 status_code, attempts, duration_ms, error, timestamp)
359 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
360 ",
361 params![
362 record.id,
363 record.webhook_name,
364 record.event_type,
365 record.event_id,
366 record.domain,
367 record.url,
368 record.status.as_str(),
369 record.status_code,
370 record.attempts,
371 record.duration_ms,
372 record.error,
373 record.timestamp,
374 ],
375 )
376 .map_err(|e| Error::OperationFailed {
377 operation: "store_delivery_record".to_string(),
378 cause: e.to_string(),
379 })?;
380
381 Ok(())
382 }
383
384 fn get_history(&self, webhook_name: &str, limit: usize) -> Result<Vec<DeliveryRecord>> {
385 let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
386 operation: "lock_audit_db".to_string(),
387 cause: e.to_string(),
388 })?;
389
390 let mut stmt = conn
391 .prepare(
392 r"
393 SELECT id, webhook_name, event_type, event_id, domain, url, status,
394 status_code, attempts, duration_ms, error, timestamp
395 FROM webhook_deliveries
396 WHERE webhook_name = ?1
397 ORDER BY timestamp DESC
398 LIMIT ?2
399 ",
400 )
401 .map_err(|e| Error::OperationFailed {
402 operation: "prepare_history_query".to_string(),
403 cause: e.to_string(),
404 })?;
405
406 let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
407 let records = stmt
408 .query_map(params![webhook_name, limit_i64], |row| {
409 Ok(DeliveryRecord {
410 id: row.get(0)?,
411 webhook_name: row.get(1)?,
412 event_type: row.get(2)?,
413 event_id: row.get(3)?,
414 domain: row.get(4)?,
415 url: row.get(5)?,
416 status: row
417 .get::<_, String>(6)?
418 .parse()
419 .unwrap_or(DeliveryStatus::Failed),
420 status_code: row.get(7)?,
421 attempts: row.get(8)?,
422 duration_ms: row.get(9)?,
423 error: row.get(10)?,
424 timestamp: row.get(11)?,
425 })
426 })
427 .map_err(|e| Error::OperationFailed {
428 operation: "query_history".to_string(),
429 cause: e.to_string(),
430 })?
431 .filter_map(std::result::Result::ok)
432 .collect();
433
434 Ok(records)
435 }
436
437 fn export_domain_logs(&self, domain: &str) -> Result<Vec<DeliveryRecord>> {
438 let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
439 operation: "lock_audit_db".to_string(),
440 cause: e.to_string(),
441 })?;
442
443 let mut stmt = conn
444 .prepare(
445 r"
446 SELECT id, webhook_name, event_type, event_id, domain, url, status,
447 status_code, attempts, duration_ms, error, timestamp
448 FROM webhook_deliveries
449 WHERE domain = ?1
450 ORDER BY timestamp DESC
451 ",
452 )
453 .map_err(|e| Error::OperationFailed {
454 operation: "prepare_export_query".to_string(),
455 cause: e.to_string(),
456 })?;
457
458 let records = stmt
459 .query_map(params![domain], |row| {
460 Ok(DeliveryRecord {
461 id: row.get(0)?,
462 webhook_name: row.get(1)?,
463 event_type: row.get(2)?,
464 event_id: row.get(3)?,
465 domain: row.get(4)?,
466 url: row.get(5)?,
467 status: row
468 .get::<_, String>(6)?
469 .parse()
470 .unwrap_or(DeliveryStatus::Failed),
471 status_code: row.get(7)?,
472 attempts: row.get(8)?,
473 duration_ms: row.get(9)?,
474 error: row.get(10)?,
475 timestamp: row.get(11)?,
476 })
477 })
478 .map_err(|e| Error::OperationFailed {
479 operation: "export_logs".to_string(),
480 cause: e.to_string(),
481 })?
482 .filter_map(std::result::Result::ok)
483 .collect();
484
485 Ok(records)
486 }
487
488 fn delete_domain_logs(&self, domain: &str) -> Result<usize> {
489 let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
490 operation: "lock_audit_db".to_string(),
491 cause: e.to_string(),
492 })?;
493
494 let count = conn
495 .execute(
496 "DELETE FROM webhook_deliveries WHERE domain = ?1",
497 params![domain],
498 )
499 .map_err(|e| Error::OperationFailed {
500 operation: "delete_domain_logs".to_string(),
501 cause: e.to_string(),
502 })?;
503
504 Ok(count)
505 }
506
507 fn count_by_status(&self, webhook_name: &str) -> Result<WebhookStats> {
508 let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
509 operation: "lock_audit_db".to_string(),
510 cause: e.to_string(),
511 })?;
512
513 let mut stmt = conn
514 .prepare(
515 r"
516 SELECT
517 COUNT(*) as total,
518 SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
519 SUM(CASE WHEN status != 'success' THEN 1 ELSE 0 END) as failed,
520 AVG(duration_ms) as avg_duration
521 FROM webhook_deliveries
522 WHERE webhook_name = ?1
523 ",
524 )
525 .map_err(|e| Error::OperationFailed {
526 operation: "prepare_stats_query".to_string(),
527 cause: e.to_string(),
528 })?;
529
530 let stats = stmt
531 .query_row(params![webhook_name], |row| {
532 Ok(WebhookStats {
533 total: usize::try_from(row.get::<_, i64>(0).unwrap_or(0)).unwrap_or(0),
534 success: usize::try_from(row.get::<_, i64>(1).unwrap_or(0)).unwrap_or(0),
535 failed: usize::try_from(row.get::<_, i64>(2).unwrap_or(0)).unwrap_or(0),
536 avg_duration_ms: row.get::<_, f64>(3).unwrap_or(0.0),
537 })
538 })
539 .map_err(|e| Error::OperationFailed {
540 operation: "query_stats".to_string(),
541 cause: e.to_string(),
542 })?;
543
544 Ok(stats)
545 }
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::webhooks::delivery::DeliveryResult;

    /// Creates a fresh in-memory logger for a single test.
    fn new_logger() -> WebhookAuditLogger {
        WebhookAuditLogger::in_memory().expect("create logger")
    }

    /// A new logger has an empty table, so the statistics are all zero.
    #[test]
    fn test_audit_logger_creation() {
        let logger = new_logger();
        let stats = logger.count_by_status("test").expect("get stats");
        assert_eq!(stats.total, 0);
    }

    /// A logged delivery can be read back through `get_history`.
    #[test]
    fn test_store_and_retrieve() {
        let logger = new_logger();
        let outcome = DeliveryResult::success(200, 1, 100);

        logger
            .log_delivery(
                "test-webhook",
                "captured",
                "event-123",
                "project",
                "https://example.com",
                &outcome,
            )
            .expect("log delivery");

        let history = logger.get_history("test-webhook", 10).expect("get history");
        assert_eq!(history.len(), 1);

        let first = &history[0];
        assert_eq!(first.webhook_name, "test-webhook");
        assert_eq!(first.status, DeliveryStatus::Success);
    }

    /// `export_domain_logs` returns only the records of the requested domain.
    #[test]
    fn test_export_domain_logs() {
        let logger = new_logger();
        let outcome = DeliveryResult::success(200, 1, 100);

        // (expect message, webhook, event type, event id, domain, url)
        let deliveries = [
            ("log 1", "webhook-1", "captured", "e1", "project", "https://a.com"),
            ("log 2", "webhook-2", "deleted", "e2", "user", "https://b.com"),
            ("log 3", "webhook-3", "updated", "e3", "project", "https://c.com"),
        ];
        for (label, webhook, event_type, event_id, domain, url) in deliveries {
            logger
                .log_delivery(webhook, event_type, event_id, domain, url, &outcome)
                .expect(label);
        }

        let project_logs = logger.export_domain_logs("project").expect("export");
        assert_eq!(project_logs.len(), 2);

        let user_logs = logger.export_domain_logs("user").expect("export");
        assert_eq!(user_logs.len(), 1);
    }

    /// `delete_domain_logs` removes one domain's records and leaves the rest.
    #[test]
    fn test_delete_domain_logs() {
        let logger = new_logger();
        let outcome = DeliveryResult::success(200, 1, 100);

        // (expect message, webhook, event id, domain, url)
        let deliveries = [
            ("log 1", "webhook-1", "e1", "project", "https://a.com"),
            ("log 2", "webhook-2", "e2", "project", "https://b.com"),
            ("log 3", "webhook-3", "e3", "user", "https://c.com"),
        ];
        for (label, webhook, event_id, domain, url) in deliveries {
            logger
                .log_delivery(webhook, "captured", event_id, domain, url, &outcome)
                .expect(label);
        }

        let deleted = logger.delete_domain_logs("project").expect("delete");
        assert_eq!(deleted, 2);

        let project_logs = logger.export_domain_logs("project").expect("export");
        assert_eq!(project_logs.len(), 0);

        let user_logs = logger.export_domain_logs("user").expect("export");
        assert_eq!(user_logs.len(), 1);
    }

    /// Statistics split the records into successful and failed deliveries.
    #[test]
    fn test_count_by_status() {
        let logger = new_logger();

        let success = DeliveryResult::success(200, 1, 100);
        let failure = DeliveryResult::failure("error".to_string(), 3, 5000);

        // (expect message, event id, delivery outcome)
        let deliveries = [
            ("log 1", "e1", &success),
            ("log 2", "e2", &success),
            ("log 3", "e3", &failure),
        ];
        for (label, event_id, outcome) in deliveries {
            logger
                .log_delivery(
                    "webhook",
                    "captured",
                    event_id,
                    "project",
                    "https://a.com",
                    outcome,
                )
                .expect(label);
        }

        let stats = logger.count_by_status("webhook").expect("stats");
        assert_eq!(stats.total, 3);
        assert_eq!(stats.success, 2);
        assert_eq!(stats.failed, 1);
    }
}