1use crate::Result;
18use crate::current_timestamp;
19use crate::llm::LlmProvider;
20use crate::models::{
21 EdgeType, EventMeta, Memory, MemoryEvent, MemoryStatus, MemoryTier, Namespace, RetentionScore,
22};
23use crate::observability::current_request_id;
24use crate::security::record_event;
25use crate::storage::index::SqliteBackend;
26use crate::storage::traits::PersistenceBackend;
27use lru::LruCache;
28use std::collections::HashMap;
29use std::num::NonZeroUsize;
30use std::sync::Arc;
31use std::time::Instant;
32use tracing::{info_span, instrument};
33
/// Number of seconds in one day; converts timestamp deltas (seconds) into days.
const SECONDS_PER_DAY: f32 = 86400.0;
/// Time constant, in days, of the exponential recency decay `exp(-age_days / 30)`.
const RECENCY_DECAY_DAYS: f32 = 30.0;
/// Importance assigned to every memory when computing a retention score.
const DEFAULT_IMPORTANCE: f32 = 0.5;
/// Namespace size above which the contradiction heuristic starts counting:
/// a namespace with more than this many memories contributes `len / 10`.
const CONTRADICTION_THRESHOLD: usize = 10;
/// Maximum number of memory IDs tracked by each in-process access cache.
///
/// Written as a `const` `match` so it also builds on toolchains where
/// `Option::unwrap` is not yet usable in `const` context; the `None` arm can
/// never be reached because the literal is non-zero.
const ACCESS_CACHE_CAPACITY: NonZeroUsize = {
    match NonZeroUsize::new(10_000) {
        Some(n) => n,
        None => panic!("ACCESS_CACHE_CAPACITY must be non-zero"),
    }
};
54
/// Service that archives low-retention memories and consolidates groups of
/// related memories into summary nodes.
///
/// Access statistics live only in this value (two LRU caches bounded by
/// `ACCESS_CACHE_CAPACITY`); this type does not persist them anywhere.
pub struct ConsolidationService<P: PersistenceBackend> {
    /// Storage backend that memories are read from and written back to.
    persistence: P,
    /// Per-memory access counts, incremented by `record_access`.
    access_counts: LruCache<String, u32>,
    /// Per-memory timestamp of the most recent `record_access` call.
    last_access: LruCache<String, u64>,
    /// Optional LLM used by `summarize_group`; when `None`, summarization returns an error.
    llm: Option<Arc<dyn LlmProvider + Send + Sync>>,
    /// Optional index backend used to store graph edges; when `None`, edge creation is skipped.
    index: Option<Arc<SqliteBackend>>,
}
68
69impl<P: PersistenceBackend> ConsolidationService<P> {
70 #[must_use]
72 pub fn new(persistence: P) -> Self {
73 Self {
74 persistence,
75 access_counts: LruCache::new(ACCESS_CACHE_CAPACITY),
76 last_access: LruCache::new(ACCESS_CACHE_CAPACITY),
77 llm: None,
78 index: None,
79 }
80 }
81
82 #[must_use]
123 pub fn with_llm(mut self, llm: Arc<dyn LlmProvider + Send + Sync>) -> Self {
124 self.llm = Some(llm);
125 self
126 }
127
128 #[must_use]
152 pub fn with_index(mut self, index: Arc<SqliteBackend>) -> Self {
153 self.index = Some(index);
154 self
155 }
156
157 pub fn record_access(&mut self, memory_id: &str) {
187 let now = current_timestamp();
188 let key = memory_id.to_string();
189 let count = self.access_counts.get(&key).copied().unwrap_or(0) + 1;
190 self.access_counts.put(key.clone(), count);
191 self.last_access.put(key, now);
192 }
193
    /// Groups related memories by embedding similarity and replaces each group
    /// with an LLM-generated summary node.
    ///
    /// For every group returned by `find_related_memories`:
    /// - the group's memories are loaded from persistence (missing IDs are
    ///   logged and skipped; each loaded memory increments `processed`);
    /// - `summarize_group` is asked for a summary. If it fails, the
    ///   `consolidation_llm_failures` counter is bumped. When an index backend
    ///   is configured, bidirectional `RelatedTo` edges are created instead,
    ///   and the group produces no summary;
    /// - on success, `create_summary_node` persists the summary and
    ///   `summaries_created` is incremented.
    ///
    /// Returns early with zeroed stats when `config.enabled` is false or no
    /// groups are found. Operation counters and duration histograms are
    /// emitted on every exit path.
    ///
    /// # Errors
    ///
    /// Returns the first error from finding groups, loading memories, creating
    /// related edges, or creating a summary node. A summary-node failure aborts
    /// the remaining groups; summaries already persisted in this run are kept.
    #[instrument(
        name = "subcog.memory.consolidate_memories",
        skip(self, recall_service, config),
        fields(
            request_id = tracing::field::Empty,
            component = "memory",
            operation = "consolidate_memories"
        )
    )]
    #[allow(clippy::too_many_lines)]
    pub fn consolidate_memories(
        &mut self,
        recall_service: &crate::services::RecallService,
        config: &crate::config::ConsolidationConfig,
    ) -> Result<ConsolidationStats> {
        let start = Instant::now();
        if let Some(request_id) = current_request_id() {
            tracing::Span::current().record("request_id", request_id.as_str());
        }

        // The body runs inside an immediately-invoked closure so that every
        // early `return`/`?` still flows through the metrics emitted below.
        let result = (|| {
            let mut stats = ConsolidationStats::default();

            if !config.enabled {
                tracing::info!("Consolidation is disabled in configuration");
                return Ok(stats);
            }

            tracing::info!(
                namespace_filter = ?config.namespace_filter,
                time_window_days = ?config.time_window_days,
                similarity_threshold = config.similarity_threshold,
                "Finding related memory groups for consolidation"
            );

            // NOTE(review): `find_related_memories` ignores `recall_service`
            // (its parameter is `_recall_service`); grouping is done by
            // scanning persistence and comparing embeddings.
            let groups = self.find_related_memories(recall_service, config)?;

            if groups.is_empty() {
                tracing::info!("No related memory groups found for consolidation");
                return Ok(stats);
            }

            tracing::info!(
                namespace_count = groups.len(),
                "Found related memory groups in {} namespaces",
                groups.len()
            );

            for (namespace, namespace_groups) in groups {
                tracing::debug!(
                    namespace = ?namespace,
                    group_count = namespace_groups.len(),
                    "Processing {} groups in namespace {:?}",
                    namespace_groups.len(),
                    namespace
                );

                for (group_idx, memory_ids) in namespace_groups.iter().enumerate() {
                    tracing::debug!(
                        namespace = ?namespace,
                        group_idx = group_idx,
                        memory_count = memory_ids.len(),
                        "Processing group {} with {} memories",
                        group_idx,
                        memory_ids.len()
                    );

                    // Load the group; IDs that vanished since grouping are skipped.
                    let mut memories = Vec::new();
                    for memory_id in memory_ids {
                        if let Some(memory) = self.persistence.get(memory_id)? {
                            memories.push(memory);
                            stats.processed += 1;
                        } else {
                            tracing::warn!(
                                memory_id = %memory_id.as_str(),
                                "Memory not found in persistence, skipping"
                            );
                        }
                    }

                    if memories.is_empty() {
                        tracing::warn!(
                            namespace = ?namespace,
                            group_idx = group_idx,
                            "No memories found for group, skipping"
                        );
                        continue;
                    }

                    // Summarization failure is non-fatal: fall back to linking
                    // the memories with `RelatedTo` edges (when an index exists)
                    // and move on to the next group.
                    let summary_content = match self.summarize_group(&memories) {
                        Ok(summary) => summary,
                        Err(e) => {
                            metrics::counter!(
                                "consolidation_llm_failures",
                                "namespace" => namespace.as_str()
                            )
                            .increment(1);

                            tracing::warn!(
                                error = %e,
                                namespace = ?namespace,
                                group_idx = group_idx,
                                memory_count = memories.len(),
                                "Failed to summarize group, creating relationships without summary"
                            );

                            if let Some(ref index) = self.index {
                                self.create_related_edges(&memories, index)?;
                                tracing::info!(
                                    namespace = ?namespace,
                                    memory_count = memories.len(),
                                    "Created RelatedTo edges for group without LLM summary"
                                );
                            } else {
                                tracing::debug!(
                                    namespace = ?namespace,
                                    memory_count = memories.len(),
                                    "Index backend not available, skipping edge creation"
                                );
                            }
                            continue;
                        },
                    };

                    // Unlike summarization, failing to persist the summary node
                    // aborts the whole run.
                    match self.create_summary_node(&summary_content, &memories) {
                        Ok(summary_node) => {
                            stats.summaries_created += 1;
                            tracing::info!(
                                summary_id = %summary_node.id.as_str(),
                                namespace = ?namespace,
                                source_count = memories.len(),
                                "Created summary node"
                            );
                        },
                        Err(e) => {
                            tracing::error!(
                                error = %e,
                                namespace = ?namespace,
                                group_idx = group_idx,
                                memory_count = memories.len(),
                                "Failed to create summary node"
                            );
                            return Err(e);
                        },
                    }
                }
            }

            // NOTE(review): the event carries processed/archived/merged only;
            // `summaries_created` is reported via metrics and logs instead.
            record_event(MemoryEvent::Consolidated {
                meta: EventMeta::new("consolidation", current_request_id()),
                processed: stats.processed,
                archived: stats.archived,
                merged: stats.merged,
            });

            tracing::info!(
                processed = stats.processed,
                summaries_created = stats.summaries_created,
                "Consolidation completed successfully"
            );

            Ok(stats)
        })();

        // Generic memory-operation metrics, shared with the other operations.
        let status = if result.is_ok() { "success" } else { "error" };
        metrics::counter!(
            "memory_operations_total",
            "operation" => "consolidate_memories",
            "namespace" => "mixed",
            "domain" => "project",
            "status" => status
        )
        .increment(1);
        metrics::histogram!(
            "memory_operation_duration_ms",
            "operation" => "consolidate_memories",
            "namespace" => "mixed"
        )
        .record(start.elapsed().as_secs_f64() * 1000.0);

        // Consolidation-specific metrics.
        metrics::counter!(
            "consolidation_operations_total",
            "status" => status
        )
        .increment(1);
        metrics::histogram!("consolidation_duration_ms")
            .record(start.elapsed().as_secs_f64() * 1000.0);

        // Only counted on success; summaries persisted before an error are not reflected here.
        if let Ok(ref stats) = result {
            metrics::counter!("consolidation_summaries_created")
                .increment(stats.summaries_created as u64);
        }

        result
    }
