subcog/gc/
retention.rs

1//! Retention policy garbage collector implementation.
2//!
3//! Identifies and tombstones memories that have exceeded their retention period.
4//!
5//! # Configuration
6//!
7//! Retention can be configured via:
8//! - Environment variable: `SUBCOG_RETENTION_DAYS` (default: 365)
9//! - Config file: `[gc] retention_days = 365`
10//! - Per-namespace overrides: `[gc.retention] decisions = 730`
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use subcog::gc::{RetentionConfig, RetentionGarbageCollector};
16//! use subcog::storage::index::SqliteBackend;
17//! use std::sync::Arc;
18//!
19//! // Load retention config from environment
20//! let config = RetentionConfig::from_env();
21//! assert_eq!(config.default_days, 365);
22//!
23//! // Create retention GC with index backend
24//! let backend = Arc::new(SqliteBackend::new("memories.db")?);
25//! let gc = RetentionGarbageCollector::new(backend, config);
26//!
27//! // Dry run to see what would be cleaned up
28//! let result = gc.gc_expired_memories(true)?;
29//! println!("Would tombstone {} expired memories", result.memories_tombstoned);
30//!
31//! // Actually perform the cleanup
32//! let result = gc.gc_expired_memories(false)?;
33//! println!("Tombstoned {} memories", result.memories_tombstoned);
34//! ```
35
36use crate::Result;
37use crate::models::{Namespace, SearchFilter};
38use crate::storage::traits::IndexBackend;
39use chrono::{TimeZone, Utc};
40use std::collections::HashMap;
41use std::sync::Arc;
42use std::time::{Duration, Instant};
43use tracing::{debug, info, info_span, instrument, warn};
44
/// Name of the environment variable that overrides the default retention period (in days).
pub const RETENTION_DAYS_ENV: &str = "SUBCOG_RETENTION_DAYS";

