// subcog/services/deduplication/recent.rs

//! Recent capture deduplication checker.
//!
//! Detects duplicates by tracking recently captured content hashes
//! in an in-memory LRU cache with TTL-based expiration.

use crate::models::{MemoryId, Namespace};
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::RwLock;
use std::time::{Duration, Instant};
use tracing::instrument;

use super::hasher::ContentHasher;

15/// Entry in the recent capture cache.
16#[derive(Debug, Clone)]
17struct CacheEntry {
18    /// The memory ID of the captured content.
19    memory_id: MemoryId,
20    /// The namespace of the captured content.
21    namespace: Namespace,
22    /// The domain of the captured content.
23    domain: String,
24    /// When this entry was recorded.
25    captured_at: Instant,
26}
27
28/// Checker for recently captured content.
29///
30/// # How it works
31///
32/// 1. Maintains an LRU cache mapping content hashes to capture info
33/// 2. When checking, looks up the hash in the cache
34/// 3. Returns a match if found and not expired (within TTL window)
35/// 4. Automatically evicts expired entries and maintains LRU ordering
36///
37/// # Thread Safety
38///
39/// Uses `RwLock` for interior mutability, allowing concurrent reads
40/// and exclusive writes. Safe for use across async tasks.
41///
42/// # Lock Poisoning
43///
44/// Lock poisoning is handled with fail-open semantics: if the lock is
45/// poisoned (due to a panic in another thread), operations return `None`
46/// (for checks) or silently skip (for records). This is intentional:
47/// - Deduplication is a performance optimization, not a correctness requirement
48/// - Failing to detect a duplicate just means we capture twice (safe)
49/// - Blocking all captures due to a transient panic would be worse
50///
51/// # Example
52///
53/// ```rust,ignore
54/// use subcog::services::deduplication::RecentCaptureChecker;
55/// use std::time::Duration;
56///
57/// let checker = RecentCaptureChecker::new(1000, Duration::from_secs(300));
58///
59/// // Record a capture
60/// checker.record("content", MemoryId::new("id1"), Namespace::Decisions, "project");
61///
62/// // Check if same content was recently captured
63/// let result = checker.check("content", Namespace::Decisions);
64/// assert!(result.is_some());
65/// ```
66pub struct RecentCaptureChecker {
67    /// LRU cache mapping content hash to capture entry.
68    /// Uses `RwLock` for thread-safe interior mutability.
69    cache: RwLock<LruCache<String, CacheEntry>>,
70    /// Time-to-live for cache entries.
71    ttl: Duration,
72}
73
74impl RecentCaptureChecker {
75    /// Creates a new recent capture checker.
76    ///
77    /// # Arguments
78    ///
79    /// * `capacity` - Maximum number of entries in the cache
80    /// * `ttl` - How long entries remain valid
81    ///
82    /// # Panics
83    ///
84    /// Panics if capacity is 0.
85    #[must_use]
86    #[allow(clippy::expect_used)] // Documented panic for invalid input
87    pub fn new(capacity: usize, ttl: Duration) -> Self {
88        let cap = NonZeroUsize::new(capacity).expect("capacity must be > 0");
89        Self {
90            cache: RwLock::new(LruCache::new(cap)),
91            ttl,
92        }
93    }
94
95    /// Creates a checker with default settings.
96    ///
97    /// Default: 1000 entries, 5 minute TTL.
98    #[must_use]
99    pub fn default_settings() -> Self {
100        Self::new(1000, Duration::from_secs(300))
101    }
102
103    /// Checks if content was recently captured in the given namespace.
104    ///
105    /// # Arguments
106    ///
107    /// * `content` - The content to check
108    /// * `namespace` - The namespace to check within
109    ///
110    /// # Returns
111    ///
112    /// Returns `Some((MemoryId, URN))` if content was recently captured,
113    /// `None` otherwise.
114    ///
115    /// # Example
116    ///
117    /// ```rust,ignore
118    /// let result = checker.check("content", Namespace::Decisions);
119    /// match result {
120    ///     Some((id, urn)) => println!("Recently captured: {}", urn),
121    ///     None => println!("Not recently captured"),
122    /// }
123    /// ```
124    #[instrument(
125        skip(self, content),
126        fields(
127            operation = "recent_capture_check",
128            namespace = %namespace.as_str(),
129            content_length = content.len()
130        )
131    )]
132    pub fn check(&self, content: &str, namespace: Namespace) -> Option<(MemoryId, String)> {
133        let start = Instant::now();
134        let hash = ContentHasher::hash(content);
135
136        // Try to get from cache (read lock)
137        let result = {
138            let cache = self.cache.read().ok()?;
139            cache.peek(&hash).cloned()
140        };
141
142        let duration_ms = start.elapsed().as_millis();
143
144        match result {
145            Some(entry) => {
146                // Check if entry is still valid (not expired)
147                if entry.captured_at.elapsed() <= self.ttl {
148                    // Check namespace matches
149                    if entry.namespace == namespace {
150                        let urn = format!(
151                            "subcog://{}/{}/{}",
152                            entry.domain,
153                            namespace.as_str(),
154                            entry.memory_id
155                        );
156
157                        tracing::debug!(
158                            memory_id = %entry.memory_id,
159                            urn = %urn,
160                            age_ms = %entry.captured_at.elapsed().as_millis(),
161                            duration_ms = %duration_ms,
162                            "Recent capture found"
163                        );
164
165                        metrics::histogram!(
166                            "deduplication_check_duration_ms",
167                            "checker" => "recent_capture",
168                            "found" => "true"
169                        )
170                        .record(duration_ms as f64);
171
172                        return Some((entry.memory_id, urn));
173                    }
174                }
175
176                // Entry expired or wrong namespace - will be cleaned up lazily
177                tracing::debug!(
178                    duration_ms = %duration_ms,
179                    "Cache entry expired or wrong namespace"
180                );
181            },
182            None => {
183                tracing::debug!(
184                    duration_ms = %duration_ms,
185                    "No recent capture found"
186                );
187            },
188        }
189
190        metrics::histogram!(
191            "deduplication_check_duration_ms",
192            "checker" => "recent_capture",
193            "found" => "false"
194        )
195        .record(duration_ms as f64);
196
197        None
198    }
199
200    /// Records a successful capture for future duplicate detection.
201    ///
202    /// # Arguments
203    ///
204    /// * `content` - The captured content
205    /// * `memory_id` - The ID assigned to the captured memory
206    /// * `namespace` - The namespace the content was captured to
207    /// * `domain` - The domain the content was captured to
208    ///
209    /// # Example
210    ///
211    /// ```rust,ignore
212    /// checker.record(
213    ///     "Use PostgreSQL for storage",
214    ///     MemoryId::new("mem-123"),
215    ///     Namespace::Decisions,
216    ///     "project",
217    /// );
218    /// ```
219    #[instrument(
220        skip(self, content),
221        fields(
222            operation = "record_capture",
223            memory_id = %memory_id,
224            namespace = %namespace.as_str(),
225            content_length = content.len()
226        )
227    )]
228    pub fn record(&self, content: &str, memory_id: &MemoryId, namespace: Namespace, domain: &str) {
229        let hash = ContentHasher::hash(content);
230
231        let entry = CacheEntry {
232            memory_id: memory_id.clone(),
233            namespace,
234            domain: domain.to_string(),
235            captured_at: Instant::now(),
236        };
237
238        // Acquire write lock and insert
239        if let Ok(mut cache) = self.cache.write() {
240            cache.put(hash, entry);
241
242            tracing::debug!(
243                memory_id = %memory_id,
244                "Recorded capture in recent cache"
245            );
246
247            metrics::gauge!("deduplication_recent_cache_size").set(cache.len() as f64);
248        }
249    }
250
251    /// Records a capture using just the content hash.
252    ///
253    /// Useful when the hash has already been computed.
254    ///
255    /// # Arguments
256    ///
257    /// * `content_hash` - The pre-computed content hash
258    /// * `memory_id` - The ID assigned to the captured memory
259    /// * `namespace` - The namespace the content was captured to
260    /// * `domain` - The domain the content was captured to
261    pub fn record_by_hash(
262        &self,
263        content_hash: &str,
264        memory_id: &MemoryId,
265        namespace: Namespace,
266        domain: &str,
267    ) {
268        let entry = CacheEntry {
269            memory_id: memory_id.clone(),
270            namespace,
271            domain: domain.to_string(),
272            captured_at: Instant::now(),
273        };
274
275        if let Ok(mut cache) = self.cache.write() {
276            cache.put(content_hash.to_string(), entry);
277
278            tracing::debug!(
279                memory_id = %memory_id,
280                hash = %content_hash,
281                "Recorded capture by hash in recent cache"
282            );
283
284            metrics::gauge!("deduplication_recent_cache_size").set(cache.len() as f64);
285        }
286    }
287
288    /// Clears all entries from the cache.
289    #[cfg(test)]
290    pub fn clear(&self) {
291        if let Ok(mut cache) = self.cache.write() {
292            cache.clear();
293
294            tracing::debug!("Cleared recent capture cache");
295
296            metrics::gauge!("deduplication_recent_cache_size").set(0.0);
297        }
298    }
299
300    /// Returns the current number of entries in the cache.
301    ///
302    /// Note: This includes potentially expired entries that haven't
303    /// been cleaned up yet.
304    #[cfg(test)]
305    #[must_use]
306    pub fn len(&self) -> usize {
307        self.cache.read().map(|c| c.len()).unwrap_or(0)
308    }
309
310    /// Returns true if the cache is empty.
311    #[cfg(test)]
312    #[must_use]
313    pub fn is_empty(&self) -> bool {
314        self.len() == 0
315    }
316
317    /// Returns the configured TTL.
318    #[cfg(test)]
319    #[must_use]
320    pub const fn ttl(&self) -> Duration {
321        self.ttl
322    }
323}
324
325impl Default for RecentCaptureChecker {
326    fn default() -> Self {
327        Self::default_settings()
328    }
329}
330
331#[cfg(test)]
332mod tests {
333    use super::*;
334    use std::thread;
335
336    #[test]
337    fn test_new_checker() {
338        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
339        assert_eq!(checker.len(), 0);
340        assert!(checker.is_empty());
341        assert_eq!(checker.ttl(), Duration::from_secs(60));
342    }
343
344    #[test]
345    fn test_default_settings() {
346        let checker = RecentCaptureChecker::default_settings();
347        assert_eq!(checker.ttl(), Duration::from_secs(300));
348    }
349
350    #[test]
351    fn test_record_and_check() {
352        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
353
354        let content = "Use PostgreSQL for storage";
355        let memory_id = MemoryId::new("mem-123");
356
357        // Record the capture
358        checker.record(content, &memory_id, Namespace::Decisions, "project");
359
360        assert_eq!(checker.len(), 1);
361
362        // Check should find it
363        let result = checker.check(content, Namespace::Decisions);
364        assert!(result.is_some());
365
366        let (id, urn) = result.unwrap();
367        assert_eq!(id.as_str(), "mem-123");
368        assert_eq!(urn, "subcog://project/decisions/mem-123");
369    }
370
371    #[test]
372    fn test_check_not_found() {
373        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
374
375        let result = checker.check("non-existent content", Namespace::Decisions);
376        assert!(result.is_none());
377    }
378
379    #[test]
380    fn test_check_wrong_namespace() {
381        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
382
383        let content = "Use PostgreSQL for storage";
384        checker.record(
385            content,
386            &MemoryId::new("mem-123"),
387            Namespace::Decisions,
388            "project",
389        );
390
391        // Check in different namespace should not find it
392        let result = checker.check(content, Namespace::Patterns);
393        assert!(result.is_none());
394    }
395
396    #[test]
397    fn test_check_expired() {
398        // Create checker with very short TTL
399        let checker = RecentCaptureChecker::new(100, Duration::from_millis(50));
400
401        let content = "Use PostgreSQL for storage";
402        checker.record(
403            content,
404            &MemoryId::new("mem-123"),
405            Namespace::Decisions,
406            "project",
407        );
408
409        // Wait for expiration
410        thread::sleep(Duration::from_millis(100));
411
412        // Check should not find it (expired)
413        let result = checker.check(content, Namespace::Decisions);
414        assert!(result.is_none());
415    }
416
417    #[test]
418    fn test_normalized_content_matches() {
419        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
420
421        // Record with normalized content
422        checker.record(
423            "Use PostgreSQL",
424            &MemoryId::new("mem-123"),
425            Namespace::Decisions,
426            "project",
427        );
428
429        // Check with whitespace/case variations should still match
430        let result = checker.check("  use  postgresql  ", Namespace::Decisions);
431        assert!(result.is_some());
432    }
433
434    #[test]
435    fn test_lru_eviction() {
436        // Create checker with capacity of 2
437        let checker = RecentCaptureChecker::new(2, Duration::from_secs(60));
438
439        checker.record(
440            "content1",
441            &MemoryId::new("mem-1"),
442            Namespace::Decisions,
443            "project",
444        );
445        checker.record(
446            "content2",
447            &MemoryId::new("mem-2"),
448            Namespace::Decisions,
449            "project",
450        );
451
452        assert_eq!(checker.len(), 2);
453
454        // Add a third - should evict content1 (least recently used)
455        checker.record(
456            "content3",
457            &MemoryId::new("mem-3"),
458            Namespace::Decisions,
459            "project",
460        );
461
462        assert_eq!(checker.len(), 2);
463
464        // content1 should be evicted
465        let result = checker.check("content1", Namespace::Decisions);
466        assert!(result.is_none());
467
468        // content2 and content3 should still be there
469        let result = checker.check("content2", Namespace::Decisions);
470        assert!(result.is_some());
471
472        let result = checker.check("content3", Namespace::Decisions);
473        assert!(result.is_some());
474    }
475
476    #[test]
477    fn test_clear() {
478        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
479
480        checker.record(
481            "content1",
482            &MemoryId::new("mem-1"),
483            Namespace::Decisions,
484            "project",
485        );
486        checker.record(
487            "content2",
488            &MemoryId::new("mem-2"),
489            Namespace::Decisions,
490            "project",
491        );
492
493        assert_eq!(checker.len(), 2);
494
495        checker.clear();
496
497        assert_eq!(checker.len(), 0);
498        assert!(checker.is_empty());
499    }
500
501    #[test]
502    fn test_record_by_hash() {
503        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
504
505        let content = "Use PostgreSQL for storage";
506        let hash = ContentHasher::hash(content);
507
508        // Record by hash
509        checker.record_by_hash(
510            &hash,
511            &MemoryId::new("mem-123"),
512            Namespace::Decisions,
513            "project",
514        );
515
516        // Check with content should find it
517        let result = checker.check(content, Namespace::Decisions);
518        assert!(result.is_some());
519    }
520
521    #[test]
522    fn test_update_existing() {
523        let checker = RecentCaptureChecker::new(100, Duration::from_secs(60));
524
525        let content = "Use PostgreSQL for storage";
526
527        // Record first time
528        checker.record(
529            content,
530            &MemoryId::new("mem-old"),
531            Namespace::Decisions,
532            "project",
533        );
534
535        // Record again with different ID
536        checker.record(
537            content,
538            &MemoryId::new("mem-new"),
539            Namespace::Decisions,
540            "project",
541        );
542
543        assert_eq!(checker.len(), 1);
544
545        // Should find the new ID
546        let result = checker.check(content, Namespace::Decisions);
547        assert!(result.is_some());
548
549        let (id, _) = result.unwrap();
550        assert_eq!(id.as_str(), "mem-new");
551    }
552
553    #[test]
554    fn test_thread_safety() {
555        use std::sync::Arc;
556
557        let checker = Arc::new(RecentCaptureChecker::new(100, Duration::from_secs(60)));
558
559        let checker1 = checker.clone();
560        let checker2 = checker.clone();
561
562        let t1 = thread::spawn(move || {
563            for i in 0..50 {
564                checker1.record(
565                    &format!("content-t1-{i}"),
566                    &MemoryId::new(format!("mem-t1-{i}")),
567                    Namespace::Decisions,
568                    "project",
569                );
570            }
571        });
572
573        let t2 = thread::spawn(move || {
574            for i in 0..50 {
575                checker2.record(
576                    &format!("content-t2-{i}"),
577                    &MemoryId::new(format!("mem-t2-{i}")),
578                    Namespace::Patterns,
579                    "project",
580                );
581            }
582        });
583
584        t1.join().unwrap();
585        t2.join().unwrap();
586
587        // Should have 100 entries
588        assert_eq!(checker.len(), 100);
589    }
590}