464
465 #[instrument(
506 name = "subcog.memory.consolidate",
507 skip(self),
508 fields(
509 request_id = tracing::field::Empty,
510 component = "memory",
511 operation = "consolidate"
512 )
513 )]
514 pub fn consolidate(&mut self) -> Result<ConsolidationStats> {
515 let start = Instant::now();
516 if let Some(request_id) = current_request_id() {
517 tracing::Span::current().record("request_id", request_id.as_str());
518 }
519 let result = (|| {
520 let mut stats = ConsolidationStats::default();
521
522 let memory_ids = self.persistence.list_ids()?;
524 stats.processed = memory_ids.len();
525
526 let now = current_timestamp();
527 let mut to_archive = Vec::new();
528
529 for id in &memory_ids {
530 let score = self.calculate_retention_score(id.as_str(), now);
532 let tier = score.suggested_tier();
533
534 if tier == MemoryTier::Archive {
536 to_archive.push(id.clone());
537 }
538 }
539
540 {
542 let _span = info_span!("subcog.memory.consolidate.archive").entered();
543 for id in to_archive {
544 if let Some(mut memory) = self.persistence.get(&id)? {
545 memory.status = MemoryStatus::Archived;
546 self.persistence.store(&memory)?;
547 record_event(MemoryEvent::Archived {
548 meta: EventMeta::with_timestamp(
549 "consolidation",
550 current_request_id(),
551 now,
552 ),
553 memory_id: memory.id.clone(),
554 reason: "consolidation_archive".to_string(),
555 });
556 stats.archived += 1;
557 }
558 }
559 }
560
561 {
563 let _span = info_span!("subcog.memory.consolidate.contradictions").entered();
564 stats.contradictions = self.detect_contradictions(&memory_ids)?;
565 }
566
567 record_event(MemoryEvent::Consolidated {
568 meta: EventMeta::new("consolidation", current_request_id()),
569 processed: stats.processed,
570 archived: stats.archived,
571 merged: stats.merged,
572 });
573
574 Ok(stats)
575 })();
576
577 let status = if result.is_ok() { "success" } else { "error" };
578 metrics::counter!(
579 "memory_operations_total",
580 "operation" => "consolidate",
581 "namespace" => "mixed",
582 "domain" => "project",
583 "status" => status
584 )
585 .increment(1);
586 metrics::histogram!(
587 "memory_operation_duration_ms",
588 "operation" => "consolidate",
589 "namespace" => "mixed"
590 )
591 .record(start.elapsed().as_secs_f64() * 1000.0);
592 metrics::histogram!(
593 "memory_lifecycle_duration_ms",
594 "component" => "memory",
595 "operation" => "consolidate"
596 )
597 .record(start.elapsed().as_secs_f64() * 1000.0);
598
599 result
600 }
601
602 #[allow(clippy::cast_precision_loss)]
608 fn calculate_retention_score(&self, memory_id: &str, now: u64) -> RetentionScore {
609 let access_count = self.access_counts.peek(memory_id).copied().unwrap_or(0);
612 let max_accesses = self
613 .access_counts
614 .iter()
615 .map(|(_, v)| *v)
616 .max()
617 .unwrap_or(1)
618 .max(1);
619 let access_frequency = (access_count as f32) / (max_accesses as f32);
620
621 let last_access = self.last_access.peek(memory_id).copied().unwrap_or(0);
623 let age_days = (now.saturating_sub(last_access)) as f32 / SECONDS_PER_DAY;
624 let recency = (-age_days / RECENCY_DECAY_DAYS).exp().clamp(0.0, 1.0);
625
626 let importance = DEFAULT_IMPORTANCE;
628
629 RetentionScore::new(access_frequency, recency, importance)
630 }
631
632 fn detect_contradictions(&self, memory_ids: &[crate::models::MemoryId]) -> Result<usize> {
634 let mut contradiction_count = 0;
635 let mut namespace_memories: HashMap<Namespace, Vec<&crate::models::MemoryId>> =
636 HashMap::new();
637
638 for id in memory_ids {
640 if let Some(memory) = self.persistence.get(id)? {
641 namespace_memories
642 .entry(memory.namespace)
643 .or_default()
644 .push(id);
645 }
646 }
647
648 for ids in namespace_memories.values() {
651 if ids.len() > CONTRADICTION_THRESHOLD {
652 contradiction_count += ids.len() / CONTRADICTION_THRESHOLD;
654 }
655 }
656
657 Ok(contradiction_count)
658 }
659
660 pub fn merge_memories(
745 &mut self,
746 source_id: &crate::models::MemoryId,
747 target_id: &crate::models::MemoryId,
748 ) -> Result<Memory> {
749 let source =
750 self.persistence
751 .get(source_id)?
752 .ok_or_else(|| crate::Error::OperationFailed {
753 operation: "merge_memories".to_string(),
754 cause: format!("Source memory not found: {}", source_id.as_str()),
755 })?;
756
757 let target =
758 self.persistence
759 .get(target_id)?
760 .ok_or_else(|| crate::Error::OperationFailed {
761 operation: "merge_memories".to_string(),
762 cause: format!("Target memory not found: {}", target_id.as_str()),
763 })?;
764
765 let now = current_timestamp();
767 let merged_content = format!("{}\n\n---\n\n{}", target.content, source.content);
768
769 let mut merged_tags = target.tags.clone();
771 for tag in &source.tags {
772 if !merged_tags.contains(tag) {
773 merged_tags.push(tag.clone());
774 }
775 }
776
777 let source_created_at = source.created_at;
779 let source_source = source.source.clone();
780
781 let merged = Memory {
782 id: target.id.clone(),
783 content: merged_content,
784 namespace: target.namespace,
785 domain: target.domain,
786 project_id: target
787 .project_id
788 .clone()
789 .or_else(|| source.project_id.clone()),
790 branch: target.branch.clone().or_else(|| source.branch.clone()),
791 file_path: target
792 .file_path
793 .clone()
794 .or_else(|| source.file_path.clone()),
795 status: MemoryStatus::Active,
796 created_at: target.created_at.min(source_created_at),
797 updated_at: now,
798 tombstoned_at: None,
799 expires_at: None,
800 embedding: None, tags: merged_tags,
802 #[cfg(feature = "group-scope")]
803 group_id: None,
804 source: target.source.or(source_source),
805 is_summary: false,
806 source_memory_ids: None,
807 consolidation_timestamp: None,
808 };
809
810 self.persistence.store(&merged)?;
812
813 let mut superseded_source = source;
815 superseded_source.status = MemoryStatus::Superseded;
816 self.persistence.store(&superseded_source)?;
817
818 Ok(merged)
819 }
820
    /// Intended to create a typed edge `from_id -> to_id`.
    ///
    /// NOTE(review): this is currently a no-op stub. It ignores all arguments,
    /// stores nothing, and always returns `Ok(())`. Edges are only written by
    /// `create_summary_node` and `create_related_edges` through the index backend.
    ///
    /// # Errors
    ///
    /// Never returns an error in its current form.
    pub const fn link_memories(
        &self,
        _from_id: &crate::models::MemoryId,
        _to_id: &crate::models::MemoryId,
        _edge_type: EdgeType,
    ) -> Result<()> {
        Ok(())
    }
