Skip to main content

subcog/webhooks/
audit.rs

1//! GDPR-compliant webhook delivery audit logging.
2//!
3//! This module provides audit logging for webhook deliveries with full
4//! GDPR compliance, including:
5//! - Recording all delivery attempts with metadata
6//! - Export of logs by domain (GDPR Article 20 - Data Portability)
7//! - Deletion of logs by domain (GDPR Article 17 - Right to Erasure)
8//!
9//! # Schema
10//!
11//! ```sql
12//! CREATE TABLE webhook_deliveries (
13//!     id TEXT PRIMARY KEY,
14//!     webhook_name TEXT NOT NULL,
15//!     event_type TEXT NOT NULL,
16//!     event_id TEXT NOT NULL,
17//!     domain TEXT NOT NULL,
18//!     url TEXT NOT NULL,
19//!     status TEXT NOT NULL,
20//!     status_code INTEGER,
21//!     attempts INTEGER NOT NULL,
22//!     duration_ms INTEGER NOT NULL,
23//!     error TEXT,
24//!     timestamp INTEGER NOT NULL
25//! );
26//! ```
27
28use crate::{Error, Result};
29use rusqlite::{Connection, params};
30use serde::{Deserialize, Serialize};
31use std::path::Path;
32use std::sync::Mutex;
33
34/// Delivery status for audit logging.
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
36#[serde(rename_all = "lowercase")]
37pub enum DeliveryStatus {
38    /// Delivery succeeded.
39    Success,
40    /// Delivery failed after all retries.
41    Failed,
42    /// Delivery timed out.
43    Timeout,
44}
45
46impl DeliveryStatus {
47    /// Returns the string representation.
48    #[must_use]
49    pub const fn as_str(&self) -> &'static str {
50        match self {
51            Self::Success => "success",
52            Self::Failed => "failed",
53            Self::Timeout => "timeout",
54        }
55    }
56}
57
58impl std::fmt::Display for DeliveryStatus {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "{}", self.as_str())
61    }
62}
63
64impl std::str::FromStr for DeliveryStatus {
65    type Err = Error;
66
67    fn from_str(s: &str) -> Result<Self> {
68        match s {
69            "success" => Ok(Self::Success),
70            "failed" => Ok(Self::Failed),
71            "timeout" => Ok(Self::Timeout),
72            _ => Err(Error::InvalidInput(format!("Invalid delivery status: {s}"))),
73        }
74    }
75}
76
77/// A webhook delivery audit record.
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct DeliveryRecord {
80    /// Unique record ID.
81    pub id: String,
82    /// Webhook name.
83    pub webhook_name: String,
84    /// Event type that triggered the delivery.
85    pub event_type: String,
86    /// Original event ID.
87    pub event_id: String,
88    /// Domain scope (project/user/org).
89    pub domain: String,
90    /// Target URL.
91    pub url: String,
92    /// Delivery status.
93    pub status: DeliveryStatus,
94    /// HTTP status code (if available).
95    pub status_code: Option<i32>,
96    /// Number of delivery attempts.
97    pub attempts: i32,
98    /// Total duration in milliseconds.
99    pub duration_ms: i64,
100    /// Error message (if failed).
101    pub error: Option<String>,
102    /// Unix timestamp of the delivery.
103    pub timestamp: i64,
104}
105
106impl DeliveryRecord {
107    /// Creates a new delivery record.
108    #[must_use]
109    pub fn new(
110        webhook_name: &str,
111        event_type: &str,
112        event_id: &str,
113        domain: &str,
114        url: &str,
115        result: &super::delivery::DeliveryResult,
116    ) -> Self {
117        Self {
118            id: uuid::Uuid::new_v4().to_string(),
119            webhook_name: webhook_name.to_string(),
120            event_type: event_type.to_string(),
121            event_id: event_id.to_string(),
122            domain: domain.to_string(),
123            url: url.to_string(),
124            status: if result.success {
125                DeliveryStatus::Success
126            } else {
127                DeliveryStatus::Failed
128            },
129            status_code: result.status_code.map(i32::from),
130            attempts: i32::try_from(result.attempts).unwrap_or(0),
131            duration_ms: i64::try_from(result.duration_ms).unwrap_or(0),
132            error: result.error.clone(),
133            timestamp: chrono::Utc::now().timestamp(),
134        }
135    }
136}
137
138/// Trait for webhook audit storage backends.
139pub trait WebhookAuditBackend: Send + Sync {
140    /// Stores a delivery record.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the record cannot be stored.
145    fn store(&self, record: &DeliveryRecord) -> Result<()>;
146
147    /// Gets delivery records for a webhook.
148    ///
149    /// # Arguments
150    ///
151    /// * `webhook_name` - Name of the webhook
152    /// * `limit` - Maximum number of records to return
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the records cannot be retrieved.
157    fn get_history(&self, webhook_name: &str, limit: usize) -> Result<Vec<DeliveryRecord>>;
158
159    /// Exports all records for a domain (GDPR Article 20).
160    ///
161    /// # Arguments
162    ///
163    /// * `domain` - Domain scope to export
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if the records cannot be exported.
168    fn export_domain_logs(&self, domain: &str) -> Result<Vec<DeliveryRecord>>;
169
170    /// Deletes all records for a domain (GDPR Article 17).
171    ///
172    /// # Arguments
173    ///
174    /// * `domain` - Domain scope to delete
175    ///
176    /// # Returns
177    ///
178    /// The number of records deleted.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the records cannot be deleted.
183    fn delete_domain_logs(&self, domain: &str) -> Result<usize>;
184
185    /// Counts records by status for a webhook.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the count cannot be retrieved.
190    fn count_by_status(&self, webhook_name: &str) -> Result<WebhookStats>;
191}
192
193/// Statistics for a webhook.
194#[derive(Debug, Clone, Default, Serialize, Deserialize)]
195pub struct WebhookStats {
196    /// Total number of deliveries.
197    pub total: usize,
198    /// Number of successful deliveries.
199    pub success: usize,
200    /// Number of failed deliveries.
201    pub failed: usize,
202    /// Average duration in milliseconds.
203    pub avg_duration_ms: f64,
204}
205
206/// `SQLite`-backed webhook audit logger.
207pub struct WebhookAuditLogger {
208    /// `SQLite` connection.
209    conn: Mutex<Connection>,
210}
211
212// Mutex guards are held for the duration of database operations, which is correct behavior
213#[allow(clippy::significant_drop_tightening)]
214impl WebhookAuditLogger {
215    /// Creates a new audit logger with the given database path.
216    ///
217    /// # Arguments
218    ///
219    /// * `db_path` - Path to the `SQLite` database file
220    ///
221    /// # Errors
222    ///
223    /// Returns an error if the database cannot be opened or initialized.
224    pub fn new(db_path: &Path) -> Result<Self> {
225        // Ensure parent directory exists
226        if let Some(parent) = db_path.parent() {
227            std::fs::create_dir_all(parent).map_err(|e| Error::OperationFailed {
228                operation: "create_audit_dir".to_string(),
229                cause: e.to_string(),
230            })?;
231        }
232
233        let conn = Connection::open(db_path).map_err(|e| Error::OperationFailed {
234            operation: "open_audit_db".to_string(),
235            cause: e.to_string(),
236        })?;
237
238        // Enable WAL mode for better concurrency
239        conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")
240            .map_err(|e| Error::OperationFailed {
241                operation: "configure_audit_db".to_string(),
242                cause: e.to_string(),
243            })?;
244
245        let logger = Self {
246            conn: Mutex::new(conn),
247        };
248
249        logger.create_schema()?;
250
251        Ok(logger)
252    }
253
254    /// Creates an in-memory audit logger for testing.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the database cannot be initialized.
259    #[cfg(test)]
260    pub fn in_memory() -> Result<Self> {
261        let conn = Connection::open_in_memory().map_err(|e| Error::OperationFailed {
262            operation: "open_memory_db".to_string(),
263            cause: e.to_string(),
264        })?;
265
266        let logger = Self {
267            conn: Mutex::new(conn),
268        };
269
270        logger.create_schema()?;
271
272        Ok(logger)
273    }
274
275    /// Creates the database schema.
276    fn create_schema(&self) -> Result<()> {
277        let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
278            operation: "lock_audit_db".to_string(),
279            cause: e.to_string(),
280        })?;
281
282        conn.execute_batch(
283            r"
284            CREATE TABLE IF NOT EXISTS webhook_deliveries (
285                id TEXT PRIMARY KEY,
286                webhook_name TEXT NOT NULL,
287                event_type TEXT NOT NULL,
288                event_id TEXT NOT NULL,
289                domain TEXT NOT NULL,
290                url TEXT NOT NULL,
291                status TEXT NOT NULL,
292                status_code INTEGER,
293                attempts INTEGER NOT NULL,
294                duration_ms INTEGER NOT NULL,
295                error TEXT,
296                timestamp INTEGER NOT NULL
297            );
298
299            CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_webhook_name
300                ON webhook_deliveries(webhook_name);
301            CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_domain
302                ON webhook_deliveries(domain);
303            CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_timestamp
304                ON webhook_deliveries(timestamp);
305            CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_event_id
306                ON webhook_deliveries(event_id);
307            ",
308        )
309        .map_err(|e| Error::OperationFailed {
310            operation: "create_audit_schema".to_string(),
311            cause: e.to_string(),
312        })?;
313
314        Ok(())
315    }
316
317    /// Records a delivery result.
318    ///
319    /// # Arguments
320    ///
321    /// * `webhook_name` - Name of the webhook
322    /// * `event_type` - Type of event
323    /// * `event_id` - ID of the event
324    /// * `domain` - Domain scope
325    /// * `url` - Target URL
326    /// * `result` - Delivery result
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the record cannot be stored.
331    pub fn log_delivery(
332        &self,
333        webhook_name: &str,
334        event_type: &str,
335        event_id: &str,
336        domain: &str,
337        url: &str,
338        result: &super::delivery::DeliveryResult,
339    ) -> Result<()> {
340        let record = DeliveryRecord::new(webhook_name, event_type, event_id, domain, url, result);
341        self.store(&record)
342    }
343}
344
345// Mutex guards are held for the duration of database operations, which is correct behavior
346#[allow(clippy::significant_drop_tightening)]
347impl WebhookAuditBackend for WebhookAuditLogger {
348    fn store(&self, record: &DeliveryRecord) -> Result<()> {
349        let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
350            operation: "lock_audit_db".to_string(),
351            cause: e.to_string(),
352        })?;
353
354        conn.execute(
355            r"
356            INSERT INTO webhook_deliveries
357                (id, webhook_name, event_type, event_id, domain, url, status,
358                 status_code, attempts, duration_ms, error, timestamp)
359            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
360            ",
361            params![
362                record.id,
363                record.webhook_name,
364                record.event_type,
365                record.event_id,
366                record.domain,
367                record.url,
368                record.status.as_str(),
369                record.status_code,
370                record.attempts,
371                record.duration_ms,
372                record.error,
373                record.timestamp,
374            ],
375        )
376        .map_err(|e| Error::OperationFailed {
377            operation: "store_delivery_record".to_string(),
378            cause: e.to_string(),
379        })?;
380
381        Ok(())
382    }
383
384    fn get_history(&self, webhook_name: &str, limit: usize) -> Result<Vec<DeliveryRecord>> {
385        let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
386            operation: "lock_audit_db".to_string(),
387            cause: e.to_string(),
388        })?;
389
390        let mut stmt = conn
391            .prepare(
392                r"
393                SELECT id, webhook_name, event_type, event_id, domain, url, status,
394                       status_code, attempts, duration_ms, error, timestamp
395                FROM webhook_deliveries
396                WHERE webhook_name = ?1
397                ORDER BY timestamp DESC
398                LIMIT ?2
399                ",
400            )
401            .map_err(|e| Error::OperationFailed {
402                operation: "prepare_history_query".to_string(),
403                cause: e.to_string(),
404            })?;
405
406        let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
407        let records = stmt
408            .query_map(params![webhook_name, limit_i64], |row| {
409                Ok(DeliveryRecord {
410                    id: row.get(0)?,
411                    webhook_name: row.get(1)?,
412                    event_type: row.get(2)?,
413                    event_id: row.get(3)?,
414                    domain: row.get(4)?,
415                    url: row.get(5)?,
416                    status: row
417                        .get::<_, String>(6)?
418                        .parse()
419                        .unwrap_or(DeliveryStatus::Failed),
420                    status_code: row.get(7)?,
421                    attempts: row.get(8)?,
422                    duration_ms: row.get(9)?,
423                    error: row.get(10)?,
424                    timestamp: row.get(11)?,
425                })
426            })
427            .map_err(|e| Error::OperationFailed {
428                operation: "query_history".to_string(),
429                cause: e.to_string(),
430            })?
431            .filter_map(std::result::Result::ok)
432            .collect();
433
434        Ok(records)
435    }
436
437    fn export_domain_logs(&self, domain: &str) -> Result<Vec<DeliveryRecord>> {
438        let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
439            operation: "lock_audit_db".to_string(),
440            cause: e.to_string(),
441        })?;
442
443        let mut stmt = conn
444            .prepare(
445                r"
446                SELECT id, webhook_name, event_type, event_id, domain, url, status,
447                       status_code, attempts, duration_ms, error, timestamp
448                FROM webhook_deliveries
449                WHERE domain = ?1
450                ORDER BY timestamp DESC
451                ",
452            )
453            .map_err(|e| Error::OperationFailed {
454                operation: "prepare_export_query".to_string(),
455                cause: e.to_string(),
456            })?;
457
458        let records = stmt
459            .query_map(params![domain], |row| {
460                Ok(DeliveryRecord {
461                    id: row.get(0)?,
462                    webhook_name: row.get(1)?,
463                    event_type: row.get(2)?,
464                    event_id: row.get(3)?,
465                    domain: row.get(4)?,
466                    url: row.get(5)?,
467                    status: row
468                        .get::<_, String>(6)?
469                        .parse()
470                        .unwrap_or(DeliveryStatus::Failed),
471                    status_code: row.get(7)?,
472                    attempts: row.get(8)?,
473                    duration_ms: row.get(9)?,
474                    error: row.get(10)?,
475                    timestamp: row.get(11)?,
476                })
477            })
478            .map_err(|e| Error::OperationFailed {
479                operation: "export_logs".to_string(),
480                cause: e.to_string(),
481            })?
482            .filter_map(std::result::Result::ok)
483            .collect();
484
485        Ok(records)
486    }
487
488    fn delete_domain_logs(&self, domain: &str) -> Result<usize> {
489        let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
490            operation: "lock_audit_db".to_string(),
491            cause: e.to_string(),
492        })?;
493
494        let count = conn
495            .execute(
496                "DELETE FROM webhook_deliveries WHERE domain = ?1",
497                params![domain],
498            )
499            .map_err(|e| Error::OperationFailed {
500                operation: "delete_domain_logs".to_string(),
501                cause: e.to_string(),
502            })?;
503
504        Ok(count)
505    }
506
507    fn count_by_status(&self, webhook_name: &str) -> Result<WebhookStats> {
508        let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
509            operation: "lock_audit_db".to_string(),
510            cause: e.to_string(),
511        })?;
512
513        let mut stmt = conn
514            .prepare(
515                r"
516                SELECT
517                    COUNT(*) as total,
518                    SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success,
519                    SUM(CASE WHEN status != 'success' THEN 1 ELSE 0 END) as failed,
520                    AVG(duration_ms) as avg_duration
521                FROM webhook_deliveries
522                WHERE webhook_name = ?1
523                ",
524            )
525            .map_err(|e| Error::OperationFailed {
526                operation: "prepare_stats_query".to_string(),
527                cause: e.to_string(),
528            })?;
529
530        let stats = stmt
531            .query_row(params![webhook_name], |row| {
532                Ok(WebhookStats {
533                    total: usize::try_from(row.get::<_, i64>(0).unwrap_or(0)).unwrap_or(0),
534                    success: usize::try_from(row.get::<_, i64>(1).unwrap_or(0)).unwrap_or(0),
535                    failed: usize::try_from(row.get::<_, i64>(2).unwrap_or(0)).unwrap_or(0),
536                    avg_duration_ms: row.get::<_, f64>(3).unwrap_or(0.0),
537                })
538            })
539            .map_err(|e| Error::OperationFailed {
540                operation: "query_stats".to_string(),
541                cause: e.to_string(),
542            })?;
543
544        Ok(stats)
545    }
546}
547
548#[cfg(test)]
549mod tests {
550    use super::*;
551    use crate::webhooks::delivery::DeliveryResult;
552
553    #[test]
554    fn test_audit_logger_creation() {
555        let logger = WebhookAuditLogger::in_memory().expect("create logger");
556        let stats = logger.count_by_status("test").expect("get stats");
557        assert_eq!(stats.total, 0);
558    }
559
560    #[test]
561    fn test_store_and_retrieve() {
562        let logger = WebhookAuditLogger::in_memory().expect("create logger");
563
564        let result = DeliveryResult::success(200, 1, 100);
565        logger
566            .log_delivery(
567                "test-webhook",
568                "captured",
569                "event-123",
570                "project",
571                "https://example.com",
572                &result,
573            )
574            .expect("log delivery");
575
576        let history = logger.get_history("test-webhook", 10).expect("get history");
577        assert_eq!(history.len(), 1);
578        assert_eq!(history[0].webhook_name, "test-webhook");
579        assert_eq!(history[0].status, DeliveryStatus::Success);
580    }
581
582    #[test]
583    fn test_export_domain_logs() {
584        let logger = WebhookAuditLogger::in_memory().expect("create logger");
585
586        // Add records for different domains
587        let result = DeliveryResult::success(200, 1, 100);
588        logger
589            .log_delivery(
590                "webhook-1",
591                "captured",
592                "e1",
593                "project",
594                "https://a.com",
595                &result,
596            )
597            .expect("log 1");
598        logger
599            .log_delivery(
600                "webhook-2",
601                "deleted",
602                "e2",
603                "user",
604                "https://b.com",
605                &result,
606            )
607            .expect("log 2");
608        logger
609            .log_delivery(
610                "webhook-3",
611                "updated",
612                "e3",
613                "project",
614                "https://c.com",
615                &result,
616            )
617            .expect("log 3");
618
619        let project_logs = logger.export_domain_logs("project").expect("export");
620        assert_eq!(project_logs.len(), 2);
621
622        let user_logs = logger.export_domain_logs("user").expect("export");
623        assert_eq!(user_logs.len(), 1);
624    }
625
626    #[test]
627    fn test_delete_domain_logs() {
628        let logger = WebhookAuditLogger::in_memory().expect("create logger");
629
630        let result = DeliveryResult::success(200, 1, 100);
631        logger
632            .log_delivery(
633                "webhook-1",
634                "captured",
635                "e1",
636                "project",
637                "https://a.com",
638                &result,
639            )
640            .expect("log 1");
641        logger
642            .log_delivery(
643                "webhook-2",
644                "captured",
645                "e2",
646                "project",
647                "https://b.com",
648                &result,
649            )
650            .expect("log 2");
651        logger
652            .log_delivery(
653                "webhook-3",
654                "captured",
655                "e3",
656                "user",
657                "https://c.com",
658                &result,
659            )
660            .expect("log 3");
661
662        let deleted = logger.delete_domain_logs("project").expect("delete");
663        assert_eq!(deleted, 2);
664
665        let project_logs = logger.export_domain_logs("project").expect("export");
666        assert_eq!(project_logs.len(), 0);
667
668        // User logs should still exist
669        let user_logs = logger.export_domain_logs("user").expect("export");
670        assert_eq!(user_logs.len(), 1);
671    }
672
673    #[test]
674    fn test_count_by_status() {
675        let logger = WebhookAuditLogger::in_memory().expect("create logger");
676
677        let success = DeliveryResult::success(200, 1, 100);
678        let failure = DeliveryResult::failure("error".to_string(), 3, 5000);
679
680        logger
681            .log_delivery(
682                "webhook",
683                "captured",
684                "e1",
685                "project",
686                "https://a.com",
687                &success,
688            )
689            .expect("log 1");
690        logger
691            .log_delivery(
692                "webhook",
693                "captured",
694                "e2",
695                "project",
696                "https://a.com",
697                &success,
698            )
699            .expect("log 2");
700        logger
701            .log_delivery(
702                "webhook",
703                "captured",
704                "e3",
705                "project",
706                "https://a.com",
707                &failure,
708            )
709            .expect("log 3");
710
711        let stats = logger.count_by_status("webhook").expect("stats");
712        assert_eq!(stats.total, 3);
713        assert_eq!(stats.success, 2);
714        assert_eq!(stats.failed, 1);
715    }
716}