1use crate::Result;
11use crate::embedding::Embedder;
12use crate::models::{Domain, MemoryId, Namespace};
13use crate::services::recall::RecallService;
14use crate::storage::traits::VectorBackend;
15use std::sync::Arc;
16use std::time::Instant;
17use tracing::instrument;
18
19use super::config::DeduplicationConfig;
20use super::exact_match::ExactMatchChecker;
21use super::hasher::ContentHasher;
22use super::recent::RecentCaptureChecker;
23use super::semantic::SemanticSimilarityChecker;
24use super::types::{Deduplicator, DuplicateCheckResult};
25
26pub struct DeduplicationService<E: Embedder + Send + Sync, V: VectorBackend + Send + Sync> {
57 config: DeduplicationConfig,
59 exact_match: ExactMatchChecker,
61 semantic: Option<SemanticSimilarityChecker<E, V>>,
63 recent: RecentCaptureChecker,
65 domain: Domain,
67}
68
69impl<E: Embedder + Send + Sync, V: VectorBackend + Send + Sync> DeduplicationService<E, V> {
70 #[must_use]
79 pub fn new(
80 recall: Arc<RecallService>,
81 embedder: Arc<E>,
82 vector: Arc<V>,
83 config: DeduplicationConfig,
84 ) -> Self {
85 let exact_match = ExactMatchChecker::new(recall);
86 let semantic = Some(SemanticSimilarityChecker::new(
87 embedder,
88 vector,
89 config.clone(),
90 ));
91 let recent = RecentCaptureChecker::new(config.cache_capacity, config.recent_window);
92
93 Self {
94 config,
95 exact_match,
96 semantic,
97 recent,
98 domain: Domain::new(),
99 }
100 }
101
102 #[must_use]
112 pub fn without_embeddings(recall: Arc<RecallService>, config: DeduplicationConfig) -> Self {
113 let exact_match = ExactMatchChecker::new(recall);
114 let recent = RecentCaptureChecker::new(config.cache_capacity, config.recent_window);
115
116 Self {
117 config,
118 exact_match,
119 semantic: None,
120 recent,
121 domain: Domain::new(),
122 }
123 }
124
125 #[must_use]
127 pub fn with_domain(mut self, domain: Domain) -> Self {
128 self.domain = domain;
129 self
130 }
131
132 fn domain_string(&self) -> String {
134 self.domain.to_string()
135 }
136
137 #[allow(clippy::cast_possible_truncation)]
139 fn check_exact_match(
140 &self,
141 content: &str,
142 namespace: Namespace,
143 domain: &str,
144 start: Instant,
145 ) -> Option<DuplicateCheckResult> {
146 match self.exact_match.check(content, namespace, domain) {
147 Ok(Some((memory_id, urn))) => {
148 let duration_ms = start.elapsed().as_millis() as u64;
149 tracing::info!(
150 memory_id = %memory_id,
151 urn = %urn,
152 duration_ms = duration_ms,
153 "Exact match duplicate found"
154 );
155 metrics::counter!(
156 "deduplication_duplicates_total",
157 "namespace" => namespace.as_str().to_string(),
158 "reason" => "exact_match"
159 )
160 .increment(1);
161 Some(DuplicateCheckResult::exact_match(
162 memory_id,
163 urn,
164 duration_ms,
165 ))
166 },
167 Ok(None) => {
168 tracing::debug!("No exact match, checking semantic similarity");
169 None
170 },
171 Err(e) => {
172 tracing::warn!(error = %e, "Exact match check failed, continuing");
173 None
174 },
175 }
176 }
177
178 #[allow(clippy::cast_possible_truncation)]
180 fn check_semantic(
181 &self,
182 content: &str,
183 namespace: Namespace,
184 domain: &str,
185 start: Instant,
186 ) -> Option<DuplicateCheckResult> {
187 let semantic = self.semantic.as_ref()?;
188
189 match semantic.check(content, namespace, domain) {
190 Ok(Some((memory_id, urn, score))) => {
191 let duration_ms = start.elapsed().as_millis() as u64;
192 tracing::info!(
193 memory_id = %memory_id,
194 urn = %urn,
195 score = score,
196 duration_ms = duration_ms,
197 "Semantic similarity duplicate found"
198 );
199 metrics::counter!(
200 "deduplication_duplicates_total",
201 "namespace" => namespace.as_str().to_string(),
202 "reason" => "semantic_similar"
203 )
204 .increment(1);
205 Some(DuplicateCheckResult::semantic_match(
206 memory_id,
207 urn,
208 score,
209 duration_ms,
210 ))
211 },
212 Ok(None) => {
213 tracing::debug!("No semantic match, checking recent captures");
214 None
215 },
216 Err(e) => {
217 tracing::warn!(error = %e, "Semantic check failed, continuing");
218 None
219 },
220 }
221 }
222
223 #[allow(clippy::cast_possible_truncation)]
225 fn check_recent(
226 &self,
227 content: &str,
228 namespace: Namespace,
229 start: Instant,
230 ) -> Option<DuplicateCheckResult> {
231 if let Some((memory_id, urn)) = self.recent.check(content, namespace) {
232 let duration_ms = start.elapsed().as_millis() as u64;
233 tracing::info!(
234 memory_id = %memory_id,
235 urn = %urn,
236 duration_ms = duration_ms,
237 "Recent capture duplicate found"
238 );
239 metrics::counter!(
240 "deduplication_duplicates_total",
241 "namespace" => namespace.as_str().to_string(),
242 "reason" => "recent_capture"
243 )
244 .increment(1);
245 Some(DuplicateCheckResult::recent_capture(
246 memory_id,
247 urn,
248 duration_ms,
249 ))
250 } else {
251 None
252 }
253 }
254
255 #[allow(clippy::cast_possible_truncation)]
257 fn record_unique_check_metrics(&self, namespace: Namespace, duration_ms: u64) {
258 metrics::counter!(
259 "deduplication_checks_total",
260 "namespace" => namespace.as_str().to_string(),
261 "result" => "unique"
262 )
263 .increment(1);
264 metrics::histogram!(
265 "deduplication_check_duration_ms",
266 "checker" => "total"
267 )
268 .record(duration_ms as f64);
269 }
270
271 #[allow(clippy::cast_possible_truncation)] #[instrument(
291 skip(self, content),
292 fields(
293 operation = "dedup_check",
294 namespace = %namespace.as_str(),
295 content_length = content.len()
296 )
297 )]
298 pub fn check(&self, content: &str, namespace: Namespace) -> Result<DuplicateCheckResult> {
299 let start = Instant::now();
300 let domain = self.domain_string();
301
302 if !self.config.enabled {
304 tracing::debug!("Deduplication disabled, skipping check");
305 return Ok(DuplicateCheckResult::not_duplicate(
306 start.elapsed().as_millis() as u64,
307 ));
308 }
309
310 if let Some(result) = self.check_exact_match(content, namespace, &domain, start) {
312 return Ok(result);
313 }
314
315 if let Some(result) = self.check_semantic(content, namespace, &domain, start) {
317 return Ok(result);
318 }
319
320 if let Some(result) = self.check_recent(content, namespace, start) {
322 return Ok(result);
323 }
324
325 let duration_ms = start.elapsed().as_millis() as u64;
327 tracing::debug!(duration_ms = duration_ms, "No duplicate found");
328 self.record_unique_check_metrics(namespace, duration_ms);
329
330 Ok(DuplicateCheckResult::not_duplicate(duration_ms))
331 }
332
333 #[instrument(
344 skip(self, content),
345 fields(
346 operation = "record_capture",
347 memory_id = %memory_id,
348 namespace = %namespace.as_str()
349 )
350 )]
351 pub fn record_capture(&self, content: &str, memory_id: &MemoryId, namespace: Namespace) {
352 let domain = self.domain_string();
353 self.recent.record(content, memory_id, namespace, &domain);
354
355 tracing::debug!(
356 memory_id = %memory_id,
357 "Recorded capture for recent-capture tracking"
358 );
359 }
360
361 pub fn record_capture_by_hash(
371 &self,
372 content_hash: &str,
373 memory_id: &MemoryId,
374 namespace: Namespace,
375 ) {
376 let domain = self.domain_string();
377 self.recent
378 .record_by_hash(content_hash, memory_id, namespace, &domain);
379 }
380
381 #[must_use]
386 pub fn content_to_tag(content: &str) -> String {
387 ExactMatchChecker::content_to_tag(content)
388 }
389
390 #[must_use]
392 pub fn hash_content(content: &str) -> String {
393 ContentHasher::hash(content)
394 }
395
396 #[must_use]
398 pub const fn is_enabled(&self) -> bool {
399 self.config.enabled
400 }
401
402 #[must_use]
404 pub fn get_threshold(&self, namespace: Namespace) -> f32 {
405 self.config.get_threshold(namespace)
406 }
407}
408
409impl<E: Embedder + Send + Sync, V: VectorBackend + Send + Sync> Deduplicator
411 for DeduplicationService<E, V>
412{
413 fn check_duplicate(&self, content: &str, namespace: Namespace) -> Result<DuplicateCheckResult> {
414 self.check(content, namespace)
415 }
416
417 fn record_capture(&self, content_hash: &str, memory_id: &MemoryId) {
418 self.record_capture_by_hash(content_hash, memory_id, Namespace::Decisions);
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embedding::FastEmbedEmbedder;
    use crate::models::{Memory, MemoryStatus};
    use crate::storage::index::SqliteBackend;
    use crate::storage::traits::IndexBackend;
    use crate::storage::vector::UsearchBackend;
    use std::sync::RwLock;

    /// Test adapter exposing a `UsearchBackend` through `VectorBackend`.
    ///
    /// Mutating calls (`upsert`, `remove`, `clear`) take the write lock; queries
    /// (`dimensions`, `search`, `count`) take the read lock.
    struct RwLockVectorWrapper {
        inner: RwLock<UsearchBackend>,
    }

    impl RwLockVectorWrapper {
        fn new(backend: UsearchBackend) -> Self {
            Self {
                inner: RwLock::new(backend),
            }
        }
    }

    impl VectorBackend for RwLockVectorWrapper {
        fn dimensions(&self) -> usize {
            self.inner.read().unwrap().dimensions()
        }

        fn upsert(&self, id: &MemoryId, embedding: &[f32]) -> Result<()> {
            self.inner.write().unwrap().upsert(id, embedding)
        }

        fn remove(&self, id: &MemoryId) -> Result<bool> {
            self.inner.write().unwrap().remove(id)
        }

        fn search(
            &self,
            query_embedding: &[f32],
            filter: &crate::storage::traits::VectorFilter,
            limit: usize,
        ) -> Result<Vec<(MemoryId, f32)>> {
            self.inner
                .read()
                .unwrap()
                .search(query_embedding, filter, limit)
        }

        fn count(&self) -> Result<usize> {
            self.inner.read().unwrap().count()
        }

        fn clear(&self) -> Result<()> {
            self.inner.write().unwrap().clear()
        }
    }

    /// Builds an empty in-memory vector backend sized for `FastEmbedEmbedder`.
    fn create_vector_backend() -> Arc<RwLockVectorWrapper> {
        Arc::new(RwLockVectorWrapper::new(create_usearch_backend(
            FastEmbedEmbedder::DEFAULT_DIMENSIONS,
        )))
    }

    /// Builds a service with `config` over an empty in-memory index and vector backend.
    fn create_test_service_with_config(
        config: DeduplicationConfig,
    ) -> DeduplicationService<FastEmbedEmbedder, RwLockVectorWrapper> {
        let index = SqliteBackend::in_memory().unwrap();
        let recall = Arc::new(RecallService::with_index(index));
        let embedder = Arc::new(FastEmbedEmbedder::new());

        DeduplicationService::new(recall, embedder, create_vector_backend(), config)
    }

    /// Builds a service with the default configuration over empty backends.
    fn create_test_service() -> DeduplicationService<FastEmbedEmbedder, RwLockVectorWrapper> {
        create_test_service_with_config(DeduplicationConfig::default())
    }

    #[cfg(not(feature = "usearch-hnsw"))]
    fn create_usearch_backend(dimensions: usize) -> UsearchBackend {
        UsearchBackend::in_memory(dimensions)
    }

    #[cfg(feature = "usearch-hnsw")]
    fn create_usearch_backend(dimensions: usize) -> UsearchBackend {
        UsearchBackend::in_memory(dimensions).expect("Failed to create usearch backend")
    }

    /// Builds an active memory in the default domain with fixed timestamps.
    fn create_test_memory(
        id: &str,
        content: &str,
        namespace: Namespace,
        tags: Vec<String>,
    ) -> Memory {
        Memory {
            id: MemoryId::new(id),
            content: content.to_string(),
            namespace,
            domain: Domain::new(),
            project_id: None,
            branch: None,
            file_path: None,
            status: MemoryStatus::Active,
            created_at: 1_234_567_890,
            updated_at: 1_234_567_890,
            tombstoned_at: None,
            expires_at: None,
            embedding: None,
            tags,
            #[cfg(feature = "group-scope")]
            group_id: None,
            source: None,
            is_summary: false,
            source_memory_ids: None,
            consolidation_timestamp: None,
        }
    }

    #[test]
    fn test_check_no_duplicate() {
        let service = create_test_service();

        let result = service
            .check(
                "This is unique content that has never been seen before.",
                Namespace::Decisions,
            )
            .unwrap();

        assert!(!result.is_duplicate);
        assert!(result.reason.is_none());
        assert!(result.matched_memory_id.is_none());
    }

    #[test]
    fn test_check_disabled() {
        let service = create_test_service_with_config(DeduplicationConfig::default().with_enabled(false));

        let result = service.check("Any content", Namespace::Decisions).unwrap();

        assert!(!result.is_duplicate);
    }

    #[test]
    fn test_check_recent_capture() {
        let service = create_test_service();

        let content = "Use PostgreSQL for the primary database storage.";
        let memory_id = MemoryId::new("mem-123");

        service.record_capture(content, &memory_id, Namespace::Decisions);

        let result = service.check(content, Namespace::Decisions).unwrap();

        assert!(result.is_duplicate);
        assert_eq!(
            result.reason,
            Some(super::super::types::DuplicateReason::RecentCapture)
        );
        assert_eq!(result.matched_memory_id, Some(memory_id));
        assert!(result.matched_urn.is_some());
        assert!(
            result
                .matched_urn
                .as_ref()
                .unwrap()
                .starts_with("subcog://")
        );
    }

    #[test]
    fn test_check_exact_match() {
        let index = SqliteBackend::in_memory().unwrap();

        let content = "Use PostgreSQL for the primary database storage.";
        let hash_tag =
            DeduplicationService::<FastEmbedEmbedder, RwLockVectorWrapper>::content_to_tag(content);
        let memory = create_test_memory(
            "existing-mem-456",
            content,
            Namespace::Decisions,
            vec![hash_tag],
        );
        index.index(&memory).unwrap();

        let recall = Arc::new(RecallService::with_index(index));
        let embedder = Arc::new(FastEmbedEmbedder::new());
        let config = DeduplicationConfig::default();

        let service = DeduplicationService::new(recall, embedder, create_vector_backend(), config);

        let result = service.check(content, Namespace::Decisions).unwrap();

        assert!(result.is_duplicate);
        assert_eq!(
            result.reason,
            Some(super::super::types::DuplicateReason::ExactMatch)
        );
        assert_eq!(
            result.matched_memory_id,
            Some(MemoryId::new("existing-mem-456"))
        );
    }

    #[test]
    fn test_check_semantic_match() {
        let index = SqliteBackend::in_memory().unwrap();
        let recall = Arc::new(RecallService::with_index(index));
        let embedder = Arc::new(FastEmbedEmbedder::new());
        let vector = create_vector_backend();

        let existing_content = "Use PostgreSQL as the primary database for storing user data and application state persistently.";
        let existing_id = MemoryId::new("semantic-mem-789");
        let existing_embedding = embedder.embed(existing_content).unwrap();
        vector.upsert(&existing_id, &existing_embedding).unwrap();

        let config = DeduplicationConfig::default();
        let service = DeduplicationService::new(recall, embedder, vector, config);

        let result = service
            .check(existing_content, Namespace::Decisions)
            .unwrap();

        // The exact-match index and the recent-capture cache are both empty here, so
        // the only possible match is the vector entry inserted above. Whether the
        // semantic checker reports it depends on the embedder and vector filter in
        // use, so this test asserts that the result is self-consistent rather than
        // requiring a hit.
        if result.is_duplicate {
            assert!(result.reason.is_some());
            assert_eq!(result.matched_memory_id, Some(existing_id));
        } else {
            assert!(result.reason.is_none());
            assert!(result.matched_memory_id.is_none());
        }
    }

    #[test]
    fn test_content_to_tag() {
        let content = "Use PostgreSQL for storage";
        let tag =
            DeduplicationService::<FastEmbedEmbedder, RwLockVectorWrapper>::content_to_tag(content);

        assert!(tag.starts_with("hash:sha256:"));
        assert_eq!(tag.len(), "hash:sha256:".len() + 16);
    }

    #[test]
    fn test_hash_content() {
        let content = "Use PostgreSQL for storage";
        let hash =
            DeduplicationService::<FastEmbedEmbedder, RwLockVectorWrapper>::hash_content(content);

        // 64 characters, consistent with a hex-encoded SHA-256 digest.
        assert_eq!(hash.len(), 64);
    }

    #[test]
    fn test_is_enabled() {
        let service = create_test_service();
        assert!(service.is_enabled());

        let disabled_service =
            create_test_service_with_config(DeduplicationConfig::default().with_enabled(false));
        assert!(!disabled_service.is_enabled());
    }

    #[test]
    fn test_get_threshold() {
        let service = create_test_service();

        assert!((service.get_threshold(Namespace::Decisions) - 0.92).abs() < f32::EPSILON);
        assert!((service.get_threshold(Namespace::Patterns) - 0.90).abs() < f32::EPSILON);
        assert!((service.get_threshold(Namespace::Learnings) - 0.88).abs() < f32::EPSILON);
    }

    #[test]
    fn test_without_embeddings() {
        let index = SqliteBackend::in_memory().unwrap();
        let recall = Arc::new(RecallService::with_index(index));
        let config = DeduplicationConfig::default();

        let service: DeduplicationService<FastEmbedEmbedder, RwLockVectorWrapper> =
            DeduplicationService::without_embeddings(recall, config);

        let result = service
            .check("Some content to check", Namespace::Decisions)
            .unwrap();

        assert!(!result.is_duplicate);
    }

    #[test]
    fn test_with_domain() {
        let service = create_test_service().with_domain(Domain {
            organization: Some("acme".to_string()),
            project: Some("myproject".to_string()),
            repository: None,
        });

        let content = "Test content for domain check.";
        let memory_id = MemoryId::new("domain-test-mem");

        service.record_capture(content, &memory_id, Namespace::Decisions);

        let result = service.check(content, Namespace::Decisions).unwrap();

        assert!(result.is_duplicate);
        assert!(result.matched_urn.is_some());
        assert!(result.matched_urn.unwrap().contains("acme/myproject"));
    }

    #[test]
    fn test_deduplicator_trait() {
        let service = create_test_service();

        let deduplicator: &dyn Deduplicator = &service;

        let content = "Content for trait test.";
        let hash = ContentHasher::hash(content);
        let memory_id = MemoryId::new("trait-test-mem");

        deduplicator.record_capture(&hash, &memory_id);

        let result = deduplicator
            .check_duplicate(content, Namespace::Decisions)
            .unwrap();

        assert!(result.is_duplicate);
    }
}