869
870 #[must_use]
914 pub fn get_suggested_tier(&self, memory_id: &str) -> MemoryTier {
915 let now = current_timestamp();
916 let score = self.calculate_retention_score(memory_id, now);
917 score.suggested_tier()
918 }
919
920 pub fn find_related_memories(
963 &self,
964 _recall_service: &crate::services::RecallService,
965 config: &crate::config::ConsolidationConfig,
966 ) -> Result<HashMap<Namespace, Vec<Vec<crate::models::MemoryId>>>> {
967 let memory_ids = self.persistence.list_ids()?;
974
975 let now = current_timestamp();
977 let cutoff_timestamp = config.time_window_days.map(|days| {
978 let days_in_seconds = u64::from(days) * 86400;
979 now.saturating_sub(days_in_seconds)
980 });
981
982 let mut namespace_groups: HashMap<Namespace, Vec<Memory>> = HashMap::new();
984
985 for id in &memory_ids {
986 let Some(memory) = self.persistence.get(id)? else {
987 continue;
988 };
989
990 let outside_window = cutoff_timestamp.is_some_and(|cutoff| memory.created_at < cutoff);
992 if outside_window {
993 continue;
994 }
995
996 let excluded_namespace = config
998 .namespace_filter
999 .as_ref()
1000 .is_some_and(|ns| !ns.contains(&memory.namespace));
1001 if excluded_namespace {
1002 continue;
1003 }
1004
1005 if memory.embedding.is_none() {
1007 tracing::debug!(
1008 memory_id = %memory.id.as_str(),
1009 "Skipping memory without embedding for consolidation"
1010 );
1011 continue;
1012 }
1013
1014 namespace_groups
1015 .entry(memory.namespace)
1016 .or_default()
1017 .push(memory);
1018 }
1019
1020 let mut result: HashMap<Namespace, Vec<Vec<crate::models::MemoryId>>> = HashMap::new();
1022
1023 for (namespace, memories) in namespace_groups {
1024 if memories.len() < config.min_memories_to_consolidate {
1026 tracing::debug!(
1027 namespace = ?namespace,
1028 count = memories.len(),
1029 min_required = config.min_memories_to_consolidate,
1030 "Skipping namespace with insufficient memories"
1031 );
1032 continue;
1033 }
1034
1035 let groups = self.cluster_by_similarity(&memories, config.similarity_threshold)?;
1037
1038 if !groups.is_empty() {
1039 result.insert(namespace, groups);
1040 }
1041 }
1042
1043 Ok(result)
1044 }
1045
1046 fn cluster_by_similarity(
1051 &self,
1052 memories: &[Memory],
1053 threshold: f32,
1054 ) -> Result<Vec<Vec<crate::models::MemoryId>>> {
1055 let mut groups: Vec<Vec<crate::models::MemoryId>> = Vec::new();
1056 let mut assigned: std::collections::HashSet<String> = std::collections::HashSet::new();
1057
1058 for (i, memory) in memories.iter().enumerate() {
1059 if assigned.contains(memory.id.as_str()) {
1061 continue;
1062 }
1063
1064 let Some(ref embedding_i) = memory.embedding else {
1065 continue;
1066 };
1067
1068 let mut group = vec![memory.id.clone()];
1069 assigned.insert(memory.id.as_str().to_string());
1070
1071 Self::find_similar_memories(
1073 memories,
1074 i,
1075 embedding_i,
1076 threshold,
1077 &mut assigned,
1078 &mut group,
1079 );
1080
1081 if group.len() >= 2 {
1083 groups.push(group);
1084 }
1085 }
1086
1087 Ok(groups)
1088 }
1089
1090 fn find_similar_memories(
1094 memories: &[Memory],
1095 current_idx: usize,
1096 current_embedding: &[f32],
1097 threshold: f32,
1098 assigned: &mut std::collections::HashSet<String>,
1099 group: &mut Vec<crate::models::MemoryId>,
1100 ) {
1101 for (j, other_memory) in memories.iter().enumerate() {
1102 if current_idx == j || assigned.contains(other_memory.id.as_str()) {
1103 continue;
1104 }
1105
1106 let Some(ref embedding_j) = other_memory.embedding else {
1107 continue;
1108 };
1109
1110 let similarity = cosine_similarity(current_embedding, embedding_j);
1111 if similarity >= threshold {
1112 group.push(other_memory.id.clone());
1113 assigned.insert(other_memory.id.as_str().to_string());
1114 }
1115 }
1116 }
1117
1118 #[instrument(skip(self, memories), fields(memory_count = memories.len()))]
1163 pub fn summarize_group(&self, memories: &[Memory]) -> Result<String> {
1164 use crate::llm::{BASE_SYSTEM_PROMPT, MEMORY_SUMMARIZATION_PROMPT};
1165
1166 let llm = self.llm.as_ref().ok_or_else(|| {
1168 tracing::warn!("No LLM provider configured for memory summarization");
1169 crate::Error::OperationFailed {
1170 operation: "summarize_group".to_string(),
1171 cause: "LLM provider not configured. Use with_llm() to set provider.".to_string(),
1172 }
1173 })?;
1174
1175 if memories.is_empty() {
1177 tracing::warn!("Attempted to summarize empty group of memories");
1178 return Err(crate::Error::OperationFailed {
1179 operation: "summarize_group".to_string(),
1180 cause: "No memories provided for summarization".to_string(),
1181 });
1182 }
1183
1184 let memories_text = memories
1186 .iter()
1187 .enumerate()
1188 .map(|(i, memory)| {
1189 format!(
1190 "Memory {}: [ID: {}, Namespace: {:?}, Tags: {}]\n{}",
1191 i + 1,
1192 memory.id.as_str(),
1193 memory.namespace,
1194 memory.tags.join(", "),
1195 memory.content
1196 )
1197 })
1198 .collect::<Vec<_>>()
1199 .join("\n\n---\n\n");
1200
1201 let user_prompt = format!(
1202 "Summarize the following {} related memories into a cohesive summary:\n\n{}",
1203 memories.len(),
1204 memories_text
1205 );
1206
1207 let system_prompt = format!("{BASE_SYSTEM_PROMPT}\n\n{MEMORY_SUMMARIZATION_PROMPT}");
1209
1210 tracing::debug!(
1212 memory_count = memories.len(),
1213 "Calling LLM for memory summarization"
1214 );
1215
1216 let summary = llm
1217 .complete_with_system(&system_prompt, &user_prompt)
1218 .map_err(|e| {
1219 tracing::error!(
1220 error = %e,
1221 memory_count = memories.len(),
1222 "LLM summarization failed"
1223 );
1224 crate::Error::OperationFailed {
1225 operation: "summarize_group".to_string(),
1226 cause: format!("LLM summarization failed: {e}"),
1227 }
1228 })?;
1229
1230 let summary = summary.trim().to_string();
1232
1233 if summary.is_empty() {
1235 tracing::warn!("LLM returned empty summary");
1236 return Err(crate::Error::OperationFailed {
1237 operation: "summarize_group".to_string(),
1238 cause: "LLM returned empty summary".to_string(),
1239 });
1240 }
1241
1242 tracing::info!(
1243 memory_count = memories.len(),
1244 summary_length = summary.len(),
1245 "Successfully generated memory summary"
1246 );
1247
1248 Ok(summary)
1249 }
1250
1251 #[allow(clippy::too_many_lines)]
1288 #[instrument(skip(self, summary_content, source_memories), fields(
1289 source_count = source_memories.len(),
1290 summary_length = summary_content.len()
1291 ))]
1292 pub fn create_summary_node(
1293 &mut self,
1294 summary_content: &str,
1295 source_memories: &[Memory],
1296 ) -> Result<Memory> {
1297 if source_memories.is_empty() {
1299 tracing::warn!("Attempted to create summary node with no source memories");
1300 return Err(crate::Error::OperationFailed {
1301 operation: "create_summary_node".to_string(),
1302 cause: "No source memories provided for summary".to_string(),
1303 });
1304 }
1305
1306 if summary_content.trim().is_empty() {
1307 tracing::warn!("Attempted to create summary node with empty content");
1308 return Err(crate::Error::OperationFailed {
1309 operation: "create_summary_node".to_string(),
1310 cause: "Summary content cannot be empty".to_string(),
1311 });
1312 }
1313
1314 let now = current_timestamp();
1315
1316 let summary_id = crate::models::MemoryId::new(format!("summary_{now}"));
1318
1319 let source_memory_ids: Vec<crate::models::MemoryId> =
1321 source_memories.iter().map(|m| m.id.clone()).collect();
1322
1323 let mut merged_tags: Vec<String> = Vec::new();
1325 for memory in source_memories {
1326 for tag in &memory.tags {
1327 if !merged_tags.contains(tag) {
1328 merged_tags.push(tag.clone());
1329 }
1330 }
1331 }
1332
1333 let namespace = source_memories[0].namespace;
1336 let domain = source_memories[0].domain.clone();
1337
1338 let project_id = source_memories[0].project_id.clone();
1340 let branch = source_memories[0].branch.clone();
1341
1342 let summary_node = Memory {
1344 id: summary_id,
1345 content: summary_content.to_string(),
1346 namespace,
1347 domain,
1348 project_id,
1349 branch,
1350 file_path: None, status: MemoryStatus::Active,
1352 created_at: now,
1353 updated_at: now,
1354 tombstoned_at: None,
1355 expires_at: None,
1356 embedding: None, tags: merged_tags,
1358 #[cfg(feature = "group-scope")]
1359 group_id: None,
1360 source: Some("consolidation".to_string()),
1361 is_summary: true,
1362 source_memory_ids: Some(source_memory_ids.clone()),
1363 consolidation_timestamp: Some(now),
1364 };
1365
1366 self.persistence.store(&summary_node)?;
1368
1369 if let Some(ref index) = self.index {
1374 use crate::storage::traits::IndexBackend;
1376 if let Err(e) = index.index(&summary_node) {
1377 tracing::warn!(
1378 error = %e,
1379 summary_id = %summary_node.id.as_str(),
1380 "Failed to index summary node, edges will not be stored"
1381 );
1382 return Ok(summary_node);
1384 }
1385 tracing::debug!(
1386 summary_id = %summary_node.id.as_str(),
1387 source_count = source_memory_ids.len(),
1388 "Storing edge relationships for summary node"
1389 );
1390
1391 let mut summarized_by_edges = 0u64;
1392 let mut source_of_edges = 0u64;
1393 for source_id in &source_memory_ids {
1394 if let Err(e) =
1396 index.store_edge(source_id, &summary_node.id, EdgeType::SummarizedBy)
1397 {
1398 tracing::warn!(
1399 error = %e,
1400 source_id = %source_id.as_str(),
1401 summary_id = %summary_node.id.as_str(),
1402 "Failed to store SummarizedBy edge, continuing"
1403 );
1404 } else {
1405 summarized_by_edges += 1;
1406 }
1407
1408 if let Err(e) = index.store_edge(&summary_node.id, source_id, EdgeType::SourceOf) {
1410 tracing::warn!(
1411 error = %e,
1412 summary_id = %summary_node.id.as_str(),
1413 source_id = %source_id.as_str(),
1414 "Failed to store SourceOf edge, continuing"
1415 );
1416 } else {
1417 source_of_edges += 1;
1418 }
1419 }
1420
1421 metrics::counter!(
1423 "consolidation_edges_created",
1424 "edge_type" => "summarized_by"
1425 )
1426 .increment(summarized_by_edges);
1427 metrics::counter!(
1428 "consolidation_edges_created",
1429 "edge_type" => "source_of"
1430 )
1431 .increment(source_of_edges);
1432
1433 tracing::info!(
1434 summary_id = %summary_node.id.as_str(),
1435 edges_stored = source_memory_ids.len(),
1436 "Stored edge relationships for summary node"
1437 );
1438 } else {
1439 tracing::debug!(
1440 summary_id = %summary_node.id.as_str(),
1441 "Index backend not available, skipping edge storage"
1442 );
1443 }
1444
1445 tracing::info!(
1446 summary_id = %summary_node.id.as_str(),
1447 source_count = source_memories.len(),
1448 tags_count = summary_node.tags.len(),
1449 "Created summary memory node"
1450 );
1451
1452 Ok(summary_node)
1453 }
1454
1455 #[instrument(skip(self, memories, index), fields(memory_count = memories.len()))]
1493 fn create_related_edges(&self, memories: &[Memory], index: &Arc<SqliteBackend>) -> Result<()> {
1494 if memories.len() < 2 {
1495 tracing::debug!("Fewer than 2 memories, skipping edge creation");
1496 return Ok(());
1497 }
1498
1499 let mut edge_count: u64 = 0;
1500
1501 for (i, memory_a) in memories.iter().enumerate() {
1503 for memory_b in memories.iter().skip(i + 1) {
1504 if let Err(e) = index.store_edge(&memory_a.id, &memory_b.id, EdgeType::RelatedTo) {
1506 tracing::warn!(
1507 error = %e,
1508 from_id = %memory_a.id.as_str(),
1509 to_id = %memory_b.id.as_str(),
1510 "Failed to store RelatedTo edge, continuing"
1511 );
1512 } else {
1513 edge_count += 1;
1514 }
1515
1516 if let Err(e) = index.store_edge(&memory_b.id, &memory_a.id, EdgeType::RelatedTo) {
1518 tracing::warn!(
1519 error = %e,
1520 from_id = %memory_b.id.as_str(),
1521 to_id = %memory_a.id.as_str(),
1522 "Failed to store RelatedTo edge (reverse), continuing"
1523 );
1524 } else {
1525 edge_count += 1;
1526 }
1527 }
1528 }
1529
1530 metrics::counter!(
1532 "consolidation_edges_created",
1533 "edge_type" => "related_to"
1534 )
1535 .increment(edge_count);
1536
1537 tracing::info!(
1538 memory_count = memories.len(),
1539 edge_count = edge_count,
1540 "Created RelatedTo edges between memories"
1541 );
1542
1543 Ok(())
1544 }
1545}
1546
/// Cosine similarity of `a` and `b`, clamped to `[0, 1]`.
///
/// Returns `0.0` when the vectors differ in length or either has zero
/// magnitude. Negative similarity (opposing directions) also maps to `0.0`.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() {
        return 0.0;
    }

    let norm = |v: &[f32]| v.iter().map(|x| x * x).sum::<f32>().sqrt();
    let (norm_a, norm_b) = (norm(a), norm(b));
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }

    let dot = a.iter().zip(b).map(|(x, y)| x * y).sum::<f32>();
    (dot / (norm_a * norm_b)).clamp(0.0, 1.0)
}
1572
/// Counters describing the outcome of a consolidation run.
///
/// `ConsolidationService::consolidate` fills `processed`, `archived` and
/// `contradictions`. `ConsolidationService::consolidate_memories` fills
/// `processed` and `summaries_created`. Fields a run does not touch stay at zero.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConsolidationStats {
    /// Number of memories examined by the run.
    pub processed: usize,
    /// Number of memories transitioned to the archived status.
    pub archived: usize,
    /// Number of merged memories (neither consolidation pass in this module increments it).
    pub merged: usize,
    /// Heuristic count of potential contradictions.
    pub contradictions: usize,
    /// Number of summary nodes persisted.
    pub summaries_created: usize,
}

