subcog/services/deduplication/
recent.rs1use crate::models::{MemoryId, Namespace};
7use lru::LruCache;
8use std::num::NonZeroUsize;
9use std::sync::RwLock;
10use std::time::{Duration, Instant};
11use tracing::instrument;
12
13use super::hasher::ContentHasher;
14
15#[derive(Debug, Clone)]
17struct CacheEntry {
18 memory_id: MemoryId,
20 namespace: Namespace,
22 domain: String,
24 captured_at: Instant,
26}
27
28pub struct RecentCaptureChecker {
67 cache: RwLock<LruCache<String, CacheEntry>>,
70 ttl: Duration,
72}
73
74impl RecentCaptureChecker {
75 #[must_use]
86 #[allow(clippy::expect_used)] pub fn new(capacity: usize, ttl: Duration) -> Self {
88 let cap = NonZeroUsize::new(capacity).expect("capacity must be > 0");
89 Self {
90 cache: RwLock::new(LruCache::new(cap)),
91 ttl,
92 }
93 }
94
95 #[must_use]
99 pub fn default_settings() -> Self {
100 Self::new(1000, Duration::from_secs(300))
101 }
102
103 #[instrument(
125 skip(self, content),
126 fields(
127 operation = "recent_capture_check",
128 namespace = %namespace.as_str(),
129 content_length = content.len()
130 )
131 )]
132 pub fn check(&self, content: &str, namespace: Namespace) -> Option<(MemoryId, String)> {
133 let start = Instant::now();
134 let hash = ContentHasher::hash(content);
135
136 let result = {
138 let cache = self.cache.read().ok()?;
139 cache.peek(&hash).cloned()
140 };
141
142 let duration_ms = start.elapsed().as_millis();
143
144 match result {
145 Some(entry) => {
146 if entry.captured_at.elapsed() <= self.ttl {
148 if entry.namespace == namespace {
150 let urn = format!(
151 "subcog://{}/{}/{}",
152 entry.domain,
153 namespace.as_str(),
154 entry.memory_id
155 );
156
157 tracing::debug!(
158 memory_id = %entry.memory_id,
159 urn = %urn,
160 age_ms = %entry.captured_at.elapsed().as_millis(),
161 duration_ms = %duration_ms,
162 "Recent capture found"
163 );
164
165 metrics::histogram!(
166 "deduplication_check_duration_ms",
167 "checker" => "recent_capture",
168 "found" => "true"
169 )
170 .record(duration_ms as f64);
171
172 return Some((entry.memory_id, urn));
173 }
174 }
175
176 tracing::debug!(
178 duration_ms = %duration_ms,
179 "Cache entry expired or wrong namespace"
180 );
181 },
182 None => {
183 tracing::debug!(
184 duration_ms = %duration_ms,
185 "No recent capture found"
186 );
187 },
188 }
189
190 metrics::histogram!(
191 "deduplication_check_duration_ms",
192 "checker" => "recent_capture",
193 "found" => "false"
194 )
195 .record(duration_ms as f64);
196
197 None
198 }
199
200 #[instrument(
220 skip(self, content),
221 fields(
222 operation = "record_capture",
223 memory_id = %memory_id,
224 namespace = %namespace.as_str(),
225 content_length = content.len()
226 )
227 )]
228 pub fn record(&self, content: &str, memory_id: &MemoryId, namespace: Namespace, domain: &str) {
229 let hash = ContentHasher::hash(content);
230
231 let entry = CacheEntry {
232 memory_id: memory_id.clone(),
233 namespace,
234 domain: domain.to_string(),
235 captured_at: Instant::now(),
236 };
237
238 if let Ok(mut cache) = self.cache.write() {
240 cache.put(hash, entry);
241
242 tracing::debug!(
243 memory_id = %memory_id,
244 "Recorded capture in recent cache"
245 );
246
247 metrics::gauge!("deduplication_recent_cache_size").set(cache.len() as f64);
248 }
249 }
250
251 pub fn record_by_hash(
262 &self,
263 content_hash: &str,
264 memory_id: &MemoryId,
265 namespace: Namespace,
266 domain: &str,
267 ) {
268 let entry = CacheEntry {
269 memory_id: memory_id.clone(),
270 namespace,
271 domain: domain.to_string(),
272 captured_at: Instant::now(),
273 };
274
275 if let Ok(mut cache) = self.cache.write() {
276 cache.put(content_hash.to_string(), entry);
277
278 tracing::debug!(
279 memory_id = %memory_id,
280 hash = %content_hash,
281 "Recorded capture by hash in recent cache"
282 );
283
284 metrics::gauge!("deduplication_recent_cache_size").set(cache.len() as f64);
285 }
286 }
287
288 #[cfg(test)]
290 pub fn clear(&self) {
291 if let Ok(mut cache) = self.cache.write() {
292 cache.clear();
293
294 tracing::debug!("Cleared recent capture cache");
295
296 metrics::gauge!("deduplication_recent_cache_size").set(0.0);
297 }
298 }
299
300 #[cfg(test)]
305 #[must_use]
306 pub fn len(&self) -> usize {
307 self.cache.read().map(|c| c.len()).unwrap_or(0)
308 }
309
310 #[cfg(test)]
312 #[must_use]
313 pub fn is_empty(&self) -> bool {
314 self.len() == 0
315 }
316
317 #[cfg(test)]
319 #[must_use]
320 pub const fn ttl(&self) -> Duration {
321 self.ttl
322 }
323}
324
325impl Default for RecentCaptureChecker {
326 fn default() -> Self {
327 Self::default_settings()
328 }
329}
330
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Content string shared by most tests.
    const SAMPLE: &str = "Use PostgreSQL for storage";

    /// Builds a 100-entry checker whose entries stay fresh for a minute.
    fn minute_checker() -> RecentCaptureChecker {
        RecentCaptureChecker::new(100, Duration::from_secs(60))
    }

    /// Records `content` as `id` under `Namespace::Decisions` / "project".
    fn record_decision(checker: &RecentCaptureChecker, content: &str, id: &str) {
        checker.record(content, &MemoryId::new(id), Namespace::Decisions, "project");
    }

    #[test]
    fn test_new_checker() {
        let subject = minute_checker();

        assert_eq!(subject.len(), 0);
        assert!(subject.is_empty());
        assert_eq!(subject.ttl(), Duration::from_secs(60));
    }

    #[test]
    fn test_default_settings() {
        let subject = RecentCaptureChecker::default_settings();

        assert_eq!(subject.ttl(), Duration::from_secs(300));
    }

    #[test]
    fn test_record_and_check() {
        let subject = minute_checker();
        let memory_id = MemoryId::new("mem-123");

        subject.record(SAMPLE, &memory_id, Namespace::Decisions, "project");
        assert_eq!(subject.len(), 1);

        let found = subject.check(SAMPLE, Namespace::Decisions);
        assert!(found.is_some());

        let (id, urn) = found.unwrap();
        assert_eq!(id.as_str(), "mem-123");
        assert_eq!(urn, "subcog://project/decisions/mem-123");
    }

    #[test]
    fn test_check_not_found() {
        let subject = minute_checker();

        assert!(
            subject
                .check("non-existent content", Namespace::Decisions)
                .is_none()
        );
    }

    #[test]
    fn test_check_wrong_namespace() {
        let subject = minute_checker();
        record_decision(&subject, SAMPLE, "mem-123");

        // Same content, different namespace: must not be reported.
        let found = subject.check(SAMPLE, Namespace::Patterns);
        assert!(found.is_none());
    }

    #[test]
    fn test_check_expired() {
        // A TTL short enough to elapse during the sleep below.
        let subject = RecentCaptureChecker::new(100, Duration::from_millis(50));
        record_decision(&subject, SAMPLE, "mem-123");

        thread::sleep(Duration::from_millis(100));

        let found = subject.check(SAMPLE, Namespace::Decisions);
        assert!(found.is_none());
    }

    #[test]
    fn test_normalized_content_matches() {
        let subject = minute_checker();
        record_decision(&subject, "Use PostgreSQL", "mem-123");

        // Differs from the recorded text only in case and surrounding whitespace.
        let found = subject.check(" use postgresql ", Namespace::Decisions);
        assert!(found.is_some());
    }

    #[test]
    fn test_lru_eviction() {
        let subject = RecentCaptureChecker::new(2, Duration::from_secs(60));

        record_decision(&subject, "content1", "mem-1");
        record_decision(&subject, "content2", "mem-2");
        assert_eq!(subject.len(), 2);

        // A third entry exceeds the capacity and pushes out the oldest one.
        record_decision(&subject, "content3", "mem-3");
        assert_eq!(subject.len(), 2);

        assert!(subject.check("content1", Namespace::Decisions).is_none());
        assert!(subject.check("content2", Namespace::Decisions).is_some());
        assert!(subject.check("content3", Namespace::Decisions).is_some());
    }

    #[test]
    fn test_clear() {
        let subject = minute_checker();

        record_decision(&subject, "content1", "mem-1");
        record_decision(&subject, "content2", "mem-2");
        assert_eq!(subject.len(), 2);

        subject.clear();

        assert_eq!(subject.len(), 0);
        assert!(subject.is_empty());
    }

    #[test]
    fn test_record_by_hash() {
        let subject = minute_checker();
        let precomputed = ContentHasher::hash(SAMPLE);

        subject.record_by_hash(
            &precomputed,
            &MemoryId::new("mem-123"),
            Namespace::Decisions,
            "project",
        );

        // A content-based lookup must find the hash-based record.
        let found = subject.check(SAMPLE, Namespace::Decisions);
        assert!(found.is_some());
    }

    #[test]
    fn test_update_existing() {
        let subject = minute_checker();

        record_decision(&subject, SAMPLE, "mem-old");
        record_decision(&subject, SAMPLE, "mem-new");

        // Same content twice: one entry, pointing at the latest id.
        assert_eq!(subject.len(), 1);

        let found = subject.check(SAMPLE, Namespace::Decisions);
        assert!(found.is_some());

        let (id, _) = found.unwrap();
        assert_eq!(id.as_str(), "mem-new");
    }

    #[test]
    fn test_thread_safety() {
        let shared = Arc::new(minute_checker());

        let decisions_writer = {
            let checker = Arc::clone(&shared);
            thread::spawn(move || {
                for i in 0..50 {
                    checker.record(
                        &format!("content-t1-{i}"),
                        &MemoryId::new(format!("mem-t1-{i}")),
                        Namespace::Decisions,
                        "project",
                    );
                }
            })
        };

        let patterns_writer = {
            let checker = Arc::clone(&shared);
            thread::spawn(move || {
                for i in 0..50 {
                    checker.record(
                        &format!("content-t2-{i}"),
                        &MemoryId::new(format!("mem-t2-{i}")),
                        Namespace::Patterns,
                        "project",
                    );
                }
            })
        };

        decisions_writer.join().unwrap();
        patterns_writer.join().unwrap();

        // 2 x 50 distinct contents, all within the 100-entry capacity.
        assert_eq!(shared.len(), 100);
    }
}