/// Retention period, in days, used when no override is configured (one year).
pub const DEFAULT_RETENTION_DAYS: u32 = 365;
50
/// Converts a [`Duration`] to whole milliseconds as a `u64`.
///
/// `Duration::as_millis` yields a `u128`; a value that does not fit in a `u64`
/// saturates to `u64::MAX` instead of wrapping or panicking.
#[inline]
fn duration_to_millis(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
56
/// Converts a `usize` into an `f64` metric value.
///
/// Inputs above `u32::MAX` are reported as `u32::MAX`, so the conversion always
/// goes through the lossless `u32 -> f64` path.
#[inline]
fn usize_to_f64(value: usize) -> f64 {
    u32::try_from(value).map_or_else(|_| f64::from(u32::MAX), f64::from)
}
63
/// Converts a `u64` into an `f64` metric value.
///
/// Inputs above `u32::MAX` are reported as `u32::MAX`, so the conversion always
/// goes through the lossless `u32 -> f64` path.
#[inline]
fn u64_to_f64(value: u64) -> f64 {
    match u32::try_from(value) {
        Ok(small) => f64::from(small),
        Err(_) => f64::from(u32::MAX),
    }
}
70
71/// Returns the configured retention period in days.
72///
73/// Reads from `SUBCOG_RETENTION_DAYS` environment variable, defaulting to 365.
74#[must_use]
75pub fn retention_days() -> u32 {
76    std::env::var(RETENTION_DAYS_ENV)
77        .ok()
78        .and_then(|v| v.parse().ok())
79        .unwrap_or(DEFAULT_RETENTION_DAYS)
80}
81
82/// Retention policy configuration.
83///
84/// Supports a default retention period and per-namespace overrides.
85#[derive(Debug, Clone)]
86pub struct RetentionConfig {
87    /// Default retention period in days.
88    pub default_days: u32,
89
90    /// Per-namespace retention overrides.
91    ///
92    /// Namespaces not in this map use `default_days`.
93    pub namespace_days: HashMap<Namespace, u32>,
94
95    /// Minimum retention period in days (cannot go below this).
96    ///
97    /// Provides a safety floor to prevent accidental data loss.
98    pub minimum_days: u32,
99
100    /// Maximum memories to process in a single GC run.
101    ///
102    /// Prevents long-running GC operations.
103    pub batch_limit: usize,
104}
105
106impl Default for RetentionConfig {
107    fn default() -> Self {
108        Self {
109            default_days: DEFAULT_RETENTION_DAYS,
110            namespace_days: HashMap::new(),
111            minimum_days: 30,   // At least 30 days
112            batch_limit: 10000, // Process up to 10k memories per run
113        }
114    }
115}
116
117impl RetentionConfig {
118    /// Creates a new retention config with default values.
119    #[must_use]
120    pub fn new() -> Self {
121        Self::default()
122    }
123
124    /// Creates a retention config from environment variables.
125    ///
126    /// Reads:
127    /// - `SUBCOG_RETENTION_DAYS`: Default retention period
128    /// - `SUBCOG_RETENTION_MIN_DAYS`: Minimum retention period
129    /// - `SUBCOG_RETENTION_BATCH_LIMIT`: Batch limit for GC runs
130    /// - `SUBCOG_RETENTION_<NAMESPACE>_DAYS`: Per-namespace overrides
131    #[must_use]
132    pub fn from_env() -> Self {
133        let mut config = Self::default();
134
135        // Default retention
136        if let Some(d) = std::env::var(RETENTION_DAYS_ENV)
137            .ok()
138            .and_then(|days| days.parse::<u32>().ok())
139        {
140            config.default_days = d;
141        }
142
143        // Minimum retention
144        if let Some(d) = std::env::var("SUBCOG_RETENTION_MIN_DAYS")
145            .ok()
146            .and_then(|days| days.parse::<u32>().ok())
147        {
148            config.minimum_days = d;
149        }
150
151        // Batch limit
152        if let Some(l) = std::env::var("SUBCOG_RETENTION_BATCH_LIMIT")
153            .ok()
154            .and_then(|limit| limit.parse::<usize>().ok())
155        {
156            config.batch_limit = l;
157        }
158
159        // Per-namespace overrides
160        for ns in Namespace::all().iter().copied() {
161            let env_key = format!(
162                "SUBCOG_RETENTION_{}_DAYS",
163                ns.as_str().to_uppercase().replace('-', "_")
164            );
165            if let Some(d) = std::env::var(&env_key)
166                .ok()
167                .and_then(|days| days.parse::<u32>().ok())
168            {
169                config.namespace_days.insert(ns, d);
170            }
171        }
172
173        config
174    }
175
176    /// Sets the default retention period.
177    #[must_use]
178    pub const fn with_default_days(mut self, days: u32) -> Self {
179        self.default_days = days;
180        self
181    }
182
183    /// Sets the minimum retention period.
184    #[must_use]
185    pub const fn with_minimum_days(mut self, days: u32) -> Self {
186        self.minimum_days = days;
187        self
188    }
189
190    /// Sets the batch limit.
191    #[must_use]
192    pub const fn with_batch_limit(mut self, limit: usize) -> Self {
193        self.batch_limit = limit;
194        self
195    }
196
197    /// Sets a per-namespace retention override.
198    #[must_use]
199    pub fn with_namespace_days(mut self, namespace: Namespace, days: u32) -> Self {
200        self.namespace_days.insert(namespace, days);
201        self
202    }
203
204    /// Gets the effective retention period for a namespace.
205    ///
206    /// Returns the namespace-specific override if set, otherwise the default.
207    /// The result is clamped to be at least `minimum_days`.
208    #[must_use]
209    pub fn effective_days(&self, namespace: Namespace) -> u32 {
210        let days = self
211            .namespace_days
212            .get(&namespace)
213            .copied()
214            .unwrap_or(self.default_days);
215
216        // Enforce minimum retention
217        days.max(self.minimum_days)
218    }
219
220    /// Returns the cutoff timestamp for expired memories in a namespace.
221    ///
222    /// Memories with `created_at` before this timestamp are considered expired.
223    #[must_use]
224    pub fn cutoff_timestamp(&self, namespace: Namespace) -> u64 {
225        let days = self.effective_days(namespace);
226        let now = crate::current_timestamp();
227        let seconds_per_day: u64 = 86400;
228        now.saturating_sub(u64::from(days) * seconds_per_day)
229    }
230}
231
/// Result of a retention garbage collection operation.
#[derive(Debug, Clone, Default)]
pub struct RetentionGcResult {
    /// Total number of memories checked.
    pub memories_checked: usize,

    /// Number of memories that were (or would be) tombstoned.
    pub memories_tombstoned: usize,

    /// Breakdown of tombstoned memories by namespace.
    pub by_namespace: HashMap<String, usize>,

    /// Whether this was a dry run (no actual changes made).
    pub dry_run: bool,

    /// Duration of the GC operation in milliseconds.
    pub duration_ms: u64,
}

impl RetentionGcResult {
    /// Returns `true` if any memories were (or would be) tombstoned.
    #[must_use]
    pub const fn has_expired_memories(&self) -> bool {
        self.memories_tombstoned > 0
    }

    /// Returns a human-readable, single-line summary of the GC result.
    ///
    /// When nothing expired, the summary reports only how many memories were
    /// checked and how long the run took. Otherwise it reports the number of
    /// tombstoned memories (phrased as "would tombstone" for dry runs) followed
    /// by a per-namespace breakdown. The breakdown is sorted by namespace name
    /// so the same result always produces the same text.
    #[must_use]
    pub fn summary(&self) -> String {
        if self.memories_tombstoned == 0 {
            return format!(
                "No expired memories found ({} memories checked in {}ms)",
                self.memories_checked, self.duration_ms
            );
        }

        let action = if self.dry_run {
            "would tombstone"
        } else {
            "tombstoned"
        };

        // `HashMap` iteration order is unspecified; sort so that logs and
        // CLI output are stable across runs.
        let mut entries: Vec<(&str, usize)> = self
            .by_namespace
            .iter()
            .map(|(ns, count)| (ns.as_str(), *count))
            .collect();
        entries.sort_unstable();

        let ns_breakdown: Vec<String> = entries
            .iter()
            .map(|(ns, count)| format!("{ns}: {count}"))
            .collect();

        format!(
            "{} {} expired memories ({}) - checked {} in {}ms",
            action,
            self.memories_tombstoned,
            ns_breakdown.join(", "),
            self.memories_checked,
            self.duration_ms
        )
    }
}
290
291/// Garbage collector for expired memories based on retention policy.
292///
293/// Identifies memories that have exceeded their retention period and marks
294/// them as tombstoned. Supports per-namespace retention policies.
295///
296/// # Thread Safety
297///
298/// The garbage collector holds an `Arc` reference to the index backend,
299/// making it safe to share across threads.
300pub struct RetentionGarbageCollector<I: IndexBackend> {
301    /// Reference to the index backend for querying and updating memories.
302    index: Arc<I>,
303
304    /// Retention policy configuration.
305    config: RetentionConfig,
306}
307
308impl<I: IndexBackend> RetentionGarbageCollector<I> {
309    /// Creates a new retention garbage collector.
310    ///
311    /// # Arguments
312    ///
313    /// * `index` - Shared reference to the index backend.
314    /// * `config` - Retention policy configuration.
315    #[must_use]
316    pub fn new(index: Arc<I>, config: RetentionConfig) -> Self {
317        // Arc::strong_count prevents clippy::missing_const_for_fn false positive
318        let _ = Arc::strong_count(&index);
319        Self { index, config }
320    }
321
322    /// Performs garbage collection on expired memories.
323    ///
324    /// This method:
325    /// 1. Iterates through all namespaces
326    /// 2. For each namespace, calculates the retention cutoff
327    /// 3. Queries for memories older than the cutoff
328    /// 4. Tombstones expired memories (unless `dry_run`)
329    ///
330    /// # Arguments
331    ///
332    /// * `dry_run` - If true, only report what would be done without making changes
333    ///
334    /// # Returns
335    ///
336    /// A `RetentionGcResult` containing statistics about the operation.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if index backend operations fail.
341    #[instrument(
342        name = "subcog.gc.retention",
343        skip(self),
344        fields(
345            request_id = tracing::field::Empty,
346            component = "gc",
347            operation = "retention",
348            dry_run = dry_run,
349            default_retention_days = self.config.default_days
350        )
351    )]
352    pub fn gc_expired_memories(&self, dry_run: bool) -> Result<RetentionGcResult> {
353        let start = Instant::now();
354        if let Some(request_id) = crate::observability::current_request_id() {
355            tracing::Span::current().record("request_id", request_id.as_str());
356        }
357        let mut result = RetentionGcResult {
358            dry_run,
359            ..Default::default()
360        };
361
362        let now = crate::current_timestamp();
363
364        // Process each namespace with its specific retention policy
365        for namespace in Namespace::user_namespaces().iter().copied() {
366            let cutoff = self.config.cutoff_timestamp(namespace);
367            let retention_days = self.config.effective_days(namespace);
368            let _span = info_span!(
369                "subcog.gc.retention.namespace",
370                namespace = %namespace.as_str(),
371                retention_days = retention_days
372            )
373            .entered();
374
375            debug!(
376                namespace = namespace.as_str(),
377                retention_days, cutoff, "Processing namespace for retention GC"
378            );
379
380            let count = self.process_namespace(namespace, cutoff, now, dry_run, &mut result)?;
381
382            if count > 0 {
383                result
384                    .by_namespace
385                    .insert(namespace.as_str().to_string(), count);
386            }
387        }
388
389        result.duration_ms = duration_to_millis(start.elapsed());
390
391        // Record metrics
392        metrics::counter!(
393            "gc_retention_runs_total",
394            "dry_run" => dry_run.to_string()
395        )
396        .increment(1);
397        metrics::gauge!("gc_retention_tombstoned").set(usize_to_f64(result.memories_tombstoned));
398        metrics::histogram!("gc_retention_duration_ms").record(u64_to_f64(result.duration_ms));
399        metrics::histogram!(
400            "memory_lifecycle_duration_ms",
401            "component" => "gc",
402            "operation" => "retention"
403        )
404        .record(u64_to_f64(result.duration_ms));
405
406        info!(
407            memories_checked = result.memories_checked,
408            memories_tombstoned = result.memories_tombstoned,
409            duration_ms = result.duration_ms,
410            dry_run,
411            "Retention GC completed"
412        );
413
414        Ok(result)
415    }
416
417    /// Processes a single namespace for expired memories.
418    fn process_namespace(
419        &self,
420        namespace: Namespace,
421        cutoff: u64,
422        now: u64,
423        dry_run: bool,
424        result: &mut RetentionGcResult,
425    ) -> Result<usize> {
426        let filter = SearchFilter::new()
427            .with_namespace(namespace)
428            .with_include_tombstoned(false);
429
430        let memories = self.index.list_all(&filter, self.config.batch_limit)?;
431        let mut tombstoned = 0;
432
433        for (id, _score) in memories {
434            result.memories_checked += 1;
435
436            // Get the full memory to check created_at
437            let Some(memory) = self.index.get_memory(&id)? else {
438                continue;
439            };
440
441            // Check if memory has expired
442            if memory.created_at >= cutoff {
443                continue;
444            }
445
446            // Memory has expired
447            if dry_run {
448                tombstoned += 1;
449                continue;
450            }
451
452            // Tombstone the memory
453            let mut updated = memory.clone();
454            let now_i64 = i64::try_from(now).unwrap_or(i64::MAX);
455            let now_dt = Utc
456                .timestamp_opt(now_i64, 0)
457                .single()
458                .unwrap_or_else(Utc::now);
459            updated.tombstoned_at = Some(now_dt);
460
461            let Err(e) = self.index.index(&updated) else {
462                tombstoned += 1;
463                continue;
464            };
465
466            warn!(
467                memory_id = %id.as_str(),
468                error = %e,
469                "Failed to tombstone expired memory"
470            );
471        }
472
473        result.memories_tombstoned += tombstoned;
474        Ok(tombstoned)
475    }
476
477    /// Returns the current retention configuration.
478    #[must_use]
479    pub const fn config(&self) -> &RetentionConfig {
480        &self.config
481    }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Domain, Memory, MemoryId, MemoryStatus};
    use crate::storage::index::SqliteBackend;

    /// Seconds in one day, used to build aged timestamps.
    const DAY_SECS: u64 = 86400;

    /// Builds an active, non-tombstoned memory with the given id, namespace,
    /// and creation time; every optional field is left empty.
    fn create_test_memory(id: &str, namespace: Namespace, created_at: u64) -> Memory {
        Memory {
            id: MemoryId::new(id),
            content: format!("Test memory {id}"),
            namespace,
            domain: Domain::new(),
            project_id: None,
            branch: None,
            file_path: None,
            status: MemoryStatus::Active,
            created_at,
            updated_at: created_at,
            tombstoned_at: None,
            expires_at: None,
            embedding: None,
            tags: vec!["test".to_string()],
            #[cfg(feature = "group-scope")]
            group_id: None,
            source: None,
            is_summary: false,
            source_memory_ids: None,
            consolidation_timestamp: None,
        }
    }

    /// Timestamp that lies `days` days before the current time.
    fn days_ago(days: u64) -> u64 {
        crate::current_timestamp() - (days * DAY_SECS)
    }

    /// Creates a fresh in-memory SQLite index backend.
    fn in_memory_backend() -> Arc<SqliteBackend> {
        Arc::new(SqliteBackend::in_memory().expect("Failed to create backend"))
    }

    /// Indexes a test memory created at `created_at`.
    fn index_memory(backend: &SqliteBackend, id: &str, namespace: Namespace, created_at: u64) {
        let memory = create_test_memory(id, namespace, created_at);
        backend.index(&memory).expect("Failed to index memory");
    }

    /// Loads a memory that is expected to exist.
    fn load_memory(backend: &SqliteBackend, id: &str) -> Memory {
        backend
            .get_memory(&MemoryId::new(id))
            .expect("Failed to get memory")
            .expect("Memory should exist")
    }

    /// Runs the retention GC against `backend` with the given policy.
    fn run_gc(
        backend: &Arc<SqliteBackend>,
        config: RetentionConfig,
        dry_run: bool,
    ) -> RetentionGcResult {
        RetentionGarbageCollector::new(Arc::clone(backend), config)
            .gc_expired_memories(dry_run)
            .expect("GC should succeed")
    }

    #[test]
    fn test_retention_config_default() {
        let RetentionConfig {
            default_days,
            minimum_days,
            batch_limit,
            ..
        } = RetentionConfig::default();

        assert_eq!(default_days, 365);
        assert_eq!(minimum_days, 30);
        assert_eq!(batch_limit, 10000);
    }

    #[test]
    fn test_retention_config_builders() {
        let config = RetentionConfig::new()
            .with_default_days(180)
            .with_minimum_days(7)
            .with_batch_limit(5000)
            .with_namespace_days(Namespace::Decisions, 730);

        assert_eq!(config.default_days, 180);
        assert_eq!(config.minimum_days, 7);
        assert_eq!(config.batch_limit, 5000);
        assert_eq!(config.namespace_days.get(&Namespace::Decisions), Some(&730));
    }

    #[test]
    fn test_effective_days_with_override() {
        let config = RetentionConfig::new()
            .with_default_days(365)
            .with_namespace_days(Namespace::Decisions, 730);

        // The override wins for its own namespace...
        assert_eq!(config.effective_days(Namespace::Decisions), 730);

        // ...while every other namespace keeps the default.
        assert_eq!(config.effective_days(Namespace::Learnings), 365);
    }

    #[test]
    fn test_effective_days_minimum_enforced() {
        // A default below the floor must be raised to the floor.
        let config = RetentionConfig::new()
            .with_default_days(10)
            .with_minimum_days(30);

        assert_eq!(config.effective_days(Namespace::Patterns), 30);
    }

    #[test]
    fn test_cutoff_timestamp() {
        let config = RetentionConfig::new().with_default_days(30);

        let cutoff = config.cutoff_timestamp(Namespace::Decisions);
        let expected = crate::current_timestamp() - (30 * DAY_SECS);

        // The two clock reads may straddle a second boundary.
        assert!(cutoff.abs_diff(expected) <= 1);
    }

    #[test]
    fn test_retention_gc_result_summary_no_expired() {
        let result = RetentionGcResult {
            memories_checked: 100,
            duration_ms: 50,
            ..Default::default()
        };
        let summary = result.summary();

        assert!(!result.has_expired_memories());
        assert!(summary.contains("No expired memories"));
        assert!(summary.contains("100 memories checked"));
    }

    #[test]
    fn test_retention_gc_result_summary_with_expired() {
        let result = RetentionGcResult {
            memories_checked: 100,
            memories_tombstoned: 8,
            by_namespace: HashMap::from([
                ("decisions".to_string(), 5),
                ("learnings".to_string(), 3),
            ]),
            dry_run: false,
            duration_ms: 75,
        };

        assert!(result.has_expired_memories());
        assert!(result.summary().contains("tombstoned 8 expired memories"));
    }

    #[test]
    fn test_retention_gc_result_summary_dry_run() {
        let result = RetentionGcResult {
            memories_checked: 50,
            memories_tombstoned: 5,
            by_namespace: HashMap::from([("decisions".to_string(), 5)]),
            dry_run: true,
            duration_ms: 25,
        };

        assert!(result.summary().contains("would tombstone"));
    }

    #[test]
    fn test_gc_no_expired_memories() {
        let backend = in_memory_backend();

        // A memory created right now is well inside any retention window.
        index_memory(
            &backend,
            "mem1",
            Namespace::Decisions,
            crate::current_timestamp(),
        );

        let config = RetentionConfig::new().with_default_days(30);
        let result = run_gc(&backend, config, false);

        assert!(!result.has_expired_memories());
        assert_eq!(result.memories_checked, 1);
        assert_eq!(result.memories_tombstoned, 0);
    }

    #[test]
    fn test_gc_expired_memory_dry_run() {
        let backend = in_memory_backend();

        // 400 days old against a 365-day policy: expired.
        index_memory(&backend, "mem1", Namespace::Decisions, days_ago(400));

        let config = RetentionConfig::new().with_default_days(365);
        let result = run_gc(&backend, config, true);

        assert!(result.has_expired_memories());
        assert_eq!(result.memories_tombstoned, 1);
        assert!(result.dry_run);

        // A dry run must leave the stored memory untouched.
        assert!(load_memory(&backend, "mem1").tombstoned_at.is_none());
    }

    #[test]
    fn test_gc_expired_memory_actual() {
        let backend = in_memory_backend();

        // 400 days old against a 365-day policy: expired.
        index_memory(&backend, "mem1", Namespace::Decisions, days_ago(400));

        let config = RetentionConfig::new().with_default_days(365);
        let result = run_gc(&backend, config, false);

        assert!(result.has_expired_memories());
        assert_eq!(result.memories_tombstoned, 1);
        assert!(!result.dry_run);

        // A real run must have written the tombstone.
        assert!(load_memory(&backend, "mem1").tombstoned_at.is_some());
    }

    #[test]
    fn test_gc_per_namespace_retention() {
        let backend = in_memory_backend();

        // Two memories of the same age (100 days) in different namespaces.
        index_memory(&backend, "decisions1", Namespace::Decisions, days_ago(100));
        index_memory(&backend, "learnings1", Namespace::Learnings, days_ago(100));

        // Decisions are kept for 730 days; everything else for 90 days.
        let config = RetentionConfig::new()
            .with_default_days(90)
            .with_namespace_days(Namespace::Decisions, 730);

        let result = run_gc(&backend, config, false);

        // Only the learnings memory is past its window (100 > 90, 100 < 730).
        assert_eq!(result.memories_tombstoned, 1);
        assert!(load_memory(&backend, "decisions1").tombstoned_at.is_none());
        assert!(load_memory(&backend, "learnings1").tombstoned_at.is_some());
    }

    #[test]
    fn test_retention_days_from_env() {
        // Expects the default, i.e. the environment variable is not set.
        assert_eq!(retention_days(), DEFAULT_RETENTION_DAYS);
    }
}