1use crate::Result;
51use crate::models::SearchFilter;
52use crate::storage::traits::IndexBackend;
53use chrono::{TimeZone, Utc};
54use std::sync::Arc;
55use std::time::{Duration, Instant};
56use tracing::{debug, info, instrument, warn};
57
/// Name of the environment variable read by [`ExpirationConfig::from_env`]
/// to override the cleanup probability.
pub const EXPIRATION_CLEANUP_PROBABILITY_ENV: &str = "SUBCOG_EXPIRATION_CLEANUP_PROBABILITY";

/// Cleanup probability used by [`ExpirationConfig::default`] (0.05, i.e. 5%).
pub const DEFAULT_CLEANUP_PROBABILITY: f64 = 0.05;
63
/// Converts a [`Duration`] into whole milliseconds as a `u64`.
///
/// `Duration::as_millis` yields a `u128`; a value too large for `u64`
/// saturates to `u64::MAX` rather than wrapping.
#[inline]
fn duration_to_millis(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
69
/// Converts a `usize` to `f64` without an `as` cast.
///
/// The value is first narrowed to `u32` so the lossless `f64::from`
/// conversion can be used; anything above `u32::MAX` saturates to
/// `u32::MAX` (4 294 967 295.0).
#[inline]
fn usize_to_f64(value: usize) -> f64 {
    u32::try_from(value).map_or_else(|_| f64::from(u32::MAX), f64::from)
}
76
/// Converts a `u64` to `f64` without an `as` cast.
///
/// Values that fit in `u32` convert exactly via `f64::from`; larger values
/// saturate to `u32::MAX` (4 294 967 295.0).
#[inline]
fn u64_to_f64(value: u64) -> f64 {
    match u32::try_from(value) {
        Ok(narrow) => f64::from(narrow),
        Err(_) => f64::from(u32::MAX),
    }
}
83
84#[derive(Debug, Clone)]
86pub struct ExpirationConfig {
87 pub batch_limit: usize,
91
92 pub cleanup_probability: f64,
97}
98
99impl Default for ExpirationConfig {
100 fn default() -> Self {
101 Self {
102 batch_limit: 10000,
103 cleanup_probability: DEFAULT_CLEANUP_PROBABILITY,
104 }
105 }
106}
107
108impl ExpirationConfig {
109 #[must_use]
111 pub fn new() -> Self {
112 Self::default()
113 }
114
115 #[must_use]
121 pub fn from_env() -> Self {
122 let mut config = Self::default();
123
124 if let Some(limit) = std::env::var("SUBCOG_EXPIRATION_BATCH_LIMIT")
126 .ok()
127 .and_then(|l| l.parse::<usize>().ok())
128 {
129 config.batch_limit = limit;
130 }
131
132 if let Some(prob) = std::env::var(EXPIRATION_CLEANUP_PROBABILITY_ENV)
134 .ok()
135 .and_then(|p| p.parse::<f64>().ok())
136 {
137 config.cleanup_probability = prob.clamp(0.0, 1.0);
138 }
139
140 config
141 }
142
143 #[must_use]
145 pub const fn with_batch_limit(mut self, limit: usize) -> Self {
146 self.batch_limit = limit;
147 self
148 }
149
150 #[must_use]
152 #[allow(clippy::missing_const_for_fn)] pub fn with_cleanup_probability(mut self, probability: f64) -> Self {
154 self.cleanup_probability = probability.clamp(0.0, 1.0);
155 self
156 }
157}
158
/// Outcome of a single TTL-expiration garbage-collection pass.
#[derive(Debug, Clone, Default)]
pub struct ExpirationGcResult {
    /// Number of memories examined during the pass.
    pub memories_checked: usize,

    /// Number of memories tombstoned, or that would have been in a dry run.
    pub memories_tombstoned: usize,

    /// Whether the pass ran in dry-run mode.
    pub dry_run: bool,

    /// Duration of the pass in milliseconds.
    pub duration_ms: u64,
}

impl ExpirationGcResult {
    /// Returns `true` when at least one memory was (or would be) tombstoned.
    #[must_use]
    pub const fn has_expired_memories(&self) -> bool {
        self.memories_tombstoned != 0
    }