impl ConsolidationStats {
    /// Returns `true` when every counter is zero, i.e. the run did nothing.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.processed == 0
            && self.archived == 0
            && self.merged == 0
            && self.contradictions == 0
            && self.summaries_created == 0
    }

    /// One-line human-readable summary of the counters.
    ///
    /// Returns `"No memories to consolidate"` when [`Self::is_empty`] holds.
    #[must_use]
    pub fn summary(&self) -> String {
        if self.is_empty() {
            "No memories to consolidate".to_string()
        } else {
            format!(
                "Processed: {}, Archived: {}, Merged: {}, Contradictions: {}, Summaries: {}",
                self.processed,
                self.archived,
                self.merged,
                self.contradictions,
                self.summaries_created
            )
        }
    }
}
1677
1678#[cfg(test)]
1679#[allow(clippy::items_after_statements, clippy::redundant_clone)]
1680mod tests {
1681 use super::*;
1682 use crate::models::{Domain, MemoryId};
1683 use crate::storage::persistence::FilesystemBackend;
1684
1685 fn create_test_memory(id: &str, content: &str) -> Memory {
1686 Memory {
1687 id: MemoryId::new(id),
1688 content: content.to_string(),
1689 namespace: Namespace::Decisions,
1690 domain: Domain::new(),
1691 project_id: None,
1692 branch: None,
1693 file_path: None,
1694 status: MemoryStatus::Active,
1695 created_at: current_timestamp(),
1696 updated_at: current_timestamp(),
1697 tombstoned_at: None,
1698 expires_at: None,
1699 embedding: None,
1700 tags: vec!["test".to_string()],
1701 #[cfg(feature = "group-scope")]
1702 group_id: None,
1703 source: None,
1704 is_summary: false,
1705 source_memory_ids: None,
1706 consolidation_timestamp: None,
1707 }
1708 }
1709
1710 #[test]
1711 fn test_consolidation_stats_empty() {
1712 let stats = ConsolidationStats::default();
1713 assert!(stats.is_empty());
1714 assert_eq!(stats.summary(), "No memories to consolidate");
1715 }
1716
1717 #[test]
1718 fn test_consolidation_stats_summary() {
1719 let stats = ConsolidationStats {
1720 processed: 10,
1721 archived: 2,
1722 merged: 1,
1723 contradictions: 0,
1724 summaries_created: 0,
1725 };
1726 assert!(!stats.is_empty());
1727 assert!(stats.summary().contains("Processed: 10"));
1728 assert!(stats.summary().contains("Archived: 2"));
1729 }
1730
1731 #[test]
1732 fn test_record_access() {
1733 let temp_dir = tempfile::tempdir().ok();
1734 let path = temp_dir.as_ref().map_or_else(
1735 || std::path::PathBuf::from("/tmp/test_consolidation"),
1736 |d| d.path().to_path_buf(),
1737 );
1738 let backend = FilesystemBackend::new(&path);
1739 let mut service = ConsolidationService::new(backend);
1740
1741 service.record_access("memory_1");
1742 service.record_access("memory_1");
1743 service.record_access("memory_2");
1744
1745 assert_eq!(service.access_counts.peek("memory_1"), Some(&2));
1747 assert_eq!(service.access_counts.peek("memory_2"), Some(&1));
1748 }
1749
1750 #[test]
1751 fn test_get_suggested_tier() {
1752 let temp_dir = tempfile::tempdir().ok();
1753 let path = temp_dir.as_ref().map_or_else(
1754 || std::path::PathBuf::from("/tmp/test_tier"),
1755 |d| d.path().to_path_buf(),
1756 );
1757 let backend = FilesystemBackend::new(&path);
1758 let mut service = ConsolidationService::new(backend);
1759
1760 let tier = service.get_suggested_tier("unknown_memory");
1762 assert!(matches!(tier, MemoryTier::Cold | MemoryTier::Archive));
1763
1764 for _ in 0..100 {
1766 service.record_access("hot_memory");
1767 }
1768
1769 let tier = service.get_suggested_tier("hot_memory");
1771 assert!(matches!(tier, MemoryTier::Hot | MemoryTier::Warm));
1772 }
1773
1774 #[test]
1775 fn test_consolidate_empty() {
1776 let temp_dir = tempfile::tempdir().ok();
1777 let path = temp_dir.as_ref().map_or_else(
1778 || std::path::PathBuf::from("/tmp/test_consolidate_empty"),
1779 |d| d.path().to_path_buf(),
1780 );
1781 let backend = FilesystemBackend::new(&path);
1782 let mut service = ConsolidationService::new(backend);
1783
1784 let result = service.consolidate();
1785 assert!(result.is_ok());
1786
1787 let stats = result.ok();
1788 assert!(stats.is_some());
1789 let stats = stats.as_ref();
1790 assert!(stats.is_some_and(super::ConsolidationStats::is_empty));
1791 }
1792
1793 #[test]
1794 fn test_consolidate_with_memories() {
1795 let temp_dir = tempfile::tempdir().ok();
1796 let path = temp_dir.as_ref().map_or_else(
1797 || std::path::PathBuf::from("/tmp/test_consolidate_with"),
1798 |d| d.path().to_path_buf(),
1799 );
1800 let backend = FilesystemBackend::new(&path);
1801
1802 let memory1 = create_test_memory("mem_1", "First memory");
1804 let memory2 = create_test_memory("mem_2", "Second memory");
1805 let _ = backend.store(&memory1);
1806 let _ = backend.store(&memory2);
1807
1808 let mut service = ConsolidationService::new(backend);
1809
1810 let result = service.consolidate();
1811 assert!(result.is_ok());
1812
1813 let stats = result.ok();
1814 assert!(stats.is_some());
1815 assert_eq!(stats.as_ref().map(|s| s.processed), Some(2));
1816 }
1817
1818 #[test]
1819 fn test_retention_score_calculation() {
1820 let temp_dir = tempfile::tempdir().ok();
1821 let path = temp_dir.as_ref().map_or_else(
1822 || std::path::PathBuf::from("/tmp/test_retention"),
1823 |d| d.path().to_path_buf(),
1824 );
1825 let backend = FilesystemBackend::new(&path);
1826 let service = ConsolidationService::new(backend);
1827
1828 let now = current_timestamp();
1829 let score = service.calculate_retention_score("test_memory", now);
1830
1831 assert!(score.score() >= 0.0);
1833 assert!(score.score() <= 1.0);
1834 }
1835
1836 #[test]
1837 fn test_cosine_similarity_identical_vectors() {
1838 let vec_a = vec![1.0, 2.0, 3.0];
1839 let vec_b = vec![1.0, 2.0, 3.0];
1840 let similarity = super::cosine_similarity(&vec_a, &vec_b);
1841 assert!((similarity - 1.0).abs() < f32::EPSILON);
1842 }
1843
1844 #[test]
1845 fn test_cosine_similarity_orthogonal_vectors() {
1846 let vec_a = vec![1.0, 0.0, 0.0];
1847 let vec_b = vec![0.0, 1.0, 0.0];
1848 let similarity = super::cosine_similarity(&vec_a, &vec_b);
1849 assert!(similarity.abs() < f32::EPSILON);
1850 }
1851
/// Vectors of mismatched dimension are expected to compare as 0.0.
#[test]
fn test_cosine_similarity_different_lengths() {
    let three_dims = vec![1.0, 2.0, 3.0];
    let two_dims = vec![1.0, 2.0];
    assert!(super::cosine_similarity(&three_dims, &two_dims).abs() < f32::EPSILON);
}
1859
/// A zero-magnitude operand must not yield NaN; the expected similarity is 0.0.
#[test]
fn test_cosine_similarity_zero_vectors() {
    let zero = vec![0.0, 0.0, 0.0];
    let nonzero = vec![1.0, 2.0, 3.0];
    assert!(super::cosine_similarity(&zero, &nonzero).abs() < f32::EPSILON);
}
1867
/// A `RecallService` built without an embedder or vector backend yields no related
/// memories (per this test's name, vector search is unavailable in that case).
#[test]
fn test_find_related_memories_no_vector_search() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_find_related"),
    };
    let service = ConsolidationService::new(FilesystemBackend::new(&path));

    let recall = crate::services::RecallService::new();
    let config = crate::config::ConsolidationConfig::new();

    let related = service.find_related_memories(&recall, &config);
    assert!(related.is_ok());
    assert!(related.unwrap().is_empty());
}
1886
/// Memories whose embeddings exceed the similarity threshold end up in a shared group.
#[test]
fn test_cluster_by_similarity() {
    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_cluster"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);
    let service = ConsolidationService::new(backend);

    // cos(a, b) = 0.9 / sqrt(0.82) ≈ 0.994, well above the 0.7 threshold below.
    let embedding_a = vec![1.0, 0.0, 0.0];
    let embedding_b = vec![0.9, 0.1, 0.0];
    // c is orthogonal to a (cos = 0.0) and nearly orthogonal to b (cos ≈ 0.11).
    let embedding_c = vec![0.0, 1.0, 0.0];

    let mut memory_a = create_test_memory("mem_a", "Content A");
    memory_a.embedding = Some(embedding_a);

    let mut memory_b = create_test_memory("mem_b", "Content B");
    memory_b.embedding = Some(embedding_b);

    let mut memory_c = create_test_memory("mem_c", "Content C");
    memory_c.embedding = Some(embedding_c);

    let memories = vec![memory_a, memory_b, memory_c];

    let result = service.cluster_by_similarity(&memories, 0.7);
    assert!(result.is_ok());

    let groups = result.unwrap();
    assert!(!groups.is_empty());

    // The similar pair (a, b) must form a group of at least two members.
    let has_group = groups.iter().any(|g| g.len() >= 2);
    assert!(has_group);
}
1925
/// Clustering memories that carry no embeddings (per this test's name) yields no groups.
#[test]
fn test_cluster_by_similarity_no_embeddings() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_cluster_no_emb"),
    };
    let service = ConsolidationService::new(FilesystemBackend::new(&path));

    // Assumes `create_test_memory` leaves `embedding` unset; other tests set it explicitly.
    let memories = vec![
        create_test_memory("mem_a", "Content A"),
        create_test_memory("mem_b", "Content B"),
    ];

    let clustered = service.cluster_by_similarity(&memories, 0.7);
    assert!(clustered.is_ok());
    assert!(clustered.unwrap().is_empty());
}
1949
/// A threshold above the pair's cosine similarity keeps the memories ungrouped.
#[test]
fn test_cluster_by_similarity_high_threshold() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_cluster_high"),
    };
    let service = ConsolidationService::new(FilesystemBackend::new(&path));

    // cos(first, second) = 0.9 / sqrt(0.9) ≈ 0.949, below the 0.99 threshold.
    let mut first = create_test_memory("mem_a", "Content A");
    first.embedding = Some(vec![1.0, 0.0, 0.0]);

    let mut second = create_test_memory("mem_b", "Content B");
    second.embedding = Some(vec![0.9, 0.3, 0.0]);

    let pair = vec![first, second];

    let outcome = service.cluster_by_similarity(&pair, 0.99);
    assert!(outcome.is_ok());
    assert!(outcome.unwrap().is_empty());
}
1981
/// Summarizing without a configured LLM provider reports that none is configured.
#[test]
fn test_summarize_group_no_llm() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_summarize_no_llm"),
    };
    // Deliberately no `.with_llm(...)`.
    let service = ConsolidationService::new(FilesystemBackend::new(&path));

    let memories = vec![
        create_test_memory("mem_a", "First decision"),
        create_test_memory("mem_b", "Second decision"),
    ];

    let outcome = service.summarize_group(&memories);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("LLM provider not configured"));
}
2008
/// Summarizing an empty group is rejected even when an LLM provider is configured.
#[test]
fn test_summarize_group_empty_memories() {
    use std::sync::Arc;

    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_summarize_empty"),
    };

    // Provider whose completion always succeeds, so any error comes from input checks.
    struct StubLlm;

    impl crate::llm::LlmProvider for StubLlm {
        fn name(&self) -> &'static str {
            "mock"
        }

        fn complete(&self, _prompt: &str) -> Result<String> {
            Ok(String::from("Mock summary"))
        }

        fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
            Err(crate::Error::OperationFailed {
                operation: String::from("analyze_for_capture"),
                cause: String::from("Not implemented for mock"),
            })
        }
    }

    let service =
        ConsolidationService::new(FilesystemBackend::new(&path)).with_llm(Arc::new(StubLlm));

    let outcome = service.summarize_group(&[]);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("No memories provided"));
}
2050
/// With a working provider, `summarize_group` returns non-empty text containing the
/// provider's completion.
#[test]
fn test_summarize_group_with_mock_llm() {
    use std::sync::Arc;

    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_summarize_mock"),
    };

    // Provider with a canned completion so the summary text is predictable.
    struct CannedLlm;

    impl crate::llm::LlmProvider for CannedLlm {
        fn name(&self) -> &'static str {
            "mock"
        }

        fn complete(&self, _prompt: &str) -> Result<String> {
            Ok(String::from(
                "This is a comprehensive summary of the related decisions, preserving all key technical details.",
            ))
        }

        fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
            Err(crate::Error::OperationFailed {
                operation: String::from("analyze_for_capture"),
                cause: String::from("Not implemented for mock"),
            })
        }
    }

    let service =
        ConsolidationService::new(FilesystemBackend::new(&path)).with_llm(Arc::new(CannedLlm));

    let memories = vec![
        create_test_memory("mem_a", "Use PostgreSQL for primary storage"),
        create_test_memory("mem_b", "Enable JSONB for flexible schemas"),
    ];

    let outcome = service.summarize_group(&memories);
    assert!(outcome.is_ok());

    let text = outcome.unwrap();
    assert!(!text.is_empty());
    assert!(text.contains("comprehensive summary"));
}
2096
/// A provider error is surfaced as a summarization failure.
#[test]
fn test_summarize_group_llm_failure() {
    use std::sync::Arc;

    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_summarize_fail"),
    };

    // Provider whose completion always fails.
    struct BrokenLlm;

    impl crate::llm::LlmProvider for BrokenLlm {
        fn name(&self) -> &'static str {
            "failing_mock"
        }

        fn complete(&self, _prompt: &str) -> Result<String> {
            Err(crate::Error::OperationFailed {
                operation: String::from("llm_complete"),
                cause: String::from("Mock LLM failure"),
            })
        }

        fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
            Err(crate::Error::OperationFailed {
                operation: String::from("analyze_for_capture"),
                cause: String::from("Not implemented for mock"),
            })
        }
    }

    let service =
        ConsolidationService::new(FilesystemBackend::new(&path)).with_llm(Arc::new(BrokenLlm));

    let memories = vec![
        create_test_memory("mem_a", "First decision"),
        create_test_memory("mem_b", "Second decision"),
    ];

    let outcome = service.summarize_group(&memories);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("LLM summarization failed"));
}
2147
/// A whitespace-only completion from the provider is rejected as an empty summary.
#[test]
fn test_summarize_group_empty_response() {
    use std::sync::Arc;

    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_summarize_empty_resp"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);

    // Provider whose completion contains only whitespace.
    struct EmptyMockLlm;

    impl crate::llm::LlmProvider for EmptyMockLlm {
        fn name(&self) -> &'static str {
            "empty_mock"
        }
        fn complete(&self, _prompt: &str) -> Result<String> {
            Ok(" ".to_string())
        }
        fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
            Err(crate::Error::OperationFailed {
                operation: "analyze_for_capture".to_string(),
                cause: "Not implemented for mock".to_string(),
            })
        }
    }

    let llm: Arc<dyn crate::llm::LlmProvider + Send + Sync> = Arc::new(EmptyMockLlm);
    let service = ConsolidationService::new(backend).with_llm(llm);

    let memory_a = create_test_memory("mem_a", "First decision");

    let memories = vec![memory_a];

    let result = service.summarize_group(&memories);
    assert!(result.is_err());
    assert!(result.unwrap_err().to_string().contains("empty summary"));
}
2189
/// A summary node carries the summary text, links back to its sources, merges their
/// tags without duplicates, and matches the sources' namespace and domain.
#[test]
fn test_create_summary_node_success() {
    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_create_summary"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);
    let mut service = ConsolidationService::new(backend);

    let mut memory_a = create_test_memory("mem_a", "First decision");
    memory_a.tags = vec!["database".to_string(), "postgres".to_string()];

    let mut memory_b = create_test_memory("mem_b", "Second decision");
    memory_b.tags = vec!["database".to_string(), "schema".to_string()];

    // memory_a is still needed below for the namespace/domain comparison; memory_b is not.
    let source_memories = vec![memory_a.clone(), memory_b];
    let summary_content = "Combined database decisions using PostgreSQL";

    let result = service.create_summary_node(summary_content, &source_memories);
    assert!(result.is_ok());

    let summary_node = result.unwrap();

    // Summary metadata and provenance.
    assert!(summary_node.is_summary);
    assert_eq!(summary_node.content, summary_content);
    assert!(summary_node.source_memory_ids.is_some());
    assert_eq!(summary_node.source_memory_ids.as_ref().unwrap().len(), 2);
    assert!(summary_node.consolidation_timestamp.is_some());
    assert_eq!(summary_node.source, Some("consolidation".to_string()));

    // Tag union: {database, postgres} ∪ {database, schema} has three distinct tags.
    assert_eq!(summary_node.tags.len(), 3);
    assert!(summary_node.tags.contains(&"database".to_string()));
    assert!(summary_node.tags.contains(&"postgres".to_string()));
    assert!(summary_node.tags.contains(&"schema".to_string()));

    assert_eq!(summary_node.namespace, memory_a.namespace);
    assert_eq!(summary_node.domain, memory_a.domain);
}
2234
/// A summary node cannot be created without at least one source memory.
#[test]
fn test_create_summary_node_empty_sources() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_create_summary_empty"),
    };
    let mut service = ConsolidationService::new(FilesystemBackend::new(&path));

    let outcome = service.create_summary_node("Summary text", &[]);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("No source memories"));
}
2257
/// Whitespace-only summary text is rejected even when sources are supplied.
#[test]
fn test_create_summary_node_empty_content() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_create_summary_empty_content"),
    };
    let mut service = ConsolidationService::new(FilesystemBackend::new(&path));

    let sources = vec![create_test_memory("mem_a", "Content")];

    let outcome = service.create_summary_node(" ", &sources);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Summary content cannot be empty"));
}
2281
/// Overlapping source tags are merged into a duplicate-free set on the summary node.
#[test]
fn test_create_summary_node_tags_deduplication() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_create_summary_tags"),
    };
    let mut service = ConsolidationService::new(FilesystemBackend::new(&path));

    let mut first = create_test_memory("mem_a", "Content A");
    first.tags = ["tag1", "tag2", "tag3"].iter().map(|t| t.to_string()).collect();

    let mut second = create_test_memory("mem_b", "Content B");
    second.tags = ["tag2", "tag3", "tag4"].iter().map(|t| t.to_string()).collect();

    let mut third = create_test_memory("mem_c", "Content C");
    third.tags = ["tag1", "tag4", "tag5"].iter().map(|t| t.to_string()).collect();

    let sources = vec![first, second, third];

    let outcome = service.create_summary_node("Summary with merged tags", &sources);
    assert!(outcome.is_ok());

    // Five distinct tags appear across the sources; each must be present exactly once.
    let summary = outcome.unwrap();
    assert_eq!(summary.tags.len(), 5);
    for expected in ["tag1", "tag2", "tag3", "tag4", "tag5"] {
        assert!(summary.tags.contains(&expected.to_string()));
    }
}
2318
/// `create_summary_node` persists the new node so it can be read back by id.
#[test]
fn test_create_summary_node_stored_in_persistence() {
    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_create_summary_persist"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);
    let mut service = ConsolidationService::new(backend);

    let memory_a = create_test_memory("mem_a", "Content A");
    let source_memories = vec![memory_a];
    let summary_content = "Persisted summary";

    let summary_node = service
        .create_summary_node(summary_content, &source_memories)
        .expect("summary node creation should succeed");

    // A single lookup covers both "the read succeeds" and "the node exists".
    let retrieved_node = service
        .persistence
        .get(&summary_node.id)
        .expect("persistence lookup should succeed")
        .expect("summary node should have been persisted");
    assert_eq!(retrieved_node.id, summary_node.id);
    assert!(retrieved_node.is_summary);
    assert_eq!(retrieved_node.content, summary_content);
}
2348
/// Every source memory's id is recorded on the summary node.
#[test]
fn test_create_summary_node_source_ids_preserved() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_create_summary_source_ids"),
    };
    let mut service = ConsolidationService::new(FilesystemBackend::new(&path));

    let sources: Vec<_> = [
        ("mem_a", "Content A"),
        ("mem_b", "Content B"),
        ("mem_c", "Content C"),
    ]
    .iter()
    .map(|&(id, content)| create_test_memory(id, content))
    .collect();

    let outcome = service.create_summary_node("Summary of three memories", &sources);
    assert!(outcome.is_ok());

    let recorded_ids = outcome.unwrap().source_memory_ids.unwrap();
    assert_eq!(recorded_ids.len(), 3);
    for source in &sources {
        assert!(recorded_ids.contains(&source.id));
    }
}
2378
/// Project and branch metadata from the source memory appear on the summary node.
#[test]
fn test_create_summary_node_inherits_project_info() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_create_summary_project"),
    };
    let mut service = ConsolidationService::new(FilesystemBackend::new(&path));

    let mut source = create_test_memory("mem_a", "Content A");
    source.project_id = Some("project123".to_string());
    source.branch = Some("main".to_string());

    let outcome = service.create_summary_node("Summary with project info", &[source]);
    assert!(outcome.is_ok());

    let summary = outcome.unwrap();
    assert_eq!(summary.project_id.as_deref(), Some("project123"));
    assert_eq!(summary.branch.as_deref(), Some("main"));
}
2405
/// With an index attached, `create_summary_node` records exactly one `SummarizedBy`
/// edge from each source memory to the new summary node.
#[test]
fn test_create_summary_node_stores_edges_with_index() {
    use crate::storage::index::SqliteBackend;
    use crate::storage::traits::IndexBackend;

    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_create_summary_edges"),
    };
    let backend = FilesystemBackend::new(&path);

    let sqlite = Arc::new(SqliteBackend::in_memory().expect("Failed to create in-memory SQLite"));
    let mut service = ConsolidationService::new(backend).with_index(Arc::clone(&sqlite));

    let sources = vec![
        create_test_memory("edge_source_1", "First memory"),
        create_test_memory("edge_source_2", "Second memory"),
    ];

    // Presumably edge endpoints must already exist in the index — TODO confirm.
    sqlite.index(&sources[0]).expect("Failed to index memory_a");
    sqlite.index(&sources[1]).expect("Failed to index memory_b");

    let outcome = service.create_summary_node("Summary of two memories", &sources);
    assert!(outcome.is_ok());
    let summary = outcome.unwrap();

    let edges_first = sqlite
        .query_edges(&sources[0].id, EdgeType::SummarizedBy)
        .expect("Failed to query edges from memory_a");
    let edges_second = sqlite
        .query_edges(&sources[1].id, EdgeType::SummarizedBy)
        .expect("Failed to query edges from memory_b");

    assert_eq!(edges_first.len(), 1, "memory_a should have 1 edge");
    assert_eq!(edges_second.len(), 1, "memory_b should have 1 edge");
    assert_eq!(
        edges_first[0], summary.id,
        "memory_a edge should point to summary"
    );
    assert_eq!(
        edges_second[0], summary.id,
        "memory_b edge should point to summary"
    );
}
2467
/// Without an index backend, summary creation still succeeds; edge storage is simply
/// not attempted through an index.
#[test]
fn test_create_summary_node_without_index_backend() {
    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_create_summary_no_index"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);
    // Deliberately no `.with_index(...)`.
    let mut service = ConsolidationService::new(backend);

    let memory_a = create_test_memory("mem_a", "Content A");
    let source_memories = vec![memory_a];
    let summary_content = "Summary without index";

    let result = service.create_summary_node(summary_content, &source_memories);
    assert!(result.is_ok());

    let summary_node = result.unwrap();
    assert!(summary_node.is_summary);
}
2490
/// With consolidation disabled, `consolidate_memories` succeeds and returns empty
/// stats with no summaries created.
#[test]
fn test_consolidate_memories_disabled() {
    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_consolidate_disabled"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);
    let mut service = ConsolidationService::new(backend);

    let recall = crate::services::RecallService::new();
    // Disable explicitly instead of relying on the default from `new()`, so this test
    // keeps exercising the disabled path even if that default ever changes.
    let mut config = crate::config::ConsolidationConfig::new();
    config.enabled = false;

    let result = service.consolidate_memories(&recall, &config);
    assert!(result.is_ok());

    let stats = result.unwrap();
    assert!(stats.is_empty());
    assert_eq!(stats.summaries_created, 0);
}
2511
/// Even with consolidation enabled, a recall service with no embedder or vector
/// backend produces no summaries.
#[test]
fn test_consolidate_memories_no_vector_search() {
    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_consolidate_no_vector"),
    };
    let mut service = ConsolidationService::new(FilesystemBackend::new(&path));

    let recall = crate::services::RecallService::new();
    let config = {
        let mut cfg = crate::config::ConsolidationConfig::new();
        cfg.enabled = true;
        cfg
    };

    let outcome = service.consolidate_memories(&recall, &config);
    assert!(outcome.is_ok());
    assert_eq!(outcome.unwrap().summaries_created, 0);
}
2534
/// With vector search available but no LLM provider, consolidation succeeds without
/// creating any summaries.
#[test]
fn test_consolidate_memories_no_llm() {
    use crate::embedding::Embedder as EmbedderTrait;
    use crate::storage::traits::VectorBackend;

    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_consolidate_no_llm"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);
    // Deliberately no `.with_llm(...)`.
    let mut service = ConsolidationService::new(backend);

    // Two highly similar embeddings (cosine ≈ 0.994) in the same namespace.
    let embedding_a = vec![1.0, 0.0, 0.0];
    let embedding_b = vec![0.9, 0.1, 0.0];

    let mut memory_a = create_test_memory("consolidate_mem_a", "Decision about PostgreSQL");
    memory_a.embedding = Some(embedding_a);
    memory_a.namespace = crate::models::Namespace::Decisions;

    let mut memory_b = create_test_memory("consolidate_mem_b", "Use PostgreSQL for storage");
    memory_b.embedding = Some(embedding_b);
    memory_b.namespace = crate::models::Namespace::Decisions;

    // Surface setup failures here rather than letting them masquerade as test results.
    service
        .persistence
        .store(&memory_a)
        .expect("failed to store memory_a");
    service
        .persistence
        .store(&memory_b)
        .expect("failed to store memory_b");

    // Embedder that returns a fixed 3-dimensional vector.
    struct MockEmbedder;

    impl EmbedderTrait for MockEmbedder {
        fn dimensions(&self) -> usize {
            3
        }
        fn embed(&self, _text: &str) -> Result<Vec<f32>> {
            Ok(vec![1.0, 0.0, 0.0])
        }
    }

    // Vector backend that accepts writes and never returns search hits.
    struct MockVectorBackend;

    impl VectorBackend for MockVectorBackend {
        fn dimensions(&self) -> usize {
            3
        }
        fn upsert(&self, _id: &MemoryId, _embedding: &[f32]) -> Result<()> {
            Ok(())
        }
        fn remove(&self, _id: &MemoryId) -> Result<bool> {
            Ok(true)
        }
        fn search(
            &self,
            _query_embedding: &[f32],
            _filter: &crate::storage::traits::VectorFilter,
            _limit: usize,
        ) -> Result<Vec<(MemoryId, f32)>> {
            Ok(vec![])
        }
        fn count(&self) -> Result<usize> {
            Ok(0)
        }
        fn clear(&self) -> Result<()> {
            Ok(())
        }
    }

    let recall = crate::services::RecallService::new()
        .with_embedder(Arc::new(MockEmbedder))
        .with_vector(Arc::new(MockVectorBackend));

    let mut config = crate::config::ConsolidationConfig::new();
    config.enabled = true;
    config.similarity_threshold = 0.7;

    let result = service.consolidate_memories(&recall, &config);
    assert!(result.is_ok());

    // Without an LLM provider no summary can be produced.
    let stats = result.unwrap();
    assert_eq!(stats.summaries_created, 0);
}
2620
/// With an LLM provider and two similar memories, one consolidation pass processes
/// both memories and creates a single summary.
#[test]
fn test_consolidate_memories_with_mock_llm() {
    use crate::embedding::Embedder as EmbedderTrait;
    use crate::storage::traits::VectorBackend;
    use std::sync::Arc;

    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_consolidate_with_llm"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);

    // Provider with a canned completion.
    struct MockLlm;

    impl crate::llm::LlmProvider for MockLlm {
        fn name(&self) -> &'static str {
            "mock"
        }
        fn complete(&self, _prompt: &str) -> Result<String> {
            Ok("Comprehensive summary of database decisions: Use PostgreSQL with JSONB support.".to_string())
        }
        fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
            Err(crate::Error::OperationFailed {
                operation: "analyze_for_capture".to_string(),
                cause: "Not implemented for mock".to_string(),
            })
        }
    }

    let llm: Arc<dyn crate::llm::LlmProvider + Send + Sync> = Arc::new(MockLlm);
    let mut service = ConsolidationService::new(backend).with_llm(llm);

    // cos(a, b) = 0.95 / sqrt(0.905) ≈ 0.9986, above the 0.7 threshold below.
    let embedding_a = vec![1.0, 0.0, 0.0];
    let embedding_b = vec![0.95, 0.05, 0.0];

    let mut memory_a = create_test_memory("consolidate_llm_a", "Use PostgreSQL for storage");
    memory_a.embedding = Some(embedding_a);
    memory_a.namespace = crate::models::Namespace::Decisions;

    let mut memory_b = create_test_memory("consolidate_llm_b", "Enable JSONB in PostgreSQL");
    memory_b.embedding = Some(embedding_b);
    memory_b.namespace = crate::models::Namespace::Decisions;

    // The `processed == 2` assertion depends on both stores succeeding, so fail loudly here.
    service
        .persistence
        .store(&memory_a)
        .expect("failed to store memory_a");
    service
        .persistence
        .store(&memory_b)
        .expect("failed to store memory_b");

    // Embedder that returns a fixed 3-dimensional vector.
    struct MockEmbedder;

    impl EmbedderTrait for MockEmbedder {
        fn dimensions(&self) -> usize {
            3
        }
        fn embed(&self, _text: &str) -> Result<Vec<f32>> {
            Ok(vec![1.0, 0.0, 0.0])
        }
    }

    // Vector backend that accepts writes and never returns search hits.
    struct MockVectorBackend;

    impl VectorBackend for MockVectorBackend {
        fn dimensions(&self) -> usize {
            3
        }
        fn upsert(&self, _id: &MemoryId, _embedding: &[f32]) -> Result<()> {
            Ok(())
        }
        fn remove(&self, _id: &MemoryId) -> Result<bool> {
            Ok(true)
        }
        fn search(
            &self,
            _query_embedding: &[f32],
            _filter: &crate::storage::traits::VectorFilter,
            _limit: usize,
        ) -> Result<Vec<(MemoryId, f32)>> {
            Ok(vec![])
        }
        fn count(&self) -> Result<usize> {
            Ok(0)
        }
        fn clear(&self) -> Result<()> {
            Ok(())
        }
    }

    let recall = crate::services::RecallService::new()
        .with_embedder(Arc::new(MockEmbedder))
        .with_vector(Arc::new(MockVectorBackend));

    let mut config = crate::config::ConsolidationConfig::new();
    config.enabled = true;
    config.similarity_threshold = 0.7;
    config.min_memories_to_consolidate = 2;

    let result = service.consolidate_memories(&recall, &config);
    assert!(result.is_ok());

    let stats = result.unwrap();
    assert_eq!(stats.processed, 2);
    assert_eq!(stats.summaries_created, 1);
}
2727
/// With a namespace filter restricted to `Decisions`, memories in other namespaces
/// must not be consolidated together with it.
#[test]
fn test_consolidate_memories_respects_namespace_filter() {
    use crate::embedding::Embedder as EmbedderTrait;
    use crate::storage::traits::VectorBackend;
    use std::sync::Arc;

    let temp_dir = tempfile::tempdir().ok();
    let path = temp_dir.as_ref().map_or_else(
        || std::path::PathBuf::from("/tmp/test_consolidate_namespace_filter"),
        |d| d.path().to_path_buf(),
    );
    let backend = FilesystemBackend::new(&path);

    // Provider with a canned completion.
    struct MockLlm;

    impl crate::llm::LlmProvider for MockLlm {
        fn name(&self) -> &'static str {
            "mock"
        }
        fn complete(&self, _prompt: &str) -> Result<String> {
            Ok("Summary".to_string())
        }
        fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
            Err(crate::Error::OperationFailed {
                operation: "analyze_for_capture".to_string(),
                cause: "Not implemented for mock".to_string(),
            })
        }
    }

    let llm: Arc<dyn crate::llm::LlmProvider + Send + Sync> = Arc::new(MockLlm);
    let mut service = ConsolidationService::new(backend).with_llm(llm);

    // Identical embeddings: only the namespace distinguishes the two memories.
    let embedding = vec![1.0, 0.0, 0.0];

    let mut memory_decisions = create_test_memory("mem_decisions", "Decision");
    memory_decisions.embedding = Some(embedding.clone());
    memory_decisions.namespace = crate::models::Namespace::Decisions;

    let mut memory_patterns = create_test_memory("mem_patterns", "Pattern");
    memory_patterns.embedding = Some(embedding);
    memory_patterns.namespace = crate::models::Namespace::Patterns;

    service
        .persistence
        .store(&memory_decisions)
        .expect("failed to store memory_decisions");
    service
        .persistence
        .store(&memory_patterns)
        .expect("failed to store memory_patterns");

    // Embedder that returns a fixed 3-dimensional vector.
    struct MockEmbedder;

    impl EmbedderTrait for MockEmbedder {
        fn dimensions(&self) -> usize {
            3
        }
        fn embed(&self, _text: &str) -> Result<Vec<f32>> {
            Ok(vec![1.0, 0.0, 0.0])
        }
    }

    // Vector backend that accepts writes and never returns search hits.
    struct MockVectorBackend;

    impl VectorBackend for MockVectorBackend {
        fn dimensions(&self) -> usize {
            3
        }
        fn upsert(&self, _id: &MemoryId, _embedding: &[f32]) -> Result<()> {
            Ok(())
        }
        fn remove(&self, _id: &MemoryId) -> Result<bool> {
            Ok(true)
        }
        fn search(
            &self,
            _query_embedding: &[f32],
            _filter: &crate::storage::traits::VectorFilter,
            _limit: usize,
        ) -> Result<Vec<(MemoryId, f32)>> {
            Ok(vec![])
        }
        fn count(&self) -> Result<usize> {
            Ok(0)
        }
        fn clear(&self) -> Result<()> {
            Ok(())
        }
    }

    let recall = crate::services::RecallService::new()
        .with_embedder(Arc::new(MockEmbedder))
        .with_vector(Arc::new(MockVectorBackend));

    let mut config = crate::config::ConsolidationConfig::new();
    config.enabled = true;
    config.namespace_filter = Some(vec![crate::models::Namespace::Decisions]);
    config.min_memories_to_consolidate = 1;

    let result = service.consolidate_memories(&recall, &config);
    assert!(result.is_ok());

    // NOTE(review): only one memory is in `Decisions`, so no cross-namespace summary
    // should appear. This assertion alone does not prove the `Patterns` memory was
    // excluded; consider also checking per-namespace processing counts.
    let stats = result.unwrap();
    assert_eq!(stats.summaries_created, 0);
}
2832
/// Stats with non-zero counters are not empty, and the rendered summary includes the
/// processed count and the number of summaries created.
#[test]
fn test_consolidation_stats_with_summaries() {
    let stats = ConsolidationStats {
        summaries_created: 3,
        contradictions: 0,
        merged: 1,
        archived: 2,
        processed: 10,
    };
    assert!(!stats.is_empty());

    let rendered = stats.summary();
    for needle in ["Processed: 10", "Summaries: 3"] {
        assert!(rendered.contains(needle));
    }
}
2847
/// `create_related_edges` links each memory in a group to every other member with a
/// `RelatedTo` edge (checked here from the first and second memories' side).
#[test]
fn test_create_related_edges() {
    use crate::storage::index::SqliteBackend;
    use crate::storage::traits::IndexBackend;
    use std::sync::Arc;

    let tmp = tempfile::tempdir().ok();
    let path = match tmp.as_ref() {
        Some(dir) => dir.path().to_path_buf(),
        None => std::path::PathBuf::from("/tmp/test_create_related_edges"),
    };
    let backend = FilesystemBackend::new(&path);

    let sqlite = Arc::new(SqliteBackend::in_memory().expect("Failed to create in-memory SQLite"));
    let service = ConsolidationService::new(backend).with_index(Arc::clone(&sqlite));

    let first = create_test_memory("related_a", "First memory");
    let second = create_test_memory("related_b", "Second memory");
    let third = create_test_memory("related_c", "Third memory");

    // Presumably edge endpoints must already exist in the index — TODO confirm.
    for (memory, context) in [
        (&first, "Failed to index memory_a"),
        (&second, "Failed to index memory_b"),
        (&third, "Failed to index memory_c"),
    ] {
        sqlite.index(memory).expect(context);
    }

    let group = vec![first.clone(), second.clone(), third.clone()];

    let linked = service.create_related_edges(&group, &sqlite);
    assert!(linked.is_ok());

    let from_first = sqlite
        .query_edges(&first.id, EdgeType::RelatedTo)
        .expect("Failed to query edges from memory_a");
    assert_eq!(
        from_first.len(),
        2,
        "memory_a should have 2 RelatedTo edges"
    );
    assert!(
        from_first.contains(&second.id),
        "memory_a should be related to memory_b"
    );
    assert!(
        from_first.contains(&third.id),
        "memory_a should be related to memory_c"
    );

    let from_second = sqlite
        .query_edges(&second.id, EdgeType::RelatedTo)
        .expect("Failed to query edges from memory_b");
    assert_eq!(
        from_second.len(),
        2,
        "memory_b should have 2 RelatedTo edges"
    );
    assert!(
        from_second.contains(&first.id),
        "memory_b should be related to memory_a"
    );
    assert!(
        from_second.contains(&third.id),
        "memory_b should be related to memory_c"
    );
}
2928
2929 #[test]
2930 fn test_create_related_edges_single_memory() {
2931 use crate::storage::index::SqliteBackend;
2932 use std::sync::Arc;
2933
2934 let temp_dir = tempfile::tempdir().ok();
2935 let path = temp_dir.as_ref().map_or_else(
2936 || std::path::PathBuf::from("/tmp/test_related_edges_single"),
2937 |d| d.path().to_path_buf(),
2938 );
2939 let backend = FilesystemBackend::new(&path);
2940
2941 let index = SqliteBackend::in_memory().expect("Failed to create in-memory SQLite");
2942 let index_arc = Arc::new(index);
2943
2944 let service = ConsolidationService::new(backend).with_index(index_arc.clone());
2945
2946 let memory_a = create_test_memory("single_mem", "Only memory");
2948 let memories = vec![memory_a];
2949
2950 let result = service.create_related_edges(&memories, &index_arc);
2952 assert!(result.is_ok());
2953 }
2954
2955 #[test]
2956 fn test_consolidate_memories_no_llm_creates_edges() {
2957 use crate::embedding::Embedder as EmbedderTrait;
2958 use crate::storage::index::SqliteBackend;
2959 use crate::storage::traits::{IndexBackend, VectorBackend};
2960
2961 let temp_dir = tempfile::tempdir().ok();
2962 let path = temp_dir.as_ref().map_or_else(
2963 || std::path::PathBuf::from("/tmp/test_consolidate_no_llm_edges"),
2964 |d| d.path().to_path_buf(),
2965 );
2966 let backend = FilesystemBackend::new(&path);
2967
2968 let index = SqliteBackend::in_memory().expect("Failed to create in-memory SQLite");
2970 let index_arc = Arc::new(index);
2971
2972 let mut service = ConsolidationService::new(backend).with_index(index_arc.clone());
2974
2975 let embedding_a = vec![1.0, 0.0, 0.0];
2977 let embedding_b = vec![0.95, 0.05, 0.0]; let mut memory_a = create_test_memory("no_llm_edge_a", "Decision about PostgreSQL");
2980 memory_a.embedding = Some(embedding_a);
2981 memory_a.namespace = crate::models::Namespace::Decisions;
2982
2983 let mut memory_b = create_test_memory("no_llm_edge_b", "Use PostgreSQL for storage");
2984 memory_b.embedding = Some(embedding_b);
2985 memory_b.namespace = crate::models::Namespace::Decisions;
2986
2987 index_arc
2989 .index(&memory_a)
2990 .expect("Failed to index memory_a");
2991 index_arc
2992 .index(&memory_b)
2993 .expect("Failed to index memory_b");
2994
2995 let _ = service.persistence.store(&memory_a);
2997 let _ = service.persistence.store(&memory_b);
2998
2999 struct MockEmbedder;
3001 impl EmbedderTrait for MockEmbedder {
3002 fn dimensions(&self) -> usize {
3003 3
3004 }
3005 fn embed(&self, _text: &str) -> Result<Vec<f32>> {
3006 Ok(vec![1.0, 0.0, 0.0])
3007 }
3008 }
3009
3010 struct MockVectorBackend;
3011 impl VectorBackend for MockVectorBackend {
3012 fn dimensions(&self) -> usize {
3013 3
3014 }
3015 fn upsert(&self, _id: &MemoryId, _embedding: &[f32]) -> Result<()> {
3016 Ok(())
3017 }
3018 fn remove(&self, _id: &MemoryId) -> Result<bool> {
3019 Ok(true)
3020 }
3021 fn search(
3022 &self,
3023 _query_embedding: &[f32],
3024 _filter: &crate::storage::traits::VectorFilter,
3025 _limit: usize,
3026 ) -> Result<Vec<(MemoryId, f32)>> {
3027 Ok(vec![])
3028 }
3029 fn count(&self) -> Result<usize> {
3030 Ok(0)
3031 }
3032 fn clear(&self) -> Result<()> {
3033 Ok(())
3034 }
3035 }
3036
3037 let recall = crate::services::RecallService::new()
3038 .with_embedder(Arc::new(MockEmbedder))
3039 .with_vector(Arc::new(MockVectorBackend));
3040
3041 let mut config = crate::config::ConsolidationConfig::new();
3043 config.enabled = true;
3044 config.similarity_threshold = 0.7;
3045 config.min_memories_to_consolidate = 2;
3046
3047 let result = service.consolidate_memories(&recall, &config);
3049 assert!(result.is_ok());
3050
3051 let stats = result.unwrap();
3052 assert_eq!(stats.processed, 2);
3054 assert_eq!(stats.summaries_created, 0);
3055
3056 let edges_from_a = index_arc
3058 .query_edges(&memory_a.id, EdgeType::RelatedTo)
3059 .expect("Failed to query edges from memory_a");
3060
3061 assert!(
3062 !edges_from_a.is_empty(),
3063 "memory_a should have RelatedTo edges even without LLM"
3064 );
3065 assert!(
3066 edges_from_a.contains(&memory_b.id),
3067 "memory_a should be related to memory_b"
3068 );
3069 }
3070
3071 #[test]
3072 fn test_consolidate_memories_no_llm_no_index() {
3073 use crate::embedding::Embedder as EmbedderTrait;
3074 use crate::storage::traits::VectorBackend;
3075
3076 let temp_dir = tempfile::tempdir().ok();
3077 let path = temp_dir.as_ref().map_or_else(
3078 || std::path::PathBuf::from("/tmp/test_consolidate_no_llm_no_index"),
3079 |d| d.path().to_path_buf(),
3080 );
3081 let backend = FilesystemBackend::new(&path);
3082
3083 let mut service = ConsolidationService::new(backend);
3085
3086 let embedding_a = vec![1.0, 0.0, 0.0];
3088 let embedding_b = vec![0.95, 0.05, 0.0];
3089
3090 let mut memory_a = create_test_memory("no_backends_a", "Decision A");
3091 memory_a.embedding = Some(embedding_a);
3092 memory_a.namespace = crate::models::Namespace::Decisions;
3093
3094 let mut memory_b = create_test_memory("no_backends_b", "Decision B");
3095 memory_b.embedding = Some(embedding_b);
3096 memory_b.namespace = crate::models::Namespace::Decisions;
3097
3098 let _ = service.persistence.store(&memory_a);
3100 let _ = service.persistence.store(&memory_b);
3101
3102 struct MockEmbedder;
3104 impl EmbedderTrait for MockEmbedder {
3105 fn dimensions(&self) -> usize {
3106 3
3107 }
3108 fn embed(&self, _text: &str) -> Result<Vec<f32>> {
3109 Ok(vec![1.0, 0.0, 0.0])
3110 }
3111 }
3112
3113 struct MockVectorBackend;
3114 impl VectorBackend for MockVectorBackend {
3115 fn dimensions(&self) -> usize {
3116 3
3117 }
3118 fn upsert(&self, _id: &MemoryId, _embedding: &[f32]) -> Result<()> {
3119 Ok(())
3120 }
3121 fn remove(&self, _id: &MemoryId) -> Result<bool> {
3122 Ok(true)
3123 }
3124 fn search(
3125 &self,
3126 _query_embedding: &[f32],
3127 _filter: &crate::storage::traits::VectorFilter,
3128 _limit: usize,
3129 ) -> Result<Vec<(MemoryId, f32)>> {
3130 Ok(vec![])
3131 }
3132 fn count(&self) -> Result<usize> {
3133 Ok(0)
3134 }
3135 fn clear(&self) -> Result<()> {
3136 Ok(())
3137 }
3138 }
3139
3140 let recall = crate::services::RecallService::new()
3141 .with_embedder(Arc::new(MockEmbedder))
3142 .with_vector(Arc::new(MockVectorBackend));
3143
3144 let mut config = crate::config::ConsolidationConfig::new();
3145 config.enabled = true;
3146 config.similarity_threshold = 0.7;
3147 config.min_memories_to_consolidate = 2;
3148
3149 let result = service.consolidate_memories(&recall, &config);
3151 assert!(result.is_ok());
3152
3153 let stats = result.unwrap();
3154 assert_eq!(stats.processed, 2);
3156 assert_eq!(stats.summaries_created, 0);
3157 }
3158
3159 #[test]
3160 fn test_cluster_by_similarity_empty_list() {
3161 let temp_dir = tempfile::tempdir().ok();
3162 let path = temp_dir.as_ref().map_or_else(
3163 || std::path::PathBuf::from("/tmp/test_cluster_empty_list"),
3164 |d| d.path().to_path_buf(),
3165 );
3166 let backend = FilesystemBackend::new(&path);
3167 let service = ConsolidationService::new(backend);
3168
3169 let result = service.cluster_by_similarity(&[], 0.7);
3171 assert!(result.is_ok());
3172
3173 let groups = result.unwrap();
3174 assert!(groups.is_empty());
3175 }
3176
3177 #[test]
3178 fn test_create_summary_node_no_tags() {
3179 let temp_dir = tempfile::tempdir().ok();
3180 let path = temp_dir.as_ref().map_or_else(
3181 || std::path::PathBuf::from("/tmp/test_create_summary_no_tags"),
3182 |d| d.path().to_path_buf(),
3183 );
3184 let backend = FilesystemBackend::new(&path);
3185 let mut service = ConsolidationService::new(backend);
3186
3187 let mut memory_a = create_test_memory("mem_a", "Content A");
3189 memory_a.tags = vec![];
3190
3191 let mut memory_b = create_test_memory("mem_b", "Content B");
3192 memory_b.tags = vec![];
3193
3194 let source_memories = vec![memory_a, memory_b];
3195 let summary_content = "Summary of untagged memories";
3196
3197 let result = service.create_summary_node(summary_content, &source_memories);
3198 assert!(result.is_ok());
3199
3200 let summary_node = result.unwrap();
3201 assert!(summary_node.tags.is_empty());
3203 }
3204
3205 #[test]
3206 fn test_edge_storage_idempotency() {
3207 use crate::storage::index::SqliteBackend;
3208 use crate::storage::traits::IndexBackend;
3209 use std::sync::Arc;
3210
3211 let temp_dir = tempfile::tempdir().ok();
3212 let path = temp_dir.as_ref().map_or_else(
3213 || std::path::PathBuf::from("/tmp/test_edge_idempotency"),
3214 |d| d.path().to_path_buf(),
3215 );
3216 let backend = FilesystemBackend::new(&path);
3217
3218 let index = SqliteBackend::in_memory().expect("Failed to create in-memory SQLite");
3220 let index_arc = Arc::new(index);
3221
3222 let mut service = ConsolidationService::new(backend).with_index(index_arc.clone());
3223
3224 let memory_a = create_test_memory("idempotent_a", "First memory");
3226 let memory_b = create_test_memory("idempotent_b", "Second memory");
3227
3228 index_arc
3230 .index(&memory_a)
3231 .expect("Failed to index memory_a");
3232 index_arc
3233 .index(&memory_b)
3234 .expect("Failed to index memory_b");
3235
3236 let source_memories = vec![memory_a.clone(), memory_b.clone()];
3237 let summary_content = "Summary for idempotency test";
3238
3239 let result1 = service.create_summary_node(summary_content, &source_memories);
3241 assert!(result1.is_ok());
3242 let summary1 = result1.unwrap();
3243
3244 index_arc.index(&summary1).expect("Failed to index summary");
3246
3247 let result2 = service.create_summary_node(summary_content, &source_memories);
3249 assert!(result2.is_ok());
3250 let summary2 = result2.unwrap();
3251
3252 assert!(summary1.is_summary);
3254 assert!(summary2.is_summary);
3255
3256 let edges_from_a = index_arc
3258 .query_edges(&memory_a.id, EdgeType::SummarizedBy)
3259 .expect("Failed to query edges");
3260
3261 assert!(!edges_from_a.is_empty());
3263 }
3264
3265 #[test]
3266 fn test_find_related_memories_with_time_window() {
3267 use crate::embedding::Embedder as EmbedderTrait;
3268 use crate::storage::traits::VectorBackend;
3269 use std::sync::Arc;
3270
3271 let temp_dir = tempfile::tempdir().ok();
3272 let path = temp_dir.as_ref().map_or_else(
3273 || std::path::PathBuf::from("/tmp/test_find_time_window"),
3274 |d| d.path().to_path_buf(),
3275 );
3276 let backend = FilesystemBackend::new(&path);
3277 let service = ConsolidationService::new(backend);
3278
3279 struct MockEmbedder;
3281 impl EmbedderTrait for MockEmbedder {
3282 fn dimensions(&self) -> usize {
3283 3
3284 }
3285 fn embed(&self, _text: &str) -> Result<Vec<f32>> {
3286 Ok(vec![1.0, 0.0, 0.0])
3287 }
3288 }
3289
3290 struct MockVectorBackend;
3291 impl VectorBackend for MockVectorBackend {
3292 fn dimensions(&self) -> usize {
3293 3
3294 }
3295 fn upsert(&self, _id: &MemoryId, _embedding: &[f32]) -> Result<()> {
3296 Ok(())
3297 }
3298 fn remove(&self, _id: &MemoryId) -> Result<bool> {
3299 Ok(true)
3300 }
3301 fn search(
3302 &self,
3303 _query_embedding: &[f32],
3304 _filter: &crate::storage::traits::VectorFilter,
3305 _limit: usize,
3306 ) -> Result<Vec<(MemoryId, f32)>> {
3307 Ok(vec![])
3308 }
3309 fn count(&self) -> Result<usize> {
3310 Ok(0)
3311 }
3312 fn clear(&self) -> Result<()> {
3313 Ok(())
3314 }
3315 }
3316
3317 let recall = crate::services::RecallService::new()
3318 .with_embedder(Arc::new(MockEmbedder))
3319 .with_vector(Arc::new(MockVectorBackend));
3320
3321 let mut config = crate::config::ConsolidationConfig::new();
3323 config.time_window_days = Some(7); let result = service.find_related_memories(&recall, &config);
3326 assert!(result.is_ok());
3327
3328 let groups = result.unwrap();
3330 assert!(groups.is_empty());
3331 }
3332
3333 #[test]
3334 fn test_consolidate_memories_multiple_namespaces() {
3335 use crate::embedding::Embedder as EmbedderTrait;
3336 use crate::storage::traits::VectorBackend;
3337 use std::sync::Arc;
3338
3339 struct MockLlm;
3341 impl crate::llm::LlmProvider for MockLlm {
3342 fn name(&self) -> &'static str {
3343 "mock"
3344 }
3345 fn complete(&self, _prompt: &str) -> Result<String> {
3346 Ok("Summary of memories".to_string())
3347 }
3348 fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
3349 Err(crate::Error::OperationFailed {
3350 operation: "analyze_for_capture".to_string(),
3351 cause: "Not implemented for mock".to_string(),
3352 })
3353 }
3354 }
3355
3356 struct MockEmbedder;
3357 impl EmbedderTrait for MockEmbedder {
3358 fn dimensions(&self) -> usize {
3359 3
3360 }
3361 fn embed(&self, _text: &str) -> Result<Vec<f32>> {
3362 Ok(vec![1.0, 0.0, 0.0])
3363 }
3364 }
3365
3366 struct MockVectorBackend;
3367 impl VectorBackend for MockVectorBackend {
3368 fn dimensions(&self) -> usize {
3369 3
3370 }
3371 fn upsert(&self, _id: &MemoryId, _embedding: &[f32]) -> Result<()> {
3372 Ok(())
3373 }
3374 fn remove(&self, _id: &MemoryId) -> Result<bool> {
3375 Ok(true)
3376 }
3377 fn search(
3378 &self,
3379 _query_embedding: &[f32],
3380 _filter: &crate::storage::traits::VectorFilter,
3381 _limit: usize,
3382 ) -> Result<Vec<(MemoryId, f32)>> {
3383 Ok(vec![])
3384 }
3385 fn count(&self) -> Result<usize> {
3386 Ok(0)
3387 }
3388 fn clear(&self) -> Result<()> {
3389 Ok(())
3390 }
3391 }
3392
3393 let temp_dir = tempfile::tempdir().ok();
3394 let path = temp_dir.as_ref().map_or_else(
3395 || std::path::PathBuf::from("/tmp/test_consolidate_multi_namespace"),
3396 |d| d.path().to_path_buf(),
3397 );
3398 let backend = FilesystemBackend::new(&path);
3399
3400 let llm: Arc<dyn crate::llm::LlmProvider + Send + Sync> = Arc::new(MockLlm);
3401 let mut service = ConsolidationService::new(backend).with_llm(llm);
3402
3403 let embedding = vec![1.0, 0.0, 0.0];
3405
3406 let mut mem_decisions_1 = create_test_memory("dec_1", "Decision 1");
3407 mem_decisions_1.embedding = Some(embedding.clone());
3408 mem_decisions_1.namespace = crate::models::Namespace::Decisions;
3409
3410 let mut mem_decisions_2 = create_test_memory("dec_2", "Decision 2");
3411 mem_decisions_2.embedding = Some(embedding.clone());
3412 mem_decisions_2.namespace = crate::models::Namespace::Decisions;
3413
3414 let mut mem_patterns_1 = create_test_memory("pat_1", "Pattern 1");
3415 mem_patterns_1.embedding = Some(embedding.clone());
3416 mem_patterns_1.namespace = crate::models::Namespace::Patterns;
3417
3418 let mut mem_patterns_2 = create_test_memory("pat_2", "Pattern 2");
3419 mem_patterns_2.embedding = Some(embedding.clone());
3420 mem_patterns_2.namespace = crate::models::Namespace::Patterns;
3421
3422 let _ = service.persistence.store(&mem_decisions_1);
3424 let _ = service.persistence.store(&mem_decisions_2);
3425 let _ = service.persistence.store(&mem_patterns_1);
3426 let _ = service.persistence.store(&mem_patterns_2);
3427
3428 let recall = crate::services::RecallService::new()
3429 .with_embedder(Arc::new(MockEmbedder))
3430 .with_vector(Arc::new(MockVectorBackend));
3431
3432 let mut config = crate::config::ConsolidationConfig::new();
3434 config.enabled = true;
3435 config.namespace_filter = Some(vec![
3436 crate::models::Namespace::Decisions,
3437 crate::models::Namespace::Patterns,
3438 ]);
3439 config.similarity_threshold = 0.9;
3440 config.min_memories_to_consolidate = 2;
3441
3442 let result = service.consolidate_memories(&recall, &config);
3443 assert!(result.is_ok());
3444
3445 let stats = result.unwrap();
3447 assert_eq!(stats.processed, 4);
3448 }
3449
3450 #[test]
3451 #[allow(clippy::excessive_nesting)]
3452 fn test_summarize_group_preserves_memory_details() {
3453 use std::sync::Arc;
3454
3455 struct DetailCheckingMockLlm;
3457 impl crate::llm::LlmProvider for DetailCheckingMockLlm {
3458 fn name(&self) -> &'static str {
3459 "detail_checking_mock"
3460 }
3461 fn complete(&self, prompt: &str) -> Result<String> {
3462 let has_all_details = prompt.contains("mem_detail_1")
3464 && prompt.contains("mem_detail_2")
3465 && prompt.contains("Decisions")
3466 && prompt.contains("Important detail 1")
3467 && prompt.contains("Important detail 2");
3468
3469 if !has_all_details {
3470 return Err(crate::Error::OperationFailed {
3471 operation: "llm_complete".to_string(),
3472 cause: "Details not found in prompt".to_string(),
3473 });
3474 }
3475
3476 Ok("Summary preserving all key technical details from both memories".to_string())
3477 }
3478 fn analyze_for_capture(&self, _content: &str) -> Result<crate::llm::CaptureAnalysis> {
3479 Err(crate::Error::OperationFailed {
3480 operation: "analyze_for_capture".to_string(),
3481 cause: "Not implemented for mock".to_string(),
3482 })
3483 }
3484 }
3485
3486 let temp_dir = tempfile::tempdir().ok();
3487 let path = temp_dir.as_ref().map_or_else(
3488 || std::path::PathBuf::from("/tmp/test_summarize_details"),
3489 |d| d.path().to_path_buf(),
3490 );
3491 let backend = FilesystemBackend::new(&path);
3492
3493 let llm: Arc<dyn crate::llm::LlmProvider + Send + Sync> = Arc::new(DetailCheckingMockLlm);
3494 let service = ConsolidationService::new(backend).with_llm(llm);
3495
3496 let mut memory_a = create_test_memory("mem_detail_1", "Important detail 1");
3498 memory_a.namespace = crate::models::Namespace::Decisions;
3499
3500 let mut memory_b = create_test_memory("mem_detail_2", "Important detail 2");
3501 memory_b.namespace = crate::models::Namespace::Decisions;
3502
3503 let memories = vec![memory_a, memory_b];
3504
3505 let result = service.summarize_group(&memories);
3507 assert!(result.is_ok());
3508
3509 let summary = result.unwrap();
3510 assert!(summary.contains("preserving all key technical details"));
3511 }
3512}