    /// Renders a one-line, human-readable description of the pass.
    ///
    /// With nothing tombstoned the text reports that no TTL-expired memories
    /// were found; otherwise it starts with "would tombstone" for a dry run
    /// and "tombstoned" for a real run.
    #[must_use]
    pub fn summary(&self) -> String {
        // Nothing expired: the dry-run flag does not affect the wording.
        if self.memories_tombstoned == 0 {
            return format!(
                "No TTL-expired memories found ({} memories checked in {}ms)",
                self.memories_checked, self.duration_ms
            );
        }

        let verb = if self.dry_run {
            "would tombstone"
        } else {
            "tombstoned"
        };

        format!(
            "{} {} TTL-expired memories - checked {} in {}ms",
            verb, self.memories_tombstoned, self.memories_checked, self.duration_ms
        )
    }
}
204
205pub struct ExpirationService {
215 index: Arc<dyn IndexBackend + Send + Sync>,
217
218 config: ExpirationConfig,
220}
221
222impl ExpirationService {
223 #[must_use]
230 pub fn new(index: Arc<dyn IndexBackend + Send + Sync>, config: ExpirationConfig) -> Self {
231 let _ = Arc::strong_count(&index);
233 Self { index, config }
234 }
235
236 #[must_use]
246 pub fn should_run_cleanup(&self) -> bool {
247 if self.config.cleanup_probability <= 0.0 {
248 return false;
249 }
250 if self.config.cleanup_probability >= 1.0 {
251 return true;
252 }
253
254 let random: f64 = rand_float();
256 random < self.config.cleanup_probability
257 }
258
259 #[instrument(
278 name = "subcog.gc.expiration",
279 skip(self),
280 fields(
281 request_id = tracing::field::Empty,
282 component = "gc",
283 operation = "expiration",
284 dry_run = dry_run,
285 batch_limit = self.config.batch_limit
286 )
287 )]
288 pub fn gc_expired_memories(&self, dry_run: bool) -> Result<ExpirationGcResult> {
289 let start = Instant::now();
290 if let Some(request_id) = crate::observability::current_request_id() {
291 tracing::Span::current().record("request_id", request_id.as_str());
292 }
293
294 let mut result = ExpirationGcResult {
295 dry_run,
296 ..Default::default()
297 };
298
299 let now = crate::current_timestamp();
300
301 let filter = SearchFilter::new().with_include_tombstoned(false);
303 let memories = self.index.list_all(&filter, self.config.batch_limit)?;
304
305 debug!(
306 memory_count = memories.len(),
307 now, "Checking memories for TTL expiration"
308 );
309
310 for (id, _score) in memories {
311 result.memories_checked += 1;
312
313 let Some(memory) = self.index.get_memory(&id)? else {
315 continue;
316 };
317
318 let Some(expires_at) = memory.expires_at else {
320 continue;
322 };
323
324 if expires_at >= now {
325 continue;
327 }
328
329 debug!(
331 memory_id = %id.as_str(),
332 expires_at,
333 now,
334 expired_ago_secs = now.saturating_sub(expires_at),
335 "Memory TTL expired"
336 );
337
338 if dry_run {
339 result.memories_tombstoned += 1;
340 continue;
341 }
342
343 let mut updated = memory.clone();
345 let now_i64 = i64::try_from(now).unwrap_or(i64::MAX);
346 let now_dt = Utc
347 .timestamp_opt(now_i64, 0)
348 .single()
349 .unwrap_or_else(Utc::now);
350 updated.tombstoned_at = Some(now_dt);
351
352 let Err(e) = self.index.index(&updated) else {
353 result.memories_tombstoned += 1;
354 continue;
355 };
356
357 warn!(
358 memory_id = %id.as_str(),
359 error = %e,
360 "Failed to tombstone TTL-expired memory"
361 );
362 }
363
364 result.duration_ms = duration_to_millis(start.elapsed());
365
366 metrics::counter!(
368 "gc_expiration_runs_total",
369 "dry_run" => dry_run.to_string()
370 )
371 .increment(1);
372 metrics::gauge!("gc_expiration_tombstoned").set(usize_to_f64(result.memories_tombstoned));
373 metrics::histogram!("gc_expiration_duration_ms").record(u64_to_f64(result.duration_ms));
374 metrics::histogram!(
375 "memory_lifecycle_duration_ms",
376 "component" => "gc",
377 "operation" => "expiration"
378 )
379 .record(u64_to_f64(result.duration_ms));
380
381 info!(
382 memories_checked = result.memories_checked,
383 memories_tombstoned = result.memories_tombstoned,
384 duration_ms = result.duration_ms,
385 dry_run,
386 "Expiration GC completed"
387 );
388
389 Ok(result)
390 }
391
392 #[must_use]
394 pub const fn config(&self) -> &ExpirationConfig {
395 &self.config
396 }
397}
398
/// Produces a cheap pseudo-random value in `[0.0, 1.0]`.
///
/// The current wall-clock time in nanoseconds since the Unix epoch is run
/// through one xorshift round (shifts 13, 7, 17) and scaled by `u64::MAX`.
/// If the clock reports a time before the epoch, the fixed seed 42 is used.
// Rounding in the u64 -> f64 conversions is acceptable for a probability draw.
#[allow(clippy::cast_precision_loss)]
fn rand_float() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Only the low 64 bits of the nanosecond count are kept.
    #[allow(clippy::cast_possible_truncation)]
    let seed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 42,
    };

    let stage_one = seed ^ (seed << 13);
    let stage_two = stage_one ^ (stage_one >> 7);
    let mixed = stage_two ^ (stage_two << 17);

    (mixed as f64) / (u64::MAX as f64)
}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Domain, Memory, MemoryId, MemoryStatus, Namespace};
    use crate::storage::index::SqliteBackend;

    /// Trait-object handle shared between a test and the service under test.
    type SharedIndex = Arc<dyn IndexBackend + Send + Sync>;

    /// Opens a fresh in-memory SQLite index.
    fn in_memory_index() -> SharedIndex {
        Arc::new(SqliteBackend::in_memory().expect("in-memory backend"))
    }

    /// Builds a service that shares `index` with the calling test.
    fn service_over(index: &SharedIndex, config: ExpirationConfig) -> ExpirationService {
        ExpirationService::new(Arc::clone(index), config)
    }

    /// Builds an active test memory with the given id, namespace and optional
    /// expiry timestamp.
    fn create_test_memory(id: &str, namespace: Namespace, expires_at: Option<u64>) -> Memory {
        let now = crate::current_timestamp();
        Memory {
            id: MemoryId::new(id),
            content: format!("Test memory {id}"),
            namespace,
            domain: Domain::new(),
            project_id: None,
            branch: None,
            file_path: None,
            status: MemoryStatus::Active,
            created_at: now,
            updated_at: now,
            tombstoned_at: None,
            expires_at,
            embedding: None,
            tags: vec!["test".to_string()],
            #[cfg(feature = "group-scope")]
            group_id: None,
            source: None,
            is_summary: false,
            source_memory_ids: None,
            consolidation_timestamp: None,
        }
    }

    /// Creates a test memory and stores it in `index`.
    fn seed_memory(index: &SharedIndex, id: &str, namespace: Namespace, expires_at: Option<u64>) {
        let memory = create_test_memory(id, namespace, expires_at);
        index.index(&memory).expect("index memory");
    }

    /// Fetches a memory that the test expects to exist.
    fn fetch_memory(index: &SharedIndex, id: &str) -> Memory {
        index
            .get_memory(&MemoryId::new(id))
            .expect("get memory")
            .expect("memory exists")
    }

    #[test]
    fn test_expiration_config_default() {
        let config = ExpirationConfig::default();
        assert_eq!(config.batch_limit, 10000);
        assert!((config.cleanup_probability - 0.05).abs() < f64::EPSILON);
    }

    #[test]
    fn test_expiration_config_builders() {
        let config = ExpirationConfig::new()
            .with_batch_limit(5000)
            .with_cleanup_probability(0.10);

        assert_eq!(config.batch_limit, 5000);
        assert!((config.cleanup_probability - 0.10).abs() < f64::EPSILON);
    }

    #[test]
    fn test_cleanup_probability_clamping() {
        // Above the range: clamped down to 1.0.
        let too_high = ExpirationConfig::new().with_cleanup_probability(1.5);
        assert!((too_high.cleanup_probability - 1.0).abs() < f64::EPSILON);

        // Below the range: clamped up to 0.0.
        let too_low = ExpirationConfig::new().with_cleanup_probability(-0.5);
        assert!(too_low.cleanup_probability.abs() < f64::EPSILON);
    }

    #[test]
    fn test_expiration_gc_result_summary() {
        let with_expired = ExpirationGcResult {
            memories_checked: 100,
            memories_tombstoned: 5,
            dry_run: false,
            duration_ms: 50,
        };
        assert!(with_expired.summary().contains("tombstoned 5"));

        let without_expired = ExpirationGcResult {
            memories_checked: 100,
            memories_tombstoned: 0,
            dry_run: false,
            duration_ms: 50,
        };
        assert!(without_expired.summary().contains("No TTL-expired"));
    }

    #[test]
    fn test_expiration_gc_result_has_expired() {
        let none = ExpirationGcResult {
            memories_tombstoned: 0,
            ..Default::default()
        };
        assert!(!none.has_expired_memories());

        let one = ExpirationGcResult {
            memories_tombstoned: 1,
            ..Default::default()
        };
        assert!(one.has_expired_memories());
    }

    #[test]
    fn test_should_run_cleanup_always() {
        let index = in_memory_index();
        let config = ExpirationConfig::new().with_cleanup_probability(1.0);
        let service = service_over(&index, config);

        assert!(service.should_run_cleanup());
    }

    #[test]
    fn test_should_run_cleanup_never() {
        let index = in_memory_index();
        let config = ExpirationConfig::new().with_cleanup_probability(0.0);
        let service = service_over(&index, config);

        assert!(!service.should_run_cleanup());
    }

    #[test]
    fn test_gc_expired_memories_dry_run() {
        let index = in_memory_index();
        let service = service_over(&index, ExpirationConfig::default());

        let now = crate::current_timestamp();
        // One memory expired an hour ago, one expires in an hour, one has no TTL.
        seed_memory(
            &index,
            "expired-1",
            Namespace::Decisions,
            Some(now.saturating_sub(3600)),
        );
        seed_memory(
            &index,
            "future-1",
            Namespace::Decisions,
            Some(now.saturating_add(3600)),
        );
        seed_memory(&index, "no-ttl-1", Namespace::Learnings, None);

        let result = service
            .gc_expired_memories(true)
            .expect("gc should succeed");

        assert_eq!(result.memories_checked, 3);
        assert_eq!(result.memories_tombstoned, 1);
        assert!(result.dry_run);

        // A dry run must leave the stored memory untouched.
        let stored = fetch_memory(&index, "expired-1");
        assert!(stored.tombstoned_at.is_none());
    }

    #[test]
    fn test_gc_expired_memories_actual() {
        let index = in_memory_index();
        let service = service_over(&index, ExpirationConfig::default());

        let now = crate::current_timestamp();
        seed_memory(
            &index,
            "expired-2",
            Namespace::Decisions,
            Some(now.saturating_sub(3600)),
        );

        let result = service
            .gc_expired_memories(false)
            .expect("gc should succeed");

        assert_eq!(result.memories_checked, 1);
        assert_eq!(result.memories_tombstoned, 1);
        assert!(!result.dry_run);

        // A real run must persist the tombstone.
        let stored = fetch_memory(&index, "expired-2");
        assert!(stored.tombstoned_at.is_some());
    }

    #[test]
    fn test_gc_no_expired_memories() {
        let index = in_memory_index();
        let service = service_over(&index, ExpirationConfig::default());

        let now = crate::current_timestamp();
        seed_memory(
            &index,
            "future-2",
            Namespace::Decisions,
            Some(now.saturating_add(86400)),
        );
        seed_memory(&index, "no-ttl-2", Namespace::Learnings, None);

        let result = service
            .gc_expired_memories(false)
            .expect("gc should succeed");

        assert_eq!(result.memories_checked, 2);
        assert_eq!(result.memories_tombstoned, 0);
    }

    #[test]
    fn test_rand_float_in_range() {
        for _ in 0..100 {
            let value = rand_float();
            assert!((0.0..=1.0).contains(&value), "rand_float() = {value}");
        }
    }
}