1#![allow(clippy::cast_possible_truncation)]
9#![allow(clippy::cast_sign_loss)]
10#![allow(clippy::missing_const_for_fn)]
12#![allow(clippy::redundant_closure_for_method_calls)]
14#![allow(clippy::cast_lossless)]
16#![allow(clippy::cast_possible_wrap)]
18
19use crate::models::graph::{
20 Entity, EntityId, EntityMention, EntityQuery, EntityType, Relationship, RelationshipQuery,
21 RelationshipType, TraversalResult,
22};
23use crate::models::temporal::{BitemporalPoint, TransactionTime, ValidTimeRange};
24use crate::models::{Domain, MemoryId};
25use crate::storage::traits::graph::{GraphBackend, GraphStats};
26use crate::{Error, Result};
27use rusqlite::{Connection, OptionalExtension, Row, params};
28use std::collections::HashMap;
29use std::path::{Path, PathBuf};
30use std::sync::{Mutex, MutexGuard};
31use tracing::instrument;
32
33fn acquire_lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
35 match mutex.lock() {
36 Ok(guard) => guard,
37 Err(poisoned) => {
38 tracing::warn!("Graph SQLite mutex was poisoned, recovering");
39 metrics::counter!("graph_sqlite_mutex_poison_recovery_total").increment(1);
40 poisoned.into_inner()
41 },
42 }
43}
44
45pub struct SqliteGraphBackend {
59 conn: Mutex<Connection>,
61 db_path: Option<PathBuf>,
63}
64
65impl SqliteGraphBackend {
66 pub fn new(db_path: impl Into<PathBuf>) -> Result<Self> {
72 let db_path = db_path.into();
73 let conn = Connection::open(&db_path).map_err(|e| Error::OperationFailed {
74 operation: "open_graph_sqlite".to_string(),
75 cause: e.to_string(),
76 })?;
77
78 let backend = Self {
79 conn: Mutex::new(conn),
80 db_path: Some(db_path),
81 };
82
83 backend.initialize()?;
84 Ok(backend)
85 }
86
87 pub fn in_memory() -> Result<Self> {
93 let conn = Connection::open_in_memory().map_err(|e| Error::OperationFailed {
94 operation: "open_graph_sqlite_memory".to_string(),
95 cause: e.to_string(),
96 })?;
97
98 let backend = Self {
99 conn: Mutex::new(conn),
100 db_path: None,
101 };
102
103 backend.initialize()?;
104 Ok(backend)
105 }
106
107 #[must_use]
109 pub fn db_path(&self) -> Option<&Path> {
110 self.db_path.as_deref()
111 }
112
113 fn initialize(&self) -> Result<()> {
115 let conn = acquire_lock(&self.conn);
116
117 let _ = conn.pragma_update(None, "journal_mode", "WAL");
119 let _ = conn.pragma_update(None, "synchronous", "NORMAL");
120 let _ = conn.pragma_update(None, "busy_timeout", "5000");
121 let _ = conn.pragma_update(None, "foreign_keys", "ON");
123
124 conn.execute(
126 "CREATE TABLE IF NOT EXISTS graph_entities (
127 id TEXT PRIMARY KEY,
128 entity_type TEXT NOT NULL,
129 name TEXT NOT NULL,
130 aliases TEXT,
131 domain_org TEXT,
132 domain_project TEXT,
133 domain_repo TEXT,
134 confidence REAL NOT NULL DEFAULT 1.0,
135 valid_time_start INTEGER,
136 valid_time_end INTEGER,
137 transaction_time INTEGER NOT NULL,
138 properties TEXT,
139 mention_count INTEGER NOT NULL DEFAULT 0
140 )",
141 [],
142 )
143 .map_err(|e| Error::OperationFailed {
144 operation: "create_graph_entities_table".to_string(),
145 cause: e.to_string(),
146 })?;
147
148 conn.execute(
150 "CREATE TABLE IF NOT EXISTS graph_relationships (
151 from_entity_id TEXT NOT NULL,
152 to_entity_id TEXT NOT NULL,
153 relationship_type TEXT NOT NULL,
154 confidence REAL NOT NULL DEFAULT 1.0,
155 valid_time_start INTEGER,
156 valid_time_end INTEGER,
157 transaction_time INTEGER NOT NULL,
158 properties TEXT,
159 PRIMARY KEY (from_entity_id, to_entity_id, relationship_type),
160 FOREIGN KEY (from_entity_id) REFERENCES graph_entities(id) ON DELETE CASCADE,
161 FOREIGN KEY (to_entity_id) REFERENCES graph_entities(id) ON DELETE CASCADE
162 )",
163 [],
164 )
165 .map_err(|e| Error::OperationFailed {
166 operation: "create_graph_relationships_table".to_string(),
167 cause: e.to_string(),
168 })?;
169
170 conn.execute(
172 "CREATE TABLE IF NOT EXISTS graph_entity_mentions (
173 entity_id TEXT NOT NULL,
174 memory_id TEXT NOT NULL,
175 confidence REAL NOT NULL DEFAULT 1.0,
176 start_offset INTEGER,
177 end_offset INTEGER,
178 matched_text TEXT,
179 transaction_time INTEGER NOT NULL,
180 PRIMARY KEY (entity_id, memory_id),
181 FOREIGN KEY (entity_id) REFERENCES graph_entities(id) ON DELETE CASCADE
182 )",
183 [],
184 )
185 .map_err(|e| Error::OperationFailed {
186 operation: "create_graph_entity_mentions_table".to_string(),
187 cause: e.to_string(),
188 })?;
189
190 Self::create_indexes(&conn);
192
193 Ok(())
194 }
195
196 fn create_indexes(conn: &Connection) {
198 let _ = conn.execute(
200 "CREATE INDEX IF NOT EXISTS idx_graph_entities_type ON graph_entities(entity_type)",
201 [],
202 );
203 let _ = conn.execute(
204 "CREATE INDEX IF NOT EXISTS idx_graph_entities_name ON graph_entities(name)",
205 [],
206 );
207 let _ = conn.execute(
208 "CREATE INDEX IF NOT EXISTS idx_graph_entities_domain ON graph_entities(domain_org, domain_project, domain_repo)",
209 [],
210 );
211 let _ = conn.execute(
212 "CREATE INDEX IF NOT EXISTS idx_graph_entities_confidence ON graph_entities(confidence DESC)",
213 [],
214 );
215 let _ = conn.execute(
216 "CREATE INDEX IF NOT EXISTS idx_graph_entities_mention_count ON graph_entities(mention_count DESC)",
217 [],
218 );
219
220 let _ = conn.execute(
222 "CREATE INDEX IF NOT EXISTS idx_graph_relationships_from ON graph_relationships(from_entity_id)",
223 [],
224 );
225 let _ = conn.execute(
226 "CREATE INDEX IF NOT EXISTS idx_graph_relationships_to ON graph_relationships(to_entity_id)",
227 [],
228 );
229 let _ = conn.execute(
230 "CREATE INDEX IF NOT EXISTS idx_graph_relationships_type ON graph_relationships(relationship_type)",
231 [],
232 );
233
234 let _ = conn.execute(
236 "CREATE INDEX IF NOT EXISTS idx_graph_entity_mentions_entity ON graph_entity_mentions(entity_id)",
237 [],
238 );
239 let _ = conn.execute(
240 "CREATE INDEX IF NOT EXISTS idx_graph_entity_mentions_memory ON graph_entity_mentions(memory_id)",
241 [],
242 );
243 }
244
245 fn parse_entity_row(row: &Row<'_>) -> rusqlite::Result<Entity> {
247 let id: String = row.get("id")?;
248 let entity_type_str: String = row.get("entity_type")?;
249 let name: String = row.get("name")?;
250 let aliases_json: Option<String> = row.get("aliases")?;
251 let domain_org: Option<String> = row.get("domain_org")?;
252 let domain_project: Option<String> = row.get("domain_project")?;
253 let domain_repo: Option<String> = row.get("domain_repo")?;
254 let confidence: f32 = row.get("confidence")?;
255 let valid_time_start: Option<i64> = row.get("valid_time_start")?;
256 let valid_time_end: Option<i64> = row.get("valid_time_end")?;
257 let transaction_time: i64 = row.get("transaction_time")?;
258 let properties_json: Option<String> = row.get("properties")?;
259 let mention_count: i64 = row.get("mention_count")?;
260
261 let entity_type = EntityType::parse(&entity_type_str).unwrap_or(EntityType::Concept);
262 let aliases: Vec<String> = aliases_json
263 .and_then(|s| serde_json::from_str(&s).ok())
264 .unwrap_or_default();
265 let properties: HashMap<String, String> = properties_json
266 .and_then(|s| serde_json::from_str(&s).ok())
267 .unwrap_or_default();
268
269 let domain = Domain {
270 organization: domain_org,
271 project: domain_project,
272 repository: domain_repo,
273 };
274
275 let valid_time = ValidTimeRange {
276 start: valid_time_start,
277 end: valid_time_end,
278 };
279
280 Ok(Entity {
281 id: EntityId::new(id),
282 entity_type,
283 name,
284 aliases,
285 domain,
286 confidence,
287 valid_time,
288 transaction_time: TransactionTime::at(transaction_time),
289 properties,
290 mention_count: mention_count as u32,
291 })
292 }
293
294 fn parse_relationship_row(row: &Row<'_>) -> rusqlite::Result<Relationship> {
296 let from_entity_id: String = row.get("from_entity_id")?;
297 let to_entity_id: String = row.get("to_entity_id")?;
298 let relationship_type_str: String = row.get("relationship_type")?;
299 let confidence: f32 = row.get("confidence")?;
300 let valid_time_start: Option<i64> = row.get("valid_time_start")?;
301 let valid_time_end: Option<i64> = row.get("valid_time_end")?;
302 let _transaction_time: i64 = row.get("transaction_time")?;
303 let _properties_json: Option<String> = row.get("properties")?;
304
305 let relationship_type =
306 RelationshipType::parse(&relationship_type_str).unwrap_or(RelationshipType::RelatesTo);
307
308 let valid_time = ValidTimeRange {
309 start: valid_time_start,
310 end: valid_time_end,
311 };
312
313 Ok(Relationship::new(
314 EntityId::new(from_entity_id),
315 EntityId::new(to_entity_id),
316 relationship_type,
317 )
318 .with_confidence(confidence)
319 .with_valid_time(valid_time))
320 }
321
322 fn build_entity_where_clause(query: &EntityQuery) -> (String, Vec<Box<dyn rusqlite::ToSql>>) {
324 let mut conditions = Vec::new();
325 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
326
327 if let Some(ref entity_type) = query.entity_type {
328 conditions.push("entity_type = ?".to_string());
329 params.push(Box::new(entity_type.as_str().to_string()));
330 }
331
332 if let Some(ref name) = query.name {
333 conditions.push("name LIKE ?".to_string());
334 params.push(Box::new(format!("%{name}%")));
335 }
336
337 if let Some(ref domain) = query.domain {
338 if let Some(ref org) = domain.organization {
339 conditions.push("domain_org = ?".to_string());
340 params.push(Box::new(org.clone()));
341 }
342 if let Some(ref project) = domain.project {
343 conditions.push("domain_project = ?".to_string());
344 params.push(Box::new(project.clone()));
345 }
346 if let Some(ref repo) = domain.repository {
347 conditions.push("domain_repo = ?".to_string());
348 params.push(Box::new(repo.clone()));
349 }
350 }
351
352 if let Some(min_confidence) = query.min_confidence {
353 conditions.push("confidence >= ?".to_string());
354 params.push(Box::new(f64::from(min_confidence)));
355 }
356
357 let where_clause = if conditions.is_empty() {
358 String::new()
359 } else {
360 format!("WHERE {}", conditions.join(" AND "))
361 };
362
363 (where_clause, params)
364 }
365}
366
367impl GraphBackend for SqliteGraphBackend {
368 #[instrument(skip(self, entity), fields(entity_id = %entity.id))]
373 fn store_entity(&self, entity: &Entity) -> Result<()> {
374 let conn = acquire_lock(&self.conn);
375
376 let aliases_json =
377 serde_json::to_string(&entity.aliases).unwrap_or_else(|_| "[]".to_string());
378 let properties_json =
379 serde_json::to_string(&entity.properties).unwrap_or_else(|_| "{}".to_string());
380
381 conn.execute(
382 "INSERT INTO graph_entities (
383 id, entity_type, name, aliases, domain_org, domain_project, domain_repo,
384 confidence, valid_time_start, valid_time_end, transaction_time, properties, mention_count
385 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
386 ON CONFLICT(id) DO UPDATE SET
387 entity_type = excluded.entity_type,
388 name = excluded.name,
389 aliases = excluded.aliases,
390 domain_org = excluded.domain_org,
391 domain_project = excluded.domain_project,
392 domain_repo = excluded.domain_repo,
393 confidence = excluded.confidence,
394 valid_time_start = excluded.valid_time_start,
395 valid_time_end = excluded.valid_time_end,
396 properties = excluded.properties,
397 mention_count = excluded.mention_count",
398 params![
399 entity.id.as_str(),
400 entity.entity_type.as_str(),
401 entity.name,
402 aliases_json,
403 entity.domain.organization,
404 entity.domain.project,
405 entity.domain.repository,
406 f64::from(entity.confidence),
407 entity.valid_time.start,
408 entity.valid_time.end,
409 entity.transaction_time.timestamp(),
410 properties_json,
411 entity.mention_count,
412 ],
413 )
414 .map_err(|e| Error::OperationFailed {
415 operation: "store_entity".to_string(),
416 cause: e.to_string(),
417 })?;
418
419 metrics::counter!("graph_entities_stored_total").increment(1);
420 Ok(())
421 }
422
423 #[instrument(skip(self), fields(entity_id = %id))]
424 fn get_entity(&self, id: &EntityId) -> Result<Option<Entity>> {
425 let conn = acquire_lock(&self.conn);
426
427 let result = conn
428 .query_row(
429 "SELECT * FROM graph_entities WHERE id = ?1",
430 params![id.as_str()],
431 Self::parse_entity_row,
432 )
433 .optional()
434 .map_err(|e| Error::OperationFailed {
435 operation: "get_entity".to_string(),
436 cause: e.to_string(),
437 })?;
438
439 Ok(result)
440 }
441
442 #[instrument(skip(self, query))]
443 fn query_entities(&self, query: &EntityQuery) -> Result<Vec<Entity>> {
444 let conn = acquire_lock(&self.conn);
445
446 let (where_clause, params) = Self::build_entity_where_clause(query);
447 let limit = query.limit.unwrap_or(100);
448
449 let sql = format!(
450 "SELECT * FROM graph_entities {where_clause} ORDER BY mention_count DESC, confidence DESC LIMIT {limit}"
451 );
452
453 let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
454 operation: "query_entities_prepare".to_string(),
455 cause: e.to_string(),
456 })?;
457
458 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
459
460 let entities = stmt
461 .query_map(param_refs.as_slice(), Self::parse_entity_row)
462 .map_err(|e| Error::OperationFailed {
463 operation: "query_entities".to_string(),
464 cause: e.to_string(),
465 })?
466 .filter_map(|r| r.ok())
467 .collect();
468
469 Ok(entities)
470 }
471
472 #[instrument(skip(self), fields(entity_id = %id))]
473 fn delete_entity(&self, id: &EntityId) -> Result<bool> {
474 let conn = acquire_lock(&self.conn);
475
476 let rows = conn
478 .execute(
479 "DELETE FROM graph_entities WHERE id = ?1",
480 params![id.as_str()],
481 )
482 .map_err(|e| Error::OperationFailed {
483 operation: "delete_entity".to_string(),
484 cause: e.to_string(),
485 })?;
486
487 if rows > 0 {
488 metrics::counter!("graph_entities_deleted_total").increment(1);
489 }
490
491 Ok(rows > 0)
492 }
493
494 #[instrument(skip(self, entity_ids))]
495 fn merge_entities(&self, entity_ids: &[EntityId], canonical_name: &str) -> Result<Entity> {
496 if entity_ids.is_empty() {
497 return Err(Error::OperationFailed {
498 operation: "merge_entities".to_string(),
499 cause: "No entity IDs provided".to_string(),
500 });
501 }
502
503 let conn = acquire_lock(&self.conn);
504
505 let canonical_id = &entity_ids[0];
507 let canonical_entity: Entity = conn
508 .query_row(
509 "SELECT * FROM graph_entities WHERE id = ?1",
510 params![canonical_id.as_str()],
511 Self::parse_entity_row,
512 )
513 .map_err(|e| Error::OperationFailed {
514 operation: "merge_entities_get_canonical".to_string(),
515 cause: e.to_string(),
516 })?;
517
518 let mut all_aliases = canonical_entity.aliases.clone();
520 all_aliases.push(canonical_entity.name.clone());
521
522 for other_id in entity_ids.iter().skip(1) {
523 if let Ok(other) = conn.query_row(
524 "SELECT * FROM graph_entities WHERE id = ?1",
525 params![other_id.as_str()],
526 Self::parse_entity_row,
527 ) {
528 all_aliases.push(other.name);
529 all_aliases.extend(other.aliases);
530
531 conn.execute(
533 "UPDATE graph_relationships SET from_entity_id = ?1 WHERE from_entity_id = ?2",
534 params![canonical_id.as_str(), other_id.as_str()],
535 )
536 .ok();
537 conn.execute(
538 "UPDATE graph_relationships SET to_entity_id = ?1 WHERE to_entity_id = ?2",
539 params![canonical_id.as_str(), other_id.as_str()],
540 )
541 .ok();
542
543 conn.execute(
545 "UPDATE OR IGNORE graph_entity_mentions SET entity_id = ?1 WHERE entity_id = ?2",
546 params![canonical_id.as_str(), other_id.as_str()],
547 )
548 .ok();
549
550 conn.execute(
552 "DELETE FROM graph_entities WHERE id = ?1",
553 params![other_id.as_str()],
554 )
555 .ok();
556 }
557 }
558
559 all_aliases.sort();
561 all_aliases.dedup();
562 all_aliases.retain(|a| a != canonical_name);
564
565 let aliases_json = serde_json::to_string(&all_aliases).unwrap_or_else(|_| "[]".to_string());
567
568 conn.execute(
569 "UPDATE graph_entities SET name = ?1, aliases = ?2 WHERE id = ?3",
570 params![canonical_name, aliases_json, canonical_id.as_str()],
571 )
572 .map_err(|e| Error::OperationFailed {
573 operation: "merge_entities_update".to_string(),
574 cause: e.to_string(),
575 })?;
576
577 let merged = Entity::new(
579 canonical_entity.entity_type,
580 canonical_name,
581 canonical_entity.domain.clone(),
582 )
583 .with_id(canonical_entity.id)
584 .with_confidence(canonical_entity.confidence)
585 .with_aliases(all_aliases);
586
587 metrics::counter!("graph_entities_merged_total").increment(1);
588 Ok(merged)
589 }
590
591 #[instrument(skip(self))]
592 fn find_entities_by_name(
593 &self,
594 name: &str,
595 entity_type: Option<EntityType>,
596 domain: Option<&Domain>,
597 limit: usize,
598 ) -> Result<Vec<Entity>> {
599 let conn = acquire_lock(&self.conn);
600
601 let mut conditions = vec!["(name LIKE ?1 OR aliases LIKE ?1)".to_string()];
602 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(format!("%{name}%"))];
603
604 if let Some(ref et) = entity_type {
605 conditions.push(format!("entity_type = ?{}", params.len() + 1));
606 params.push(Box::new(et.as_str().to_string()));
607 }
608
609 if let Some(d) = domain {
610 if let Some(ref org) = d.organization {
611 conditions.push(format!("domain_org = ?{}", params.len() + 1));
612 params.push(Box::new(org.clone()));
613 }
614 if let Some(ref project) = d.project {
615 conditions.push(format!("domain_project = ?{}", params.len() + 1));
616 params.push(Box::new(project.clone()));
617 }
618 if let Some(ref repo) = d.repository {
619 conditions.push(format!("domain_repo = ?{}", params.len() + 1));
620 params.push(Box::new(repo.clone()));
621 }
622 }
623
624 let sql = format!(
625 "SELECT * FROM graph_entities WHERE {} ORDER BY confidence DESC LIMIT {}",
626 conditions.join(" AND "),
627 limit
628 );
629
630 let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
631 operation: "find_entities_by_name_prepare".to_string(),
632 cause: e.to_string(),
633 })?;
634
635 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
636
637 let entities = stmt
638 .query_map(param_refs.as_slice(), Self::parse_entity_row)
639 .map_err(|e| Error::OperationFailed {
640 operation: "find_entities_by_name".to_string(),
641 cause: e.to_string(),
642 })?
643 .filter_map(|r| r.ok())
644 .collect();
645
646 Ok(entities)
647 }
648
649 #[instrument(skip(self, relationship))]
654 fn store_relationship(&self, relationship: &Relationship) -> Result<()> {
655 let conn = acquire_lock(&self.conn);
656
657 let properties_json =
658 serde_json::to_string(&relationship.properties).unwrap_or_else(|_| "{}".to_string());
659
660 conn.execute(
661 "INSERT INTO graph_relationships (
662 from_entity_id, to_entity_id, relationship_type, confidence,
663 valid_time_start, valid_time_end, transaction_time, properties
664 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
665 ON CONFLICT(from_entity_id, to_entity_id, relationship_type) DO UPDATE SET
666 confidence = excluded.confidence,
667 valid_time_start = excluded.valid_time_start,
668 valid_time_end = excluded.valid_time_end,
669 properties = excluded.properties",
670 params![
671 relationship.from_entity.as_str(),
672 relationship.to_entity.as_str(),
673 relationship.relationship_type.as_str(),
674 f64::from(relationship.confidence),
675 relationship.valid_time.start,
676 relationship.valid_time.end,
677 relationship.transaction_time.timestamp(),
678 properties_json,
679 ],
680 )
681 .map_err(|e| Error::OperationFailed {
682 operation: "store_relationship".to_string(),
683 cause: e.to_string(),
684 })?;
685
686 metrics::counter!("graph_relationships_stored_total").increment(1);
687 Ok(())
688 }
689
690 #[instrument(skip(self, query))]
691 fn query_relationships(&self, query: &RelationshipQuery) -> Result<Vec<Relationship>> {
692 let conn = acquire_lock(&self.conn);
693
694 let mut conditions = Vec::new();
695 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
696
697 if let Some(ref from_entity) = query.from_entity {
698 conditions.push(format!("from_entity_id = ?{}", params.len() + 1));
699 params.push(Box::new(from_entity.as_str().to_string()));
700 }
701
702 if let Some(ref to_entity) = query.to_entity {
703 conditions.push(format!("to_entity_id = ?{}", params.len() + 1));
704 params.push(Box::new(to_entity.as_str().to_string()));
705 }
706
707 if let Some(ref relationship_type) = query.relationship_type {
708 conditions.push(format!("relationship_type = ?{}", params.len() + 1));
709 params.push(Box::new(relationship_type.as_str().to_string()));
710 }
711
712 if let Some(min_confidence) = query.min_confidence {
713 conditions.push(format!("confidence >= ?{}", params.len() + 1));
714 params.push(Box::new(f64::from(min_confidence)));
715 }
716
717 let where_clause = if conditions.is_empty() {
718 String::new()
719 } else {
720 format!("WHERE {}", conditions.join(" AND "))
721 };
722
723 let limit = query.limit.unwrap_or(100);
724 let sql = format!(
725 "SELECT * FROM graph_relationships {where_clause} ORDER BY confidence DESC LIMIT {limit}"
726 );
727
728 let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
729 operation: "query_relationships_prepare".to_string(),
730 cause: e.to_string(),
731 })?;
732
733 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
734
735 let relationships = stmt
736 .query_map(param_refs.as_slice(), Self::parse_relationship_row)
737 .map_err(|e| Error::OperationFailed {
738 operation: "query_relationships".to_string(),
739 cause: e.to_string(),
740 })?
741 .filter_map(|r| r.ok())
742 .collect();
743
744 Ok(relationships)
745 }
746
747 #[instrument(skip(self, query))]
748 fn delete_relationships(&self, query: &RelationshipQuery) -> Result<usize> {
749 let conn = acquire_lock(&self.conn);
750
751 let mut conditions = Vec::new();
752 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
753
754 if let Some(ref from_entity) = query.from_entity {
755 conditions.push(format!("from_entity_id = ?{}", params.len() + 1));
756 params.push(Box::new(from_entity.as_str().to_string()));
757 }
758
759 if let Some(ref to_entity) = query.to_entity {
760 conditions.push(format!("to_entity_id = ?{}", params.len() + 1));
761 params.push(Box::new(to_entity.as_str().to_string()));
762 }
763
764 if let Some(ref relationship_type) = query.relationship_type {
765 conditions.push(format!("relationship_type = ?{}", params.len() + 1));
766 params.push(Box::new(relationship_type.as_str().to_string()));
767 }
768
769 if conditions.is_empty() {
770 return Ok(0); }
772
773 let sql = format!(
774 "DELETE FROM graph_relationships WHERE {}",
775 conditions.join(" AND ")
776 );
777
778 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
779
780 let rows =
781 conn.execute(&sql, param_refs.as_slice())
782 .map_err(|e| Error::OperationFailed {
783 operation: "delete_relationships".to_string(),
784 cause: e.to_string(),
785 })?;
786
787 if rows > 0 {
788 metrics::counter!("graph_relationships_deleted_total").increment(rows as u64);
789 }
790
791 Ok(rows)
792 }
793
794 #[instrument(skip(self))]
795 fn get_relationship_types(
796 &self,
797 from_entity: &EntityId,
798 to_entity: &EntityId,
799 ) -> Result<Vec<RelationshipType>> {
800 let conn = acquire_lock(&self.conn);
801
802 let mut stmt = conn
803 .prepare(
804 "SELECT DISTINCT relationship_type FROM graph_relationships
805 WHERE from_entity_id = ?1 AND to_entity_id = ?2",
806 )
807 .map_err(|e| Error::OperationFailed {
808 operation: "get_relationship_types_prepare".to_string(),
809 cause: e.to_string(),
810 })?;
811
812 let types = stmt
813 .query_map(params![from_entity.as_str(), to_entity.as_str()], |row| {
814 let type_str: String = row.get(0)?;
815 Ok(RelationshipType::parse(&type_str).unwrap_or(RelationshipType::RelatesTo))
816 })
817 .map_err(|e| Error::OperationFailed {
818 operation: "get_relationship_types".to_string(),
819 cause: e.to_string(),
820 })?
821 .filter_map(|r| r.ok())
822 .collect();
823
824 Ok(types)
825 }
826
827 #[instrument(skip(self, mention))]
832 fn store_mention(&self, mention: &EntityMention) -> Result<()> {
833 let conn = acquire_lock(&self.conn);
834
835 conn.execute(
836 "INSERT INTO graph_entity_mentions (entity_id, memory_id, confidence, start_offset, end_offset, matched_text, transaction_time)
837 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
838 ON CONFLICT(entity_id, memory_id) DO UPDATE SET
839 confidence = excluded.confidence,
840 start_offset = excluded.start_offset,
841 end_offset = excluded.end_offset,
842 matched_text = excluded.matched_text",
843 params![
844 mention.entity_id.as_str(),
845 mention.memory_id.as_str(),
846 f64::from(mention.confidence),
847 mention.start_offset.map(|v| v as i64),
848 mention.end_offset.map(|v| v as i64),
849 mention.matched_text,
850 mention.transaction_time.timestamp(),
851 ],
852 )
853 .map_err(|e| Error::OperationFailed {
854 operation: "store_mention".to_string(),
855 cause: e.to_string(),
856 })?;
857
858 conn.execute(
860 "UPDATE graph_entities SET mention_count = mention_count + 1 WHERE id = ?1",
861 params![mention.entity_id.as_str()],
862 )
863 .ok();
864
865 metrics::counter!("graph_mentions_stored_total").increment(1);
866 Ok(())
867 }
868
869 #[instrument(skip(self))]
870 fn get_mentions_for_entity(&self, entity_id: &EntityId) -> Result<Vec<EntityMention>> {
871 let conn = acquire_lock(&self.conn);
872
873 let mut stmt = conn
874 .prepare(
875 "SELECT entity_id, memory_id, confidence, start_offset, end_offset, matched_text, transaction_time
876 FROM graph_entity_mentions WHERE entity_id = ?1 ORDER BY transaction_time DESC",
877 )
878 .map_err(|e| Error::OperationFailed {
879 operation: "get_mentions_for_entity_prepare".to_string(),
880 cause: e.to_string(),
881 })?;
882
883 let mentions = stmt
884 .query_map(params![entity_id.as_str()], |row| {
885 let start_offset: Option<i64> = row.get("start_offset")?;
886 let end_offset: Option<i64> = row.get("end_offset")?;
887 let tx_time: i64 = row.get("transaction_time")?;
888
889 Ok(EntityMention {
890 entity_id: EntityId::new(row.get::<_, String>("entity_id")?),
891 memory_id: MemoryId::new(row.get::<_, String>("memory_id")?),
892 confidence: row.get("confidence")?,
893 start_offset: start_offset.map(|v| v as usize),
894 end_offset: end_offset.map(|v| v as usize),
895 matched_text: row.get("matched_text")?,
896 transaction_time: TransactionTime::at(tx_time),
897 })
898 })
899 .map_err(|e| Error::OperationFailed {
900 operation: "get_mentions_for_entity".to_string(),
901 cause: e.to_string(),
902 })?
903 .filter_map(|r| r.ok())
904 .collect();
905
906 Ok(mentions)
907 }
908
909 #[instrument(skip(self))]
910 fn get_entities_in_memory(&self, memory_id: &MemoryId) -> Result<Vec<Entity>> {
911 let conn = acquire_lock(&self.conn);
912
913 let mut stmt = conn
914 .prepare(
915 "SELECT e.* FROM graph_entities e
916 INNER JOIN graph_entity_mentions m ON e.id = m.entity_id
917 WHERE m.memory_id = ?1
918 ORDER BY m.confidence DESC",
919 )
920 .map_err(|e| Error::OperationFailed {
921 operation: "get_entities_in_memory_prepare".to_string(),
922 cause: e.to_string(),
923 })?;
924
925 let entities = stmt
926 .query_map(params![memory_id.as_str()], Self::parse_entity_row)
927 .map_err(|e| Error::OperationFailed {
928 operation: "get_entities_in_memory".to_string(),
929 cause: e.to_string(),
930 })?
931 .filter_map(|r| r.ok())
932 .collect();
933
934 Ok(entities)
935 }
936
937 #[instrument(skip(self))]
938 fn delete_mentions_for_entity(&self, entity_id: &EntityId) -> Result<usize> {
939 let conn = acquire_lock(&self.conn);
940
941 let rows = conn
942 .execute(
943 "DELETE FROM graph_entity_mentions WHERE entity_id = ?1",
944 params![entity_id.as_str()],
945 )
946 .map_err(|e| Error::OperationFailed {
947 operation: "delete_mentions_for_entity".to_string(),
948 cause: e.to_string(),
949 })?;
950
951 conn.execute(
953 "UPDATE graph_entities SET mention_count = 0 WHERE id = ?1",
954 params![entity_id.as_str()],
955 )
956 .ok();
957
958 Ok(rows)
959 }
960
961 #[instrument(skip(self))]
962 fn delete_mentions_for_memory(&self, memory_id: &MemoryId) -> Result<usize> {
963 let conn = acquire_lock(&self.conn);
964
965 let mut stmt = conn
967 .prepare("SELECT entity_id FROM graph_entity_mentions WHERE memory_id = ?1")
968 .map_err(|e| Error::OperationFailed {
969 operation: "delete_mentions_for_memory_prepare".to_string(),
970 cause: e.to_string(),
971 })?;
972
973 let entity_ids: Vec<String> = stmt
974 .query_map(params![memory_id.as_str()], |row| row.get(0))
975 .map_err(|e| Error::OperationFailed {
976 operation: "delete_mentions_for_memory_get_entities".to_string(),
977 cause: e.to_string(),
978 })?
979 .filter_map(|r| r.ok())
980 .collect();
981
982 let rows = conn
984 .execute(
985 "DELETE FROM graph_entity_mentions WHERE memory_id = ?1",
986 params![memory_id.as_str()],
987 )
988 .map_err(|e| Error::OperationFailed {
989 operation: "delete_mentions_for_memory".to_string(),
990 cause: e.to_string(),
991 })?;
992
993 for entity_id in entity_ids {
995 conn.execute(
996 "UPDATE graph_entities SET mention_count = MAX(0, mention_count - 1) WHERE id = ?1",
997 params![entity_id],
998 )
999 .ok();
1000 }
1001
1002 Ok(rows)
1003 }
1004
1005 #[instrument(skip(self))]
1010 fn traverse(
1011 &self,
1012 start: &EntityId,
1013 max_depth: u32,
1014 relationship_types: Option<&[RelationshipType]>,
1015 min_confidence: Option<f32>,
1016 ) -> Result<TraversalResult> {
1017 let conn = acquire_lock(&self.conn);
1018
1019 let type_filter = relationship_types
1021 .map(|types| {
1022 let type_strs: Vec<String> =
1023 types.iter().map(|t| format!("'{}'", t.as_str())).collect();
1024 format!("AND r.relationship_type IN ({})", type_strs.join(", "))
1025 })
1026 .unwrap_or_default();
1027
1028 let confidence_filter = min_confidence
1029 .map(|c| format!("AND r.confidence >= {c}"))
1030 .unwrap_or_default();
1031
1032 let sql = format!(
1034 "WITH RECURSIVE reachable(entity_id, depth, path) AS (
1035 -- Base case: start node
1036 SELECT ?1, 0, ?1
1037 UNION ALL
1038 -- Recursive case: follow relationships
1039 SELECT r.to_entity_id, reachable.depth + 1, reachable.path || ',' || r.to_entity_id
1040 FROM reachable
1041 JOIN graph_relationships r ON r.from_entity_id = reachable.entity_id
1042 WHERE reachable.depth < ?2
1043 AND instr(reachable.path, r.to_entity_id) = 0
1044 {type_filter}
1045 {confidence_filter}
1046 )
1047 SELECT DISTINCT e.*, reachable.depth
1048 FROM reachable
1049 JOIN graph_entities e ON e.id = reachable.entity_id
1050 ORDER BY reachable.depth, e.mention_count DESC"
1051 );
1052
1053 let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
1054 operation: "traverse_prepare".to_string(),
1055 cause: e.to_string(),
1056 })?;
1057
1058 let entities: Vec<Entity> = stmt
1059 .query_map(params![start.as_str(), max_depth], |row| {
1060 Self::parse_entity_row(row)
1061 })
1062 .map_err(|e| Error::OperationFailed {
1063 operation: "traverse".to_string(),
1064 cause: e.to_string(),
1065 })?
1066 .filter_map(|r| r.ok())
1067 .collect();
1068
1069 let total_count = entities.len();
1070
1071 let entity_ids: Vec<String> = entities.iter().map(|e| e.id.as_str().to_string()).collect();
1073 let relationships = if entity_ids.is_empty() {
1074 Vec::new()
1075 } else {
1076 let from_placeholders: Vec<String> =
1078 (1..=entity_ids.len()).map(|i| format!("?{i}")).collect();
1079 let to_placeholders: Vec<String> = (entity_ids.len() + 1..=entity_ids.len() * 2)
1080 .map(|i| format!("?{i}"))
1081 .collect();
1082
1083 let from_clause = from_placeholders.join(", ");
1084 let to_clause = to_placeholders.join(", ");
1085
1086 let rel_sql = format!(
1087 "SELECT * FROM graph_relationships
1088 WHERE from_entity_id IN ({from_clause}) AND to_entity_id IN ({to_clause})"
1089 );
1090
1091 let mut rel_stmt = conn.prepare(&rel_sql).map_err(|e| Error::OperationFailed {
1092 operation: "traverse_relationships_prepare".to_string(),
1093 cause: e.to_string(),
1094 })?;
1095
1096 let mut params_vec: Vec<&dyn rusqlite::ToSql> = Vec::new();
1098 for id in &entity_ids {
1099 params_vec.push(id);
1100 }
1101 for id in &entity_ids {
1102 params_vec.push(id);
1103 }
1104
1105 rel_stmt
1106 .query_map(params_vec.as_slice(), Self::parse_relationship_row)
1107 .map_err(|e| Error::OperationFailed {
1108 operation: "traverse_relationships".to_string(),
1109 cause: e.to_string(),
1110 })?
1111 .filter_map(|r| r.ok())
1112 .collect()
1113 };
1114
1115 Ok(TraversalResult {
1116 entities,
1117 relationships,
1118 total_count,
1119 })
1120 }
1121
1122 #[instrument(skip(self))]
1123 fn find_path(
1124 &self,
1125 from: &EntityId,
1126 to: &EntityId,
1127 max_depth: u32,
1128 ) -> Result<Option<TraversalResult>> {
1129 let conn = acquire_lock(&self.conn);
1130
1131 let sql = "WITH RECURSIVE path_finder(entity_id, depth, path) AS (
1133 SELECT ?1, 0, ?1
1134 UNION ALL
1135 SELECT r.to_entity_id, path_finder.depth + 1, path_finder.path || ',' || r.to_entity_id
1136 FROM path_finder
1137 JOIN graph_relationships r ON r.from_entity_id = path_finder.entity_id
1138 WHERE path_finder.depth < ?3
1139 AND instr(path_finder.path, r.to_entity_id) = 0
1140 )
1141 SELECT path FROM path_finder WHERE entity_id = ?2 ORDER BY depth LIMIT 1";
1142
1143 let path: Option<String> = conn
1144 .query_row(sql, params![from.as_str(), to.as_str(), max_depth], |row| {
1145 row.get(0)
1146 })
1147 .optional()
1148 .map_err(|e| Error::OperationFailed {
1149 operation: "find_path".to_string(),
1150 cause: e.to_string(),
1151 })?;
1152
1153 if let Some(path_str) = path {
1154 let entity_ids: Vec<&str> = path_str.split(',').collect();
1155
1156 let placeholders: Vec<String> =
1158 (1..=entity_ids.len()).map(|i| format!("?{i}")).collect();
1159 let in_clause = placeholders.join(", ");
1160
1161 let entity_sql = format!("SELECT * FROM graph_entities WHERE id IN ({in_clause})");
1162 let mut stmt = conn
1163 .prepare(&entity_sql)
1164 .map_err(|e| Error::OperationFailed {
1165 operation: "find_path_entities_prepare".to_string(),
1166 cause: e.to_string(),
1167 })?;
1168
1169 let params: Vec<&dyn rusqlite::ToSql> = entity_ids
1170 .iter()
1171 .map(|s| s as &dyn rusqlite::ToSql)
1172 .collect();
1173
1174 let entities: Vec<Entity> = stmt
1175 .query_map(params.as_slice(), Self::parse_entity_row)
1176 .map_err(|e| Error::OperationFailed {
1177 operation: "find_path_entities".to_string(),
1178 cause: e.to_string(),
1179 })?
1180 .filter_map(|r| r.ok())
1181 .collect();
1182
1183 let total_count = entities.len();
1184
1185 let mut relationships = Vec::new();
1187 for window in entity_ids.windows(2) {
1188 if let [from_id, to_id] = window {
1189 let rel: Option<Relationship> = conn
1190 .query_row(
1191 "SELECT * FROM graph_relationships WHERE from_entity_id = ?1 AND to_entity_id = ?2 LIMIT 1",
1192 params![from_id, to_id],
1193 Self::parse_relationship_row,
1194 )
1195 .optional()
1196 .ok()
1197 .flatten();
1198
1199 if let Some(r) = rel {
1200 relationships.push(r);
1201 }
1202 }
1203 }
1204
1205 Ok(Some(TraversalResult {
1206 entities,
1207 relationships,
1208 total_count,
1209 }))
1210 } else {
1211 Ok(None)
1212 }
1213 }
1214
1215 #[instrument(skip(self, query, point))]
1220 fn query_entities_at(
1221 &self,
1222 query: &EntityQuery,
1223 point: &BitemporalPoint,
1224 ) -> Result<Vec<Entity>> {
1225 let conn = acquire_lock(&self.conn);
1226
1227 let (base_where, mut params) = Self::build_entity_where_clause(query);
1228
1229 let temporal_conditions = format!(
1231 "AND (valid_time_start IS NULL OR valid_time_start <= ?{}) \
1232 AND (valid_time_end IS NULL OR valid_time_end > ?{}) \
1233 AND transaction_time <= ?{}",
1234 params.len() + 1,
1235 params.len() + 2,
1236 params.len() + 3
1237 );
1238
1239 params.push(Box::new(point.valid_at));
1240 params.push(Box::new(point.valid_at));
1241 params.push(Box::new(point.as_of));
1242
1243 let where_clause = if base_where.is_empty() {
1244 format!("WHERE 1=1 {temporal_conditions}")
1245 } else {
1246 format!("{base_where} {temporal_conditions}")
1247 };
1248
1249 let limit = query.limit.unwrap_or(100);
1250 let sql = format!(
1251 "SELECT * FROM graph_entities {where_clause} ORDER BY mention_count DESC LIMIT {limit}"
1252 );
1253
1254 let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
1255 operation: "query_entities_at_prepare".to_string(),
1256 cause: e.to_string(),
1257 })?;
1258
1259 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
1260
1261 let entities = stmt
1262 .query_map(param_refs.as_slice(), Self::parse_entity_row)
1263 .map_err(|e| Error::OperationFailed {
1264 operation: "query_entities_at".to_string(),
1265 cause: e.to_string(),
1266 })?
1267 .filter_map(|r| r.ok())
1268 .collect();
1269
1270 Ok(entities)
1271 }
1272
1273 #[instrument(skip(self, query, point))]
1274 fn query_relationships_at(
1275 &self,
1276 query: &RelationshipQuery,
1277 point: &BitemporalPoint,
1278 ) -> Result<Vec<Relationship>> {
1279 let conn = acquire_lock(&self.conn);
1280
1281 let mut conditions = Vec::new();
1282 let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
1283
1284 if let Some(ref from_entity) = query.from_entity {
1285 conditions.push(format!("from_entity_id = ?{}", params.len() + 1));
1286 params.push(Box::new(from_entity.as_str().to_string()));
1287 }
1288
1289 if let Some(ref to_entity) = query.to_entity {
1290 conditions.push(format!("to_entity_id = ?{}", params.len() + 1));
1291 params.push(Box::new(to_entity.as_str().to_string()));
1292 }
1293
1294 if let Some(ref relationship_type) = query.relationship_type {
1295 conditions.push(format!("relationship_type = ?{}", params.len() + 1));
1296 params.push(Box::new(relationship_type.as_str().to_string()));
1297 }
1298
1299 conditions.push(format!(
1301 "(valid_time_start IS NULL OR valid_time_start <= ?{})",
1302 params.len() + 1
1303 ));
1304 params.push(Box::new(point.valid_at));
1305
1306 conditions.push(format!(
1307 "(valid_time_end IS NULL OR valid_time_end > ?{})",
1308 params.len() + 1
1309 ));
1310 params.push(Box::new(point.valid_at));
1311
1312 conditions.push(format!("transaction_time <= ?{}", params.len() + 1));
1313 params.push(Box::new(point.as_of));
1314
1315 let where_clause = format!("WHERE {}", conditions.join(" AND "));
1316 let limit = query.limit.unwrap_or(100);
1317
1318 let sql = format!(
1319 "SELECT * FROM graph_relationships {where_clause} ORDER BY confidence DESC LIMIT {limit}"
1320 );
1321
1322 let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
1323 operation: "query_relationships_at_prepare".to_string(),
1324 cause: e.to_string(),
1325 })?;
1326
1327 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
1328
1329 let relationships = stmt
1330 .query_map(param_refs.as_slice(), Self::parse_relationship_row)
1331 .map_err(|e| Error::OperationFailed {
1332 operation: "query_relationships_at".to_string(),
1333 cause: e.to_string(),
1334 })?
1335 .filter_map(|r| r.ok())
1336 .collect();
1337
1338 Ok(relationships)
1339 }
1340
1341 #[instrument(skip(self))]
1342 fn close_entity_valid_time(&self, id: &EntityId, end_time: i64) -> Result<()> {
1343 let conn = acquire_lock(&self.conn);
1344
1345 let rows = conn
1346 .execute(
1347 "UPDATE graph_entities SET valid_time_end = ?1 WHERE id = ?2",
1348 params![end_time, id.as_str()],
1349 )
1350 .map_err(|e| Error::OperationFailed {
1351 operation: "close_entity_valid_time".to_string(),
1352 cause: e.to_string(),
1353 })?;
1354
1355 if rows == 0 {
1356 return Err(Error::OperationFailed {
1357 operation: "close_entity_valid_time".to_string(),
1358 cause: format!("Entity not found: {}", id.as_str()),
1359 });
1360 }
1361
1362 Ok(())
1363 }
1364
1365 #[instrument(skip(self))]
1366 fn close_relationship_valid_time(
1367 &self,
1368 from_entity: &EntityId,
1369 to_entity: &EntityId,
1370 relationship_type: RelationshipType,
1371 end_time: i64,
1372 ) -> Result<()> {
1373 let conn = acquire_lock(&self.conn);
1374
1375 let rows = conn
1376 .execute(
1377 "UPDATE graph_relationships SET valid_time_end = ?1
1378 WHERE from_entity_id = ?2 AND to_entity_id = ?3 AND relationship_type = ?4",
1379 params![
1380 end_time,
1381 from_entity.as_str(),
1382 to_entity.as_str(),
1383 relationship_type.as_str()
1384 ],
1385 )
1386 .map_err(|e| Error::OperationFailed {
1387 operation: "close_relationship_valid_time".to_string(),
1388 cause: e.to_string(),
1389 })?;
1390
1391 if rows == 0 {
1392 return Err(Error::OperationFailed {
1393 operation: "close_relationship_valid_time".to_string(),
1394 cause: "Relationship not found".to_string(),
1395 });
1396 }
1397
1398 Ok(())
1399 }
1400
1401 #[instrument(skip(self))]
1406 fn get_stats(&self) -> Result<GraphStats> {
1407 let conn = acquire_lock(&self.conn);
1408
1409 let entity_count: i64 = conn
1410 .query_row("SELECT COUNT(*) FROM graph_entities", [], |row| row.get(0))
1411 .unwrap_or(0);
1412
1413 let relationship_count: i64 = conn
1414 .query_row("SELECT COUNT(*) FROM graph_relationships", [], |row| {
1415 row.get(0)
1416 })
1417 .unwrap_or(0);
1418
1419 let mention_count: i64 = conn
1420 .query_row("SELECT COUNT(*) FROM graph_entity_mentions", [], |row| {
1421 row.get(0)
1422 })
1423 .unwrap_or(0);
1424
1425 let mut entities_by_type = HashMap::new();
1427 let mut stmt = conn
1428 .prepare("SELECT entity_type, COUNT(*) FROM graph_entities GROUP BY entity_type")
1429 .map_err(|e| Error::OperationFailed {
1430 operation: "get_stats_entities_by_type".to_string(),
1431 cause: e.to_string(),
1432 })?;
1433
1434 let type_counts = stmt
1435 .query_map([], |row| {
1436 let type_str: String = row.get(0)?;
1437 let count: i64 = row.get(1)?;
1438 Ok((type_str, count))
1439 })
1440 .map_err(|e| Error::OperationFailed {
1441 operation: "get_stats_entities_by_type_query".to_string(),
1442 cause: e.to_string(),
1443 })?;
1444
1445 for result in type_counts.flatten() {
1446 if let Some(entity_type) = EntityType::parse(&result.0) {
1447 entities_by_type.insert(entity_type, result.1 as usize);
1448 }
1449 }
1450
1451 let mut relationships_by_type = HashMap::new();
1453 let mut stmt = conn
1454 .prepare(
1455 "SELECT relationship_type, COUNT(*) FROM graph_relationships GROUP BY relationship_type",
1456 )
1457 .map_err(|e| Error::OperationFailed {
1458 operation: "get_stats_relationships_by_type".to_string(),
1459 cause: e.to_string(),
1460 })?;
1461
1462 let rel_counts = stmt
1463 .query_map([], |row| {
1464 let type_str: String = row.get(0)?;
1465 let count: i64 = row.get(1)?;
1466 Ok((type_str, count))
1467 })
1468 .map_err(|e| Error::OperationFailed {
1469 operation: "get_stats_relationships_by_type_query".to_string(),
1470 cause: e.to_string(),
1471 })?;
1472
1473 for result in rel_counts.flatten() {
1474 if let Some(rel_type) = RelationshipType::parse(&result.0) {
1475 relationships_by_type.insert(rel_type, result.1 as usize);
1476 }
1477 }
1478
1479 let avg_relationships_per_entity = if entity_count > 0 {
1480 relationship_count as f32 / entity_count as f32
1481 } else {
1482 0.0
1483 };
1484
1485 Ok(GraphStats {
1486 entity_count: entity_count as usize,
1487 entities_by_type,
1488 relationship_count: relationship_count as usize,
1489 relationships_by_type,
1490 mention_count: mention_count as usize,
1491 avg_relationships_per_entity,
1492 })
1493 }
1494
1495 #[instrument(skip(self))]
1496 fn clear(&self) -> Result<()> {
1497 let conn = acquire_lock(&self.conn);
1498
1499 conn.execute("DELETE FROM graph_entity_mentions", [])
1500 .map_err(|e| Error::OperationFailed {
1501 operation: "clear_mentions".to_string(),
1502 cause: e.to_string(),
1503 })?;
1504
1505 conn.execute("DELETE FROM graph_relationships", [])
1506 .map_err(|e| Error::OperationFailed {
1507 operation: "clear_relationships".to_string(),
1508 cause: e.to_string(),
1509 })?;
1510
1511 conn.execute("DELETE FROM graph_entities", [])
1512 .map_err(|e| Error::OperationFailed {
1513 operation: "clear_entities".to_string(),
1514 cause: e.to_string(),
1515 })?;
1516
1517 metrics::counter!("graph_cleared_total").increment(1);
1518 Ok(())
1519 }
1520}
1521
1522#[cfg(test)]
1523#[allow(clippy::redundant_clone)]
1524mod tests {
1525 use super::*;
1526
1527 fn create_test_entity(name: &str, entity_type: EntityType) -> Entity {
1528 Entity::new(entity_type, name, Domain::for_user())
1529 }
1530
1531 #[test]
1532 fn test_store_and_get_entity() {
1533 let backend = SqliteGraphBackend::in_memory().unwrap();
1534 let entity = create_test_entity("Alice", EntityType::Person);
1535
1536 backend.store_entity(&entity).unwrap();
1537
1538 let retrieved = backend.get_entity(&entity.id).unwrap();
1539 assert!(retrieved.is_some());
1540 let retrieved = retrieved.unwrap();
1541 assert_eq!(retrieved.name, "Alice");
1542 assert_eq!(retrieved.entity_type, EntityType::Person);
1543 }
1544
1545 #[test]
1546 fn test_query_entities_by_type() {
1547 let backend = SqliteGraphBackend::in_memory().unwrap();
1548
1549 backend
1550 .store_entity(&create_test_entity("Alice", EntityType::Person))
1551 .unwrap();
1552 backend
1553 .store_entity(&create_test_entity("Bob", EntityType::Person))
1554 .unwrap();
1555 backend
1556 .store_entity(&create_test_entity("Acme Inc", EntityType::Organization))
1557 .unwrap();
1558
1559 let query = EntityQuery::new().with_type(EntityType::Person);
1560 let results = backend.query_entities(&query).unwrap();
1561
1562 assert_eq!(results.len(), 2);
1563 assert!(results.iter().all(|e| e.entity_type == EntityType::Person));
1564 }
1565
1566 #[test]
1567 fn test_delete_entity() {
1568 let backend = SqliteGraphBackend::in_memory().unwrap();
1569 let entity = create_test_entity("Alice", EntityType::Person);
1570 let entity_id = entity.id.clone();
1571
1572 backend.store_entity(&entity).unwrap();
1573 assert!(backend.get_entity(&entity_id).unwrap().is_some());
1574
1575 let deleted = backend.delete_entity(&entity_id).unwrap();
1576 assert!(deleted);
1577
1578 assert!(backend.get_entity(&entity_id).unwrap().is_none());
1579 }
1580
1581 #[test]
1582 fn test_store_and_query_relationships() {
1583 let backend = SqliteGraphBackend::in_memory().unwrap();
1584
1585 let alice = create_test_entity("Alice", EntityType::Person);
1586 let acme = create_test_entity("Acme Inc", EntityType::Organization);
1587
1588 backend.store_entity(&alice).unwrap();
1589 backend.store_entity(&acme).unwrap();
1590
1591 let relationship =
1592 Relationship::new(alice.id.clone(), acme.id.clone(), RelationshipType::WorksAt);
1593
1594 backend.store_relationship(&relationship).unwrap();
1595
1596 let query = RelationshipQuery::new().from(alice.id.clone());
1597 let results = backend.query_relationships(&query).unwrap();
1598
1599 assert_eq!(results.len(), 1);
1600 assert_eq!(results[0].relationship_type, RelationshipType::WorksAt);
1601 }
1602
1603 #[test]
1604 fn test_entity_mentions() {
1605 let backend = SqliteGraphBackend::in_memory().unwrap();
1606
1607 let entity = create_test_entity("Alice", EntityType::Person);
1608 backend.store_entity(&entity).unwrap();
1609
1610 let mention = EntityMention::new(entity.id.clone(), MemoryId::new("mem_123"))
1611 .with_confidence(0.9)
1612 .with_span(0, 5, "Alice");
1613
1614 backend.store_mention(&mention).unwrap();
1615
1616 let mentions = backend.get_mentions_for_entity(&entity.id).unwrap();
1617 assert_eq!(mentions.len(), 1);
1618 assert_eq!(mentions[0].memory_id.as_str(), "mem_123");
1619
1620 let updated_entity = backend.get_entity(&entity.id).unwrap().unwrap();
1622 assert_eq!(updated_entity.mention_count, 2); }
1624
1625 #[test]
1626 fn test_graph_traversal() {
1627 let backend = SqliteGraphBackend::in_memory().unwrap();
1628
1629 let alice = create_test_entity("Alice", EntityType::Person);
1631 let acme = create_test_entity("Acme Inc", EntityType::Organization);
1632 let project = create_test_entity("Secret Project", EntityType::Concept);
1633
1634 backend.store_entity(&alice).unwrap();
1635 backend.store_entity(&acme).unwrap();
1636 backend.store_entity(&project).unwrap();
1637
1638 backend
1639 .store_relationship(&Relationship::new(
1640 alice.id.clone(),
1641 acme.id.clone(),
1642 RelationshipType::WorksAt,
1643 ))
1644 .unwrap();
1645
1646 backend
1647 .store_relationship(&Relationship::new(
1648 acme.id.clone(),
1649 project.id.clone(),
1650 RelationshipType::Created,
1651 ))
1652 .unwrap();
1653
1654 let result = backend.traverse(&alice.id, 2, None, None).unwrap();
1656
1657 assert_eq!(result.entities.len(), 3);
1659 assert!(result.entities.iter().any(|e| e.name == "Alice"));
1660 assert!(result.entities.iter().any(|e| e.name == "Acme Inc"));
1661 assert!(result.entities.iter().any(|e| e.name == "Secret Project"));
1662 }
1663
1664 #[test]
1665 fn test_find_path() {
1666 let backend = SqliteGraphBackend::in_memory().unwrap();
1667
1668 let alice = create_test_entity("Alice", EntityType::Person);
1669 let bob = create_test_entity("Bob", EntityType::Person);
1670 let acme = create_test_entity("Acme Inc", EntityType::Organization);
1671
1672 backend.store_entity(&alice).unwrap();
1673 backend.store_entity(&bob).unwrap();
1674 backend.store_entity(&acme).unwrap();
1675
1676 backend
1678 .store_relationship(&Relationship::new(
1679 alice.id.clone(),
1680 acme.id.clone(),
1681 RelationshipType::WorksAt,
1682 ))
1683 .unwrap();
1684
1685 backend
1686 .store_relationship(&Relationship::new(
1687 acme.id.clone(),
1688 bob.id.clone(),
1689 RelationshipType::RelatesTo,
1690 ))
1691 .unwrap();
1692
1693 let path = backend.find_path(&alice.id, &bob.id, 3).unwrap();
1694 assert!(path.is_some());
1695 let path = path.unwrap();
1696 assert_eq!(path.entities.len(), 3);
1697 }
1698
1699 #[test]
1700 fn test_temporal_queries() {
1701 let backend = SqliteGraphBackend::in_memory().unwrap();
1702
1703 let entity = Entity::new(EntityType::Person, "Alice", Domain::for_user())
1705 .with_valid_time(ValidTimeRange::between(100, 200));
1706
1707 backend.store_entity(&entity).unwrap();
1708
1709 let point = BitemporalPoint::new(150, i64::MAX);
1711 let query = EntityQuery::new();
1712 let results = backend.query_entities_at(&query, &point).unwrap();
1713 assert_eq!(results.len(), 1);
1714
1715 let point_before = BitemporalPoint::new(50, i64::MAX);
1717 let results_before = backend.query_entities_at(&query, &point_before).unwrap();
1718 assert_eq!(results_before.len(), 0);
1719
1720 let point_after = BitemporalPoint::new(250, i64::MAX);
1722 let results_after = backend.query_entities_at(&query, &point_after).unwrap();
1723 assert_eq!(results_after.len(), 0);
1724 }
1725
1726 #[test]
1727 fn test_merge_entities() {
1728 let backend = SqliteGraphBackend::in_memory().unwrap();
1729
1730 let alice1 = create_test_entity("Alice Smith", EntityType::Person);
1731 let alice2 = create_test_entity("A. Smith", EntityType::Person);
1732
1733 backend.store_entity(&alice1).unwrap();
1734 backend.store_entity(&alice2).unwrap();
1735
1736 let merged = backend
1737 .merge_entities(&[alice1.id.clone(), alice2.id.clone()], "Alice Smith")
1738 .unwrap();
1739
1740 assert_eq!(merged.name, "Alice Smith");
1741 assert!(merged.aliases.contains(&"A. Smith".to_string()));
1742
1743 assert!(backend.get_entity(&alice1.id).unwrap().is_some());
1745 assert!(backend.get_entity(&alice2.id).unwrap().is_none());
1747 }
1748
1749 #[test]
1750 fn test_get_stats() {
1751 let backend = SqliteGraphBackend::in_memory().unwrap();
1752
1753 backend
1754 .store_entity(&create_test_entity("Alice", EntityType::Person))
1755 .unwrap();
1756 backend
1757 .store_entity(&create_test_entity("Bob", EntityType::Person))
1758 .unwrap();
1759 backend
1760 .store_entity(&create_test_entity("Acme", EntityType::Organization))
1761 .unwrap();
1762
1763 let stats = backend.get_stats().unwrap();
1764
1765 assert_eq!(stats.entity_count, 3);
1766 assert_eq!(stats.entities_by_type.get(&EntityType::Person), Some(&2));
1767 assert_eq!(
1768 stats.entities_by_type.get(&EntityType::Organization),
1769 Some(&1)
1770 );
1771 }
1772
1773 #[test]
1774 fn test_clear() {
1775 let backend = SqliteGraphBackend::in_memory().unwrap();
1776
1777 backend
1778 .store_entity(&create_test_entity("Alice", EntityType::Person))
1779 .unwrap();
1780 backend
1781 .store_entity(&create_test_entity("Bob", EntityType::Person))
1782 .unwrap();
1783
1784 backend.clear().unwrap();
1785
1786 let stats = backend.get_stats().unwrap();
1787 assert_eq!(stats.entity_count, 0);
1788 }
1789
1790 #[test]
1795 fn test_entity_upsert() {
1796 let backend = SqliteGraphBackend::in_memory().unwrap();
1797
1798 let mut entity = create_test_entity("Alice", EntityType::Person);
1799 backend.store_entity(&entity).unwrap();
1800
1801 entity.confidence = 0.95;
1803 backend.store_entity(&entity).unwrap();
1804
1805 let retrieved = backend.get_entity(&entity.id).unwrap().unwrap();
1806 assert!((retrieved.confidence - 0.95).abs() < f32::EPSILON);
1807
1808 let stats = backend.get_stats().unwrap();
1810 assert_eq!(stats.entity_count, 1);
1811 }
1812
1813 #[test]
1814 fn test_entity_aliases() {
1815 let backend = SqliteGraphBackend::in_memory().unwrap();
1816
1817 let entity = Entity::new(EntityType::Person, "Robert", Domain::for_user())
1818 .with_aliases(vec!["Bob".to_string(), "Rob".to_string()]);
1819
1820 backend.store_entity(&entity).unwrap();
1821
1822 let results = backend
1824 .find_entities_by_name("Bob", Some(EntityType::Person), None, 10)
1825 .unwrap();
1826 assert_eq!(results.len(), 1);
1827 assert_eq!(results[0].name, "Robert");
1828 }
1829
1830 #[test]
1831 fn test_find_entities_case_insensitive() {
1832 let backend = SqliteGraphBackend::in_memory().unwrap();
1833
1834 backend
1835 .store_entity(&create_test_entity("Alice", EntityType::Person))
1836 .unwrap();
1837
1838 let results = backend
1840 .find_entities_by_name("alice", Some(EntityType::Person), None, 10)
1841 .unwrap();
1842 assert_eq!(results.len(), 1);
1843
1844 let results_upper = backend
1845 .find_entities_by_name("ALICE", Some(EntityType::Person), None, 10)
1846 .unwrap();
1847 assert_eq!(results_upper.len(), 1);
1848 }
1849
1850 #[test]
1851 fn test_query_entities_with_offset() {
1852 let backend = SqliteGraphBackend::in_memory().unwrap();
1853
1854 for i in 0..5 {
1856 backend
1857 .store_entity(&create_test_entity(
1858 &format!("Person{i}"),
1859 EntityType::Person,
1860 ))
1861 .unwrap();
1862 }
1863
1864 let query = EntityQuery::new()
1865 .with_type(EntityType::Person)
1866 .with_limit(2)
1867 .with_offset(2);
1868
1869 let results = backend.query_entities(&query).unwrap();
1870 assert_eq!(results.len(), 2);
1871 }
1872
1873 #[test]
1874 fn test_query_entities_by_domain() {
1875 let backend = SqliteGraphBackend::in_memory().unwrap();
1876
1877 let domain1 = Domain {
1878 organization: Some("org1".to_string()),
1879 project: None,
1880 repository: None,
1881 };
1882 let domain2 = Domain {
1883 organization: Some("org2".to_string()),
1884 project: None,
1885 repository: None,
1886 };
1887
1888 let entity1 = Entity::new(EntityType::Person, "Alice", domain1.clone());
1889 let entity2 = Entity::new(EntityType::Person, "Bob", domain2);
1890
1891 backend.store_entity(&entity1).unwrap();
1892 backend.store_entity(&entity2).unwrap();
1893
1894 let query = EntityQuery::new().with_domain(domain1);
1895 let results = backend.query_entities(&query).unwrap();
1896
1897 assert_eq!(results.len(), 1);
1898 assert_eq!(results[0].name, "Alice");
1899 }
1900
1901 #[test]
1902 fn test_query_entities_by_confidence() {
1903 let backend = SqliteGraphBackend::in_memory().unwrap();
1904
1905 let high_conf =
1906 Entity::new(EntityType::Person, "Alice", Domain::for_user()).with_confidence(0.9);
1907 let low_conf =
1908 Entity::new(EntityType::Person, "Bob", Domain::for_user()).with_confidence(0.3);
1909
1910 backend.store_entity(&high_conf).unwrap();
1911 backend.store_entity(&low_conf).unwrap();
1912
1913 let query = EntityQuery::new().with_min_confidence(0.5);
1914 let results = backend.query_entities(&query).unwrap();
1915
1916 assert_eq!(results.len(), 1);
1917 assert_eq!(results[0].name, "Alice");
1918 }
1919
1920 #[test]
1925 fn test_relationship_upsert() {
1926 let backend = SqliteGraphBackend::in_memory().unwrap();
1927
1928 let alice = create_test_entity("Alice", EntityType::Person);
1929 let acme = create_test_entity("Acme", EntityType::Organization);
1930
1931 backend.store_entity(&alice).unwrap();
1932 backend.store_entity(&acme).unwrap();
1933
1934 let mut rel =
1935 Relationship::new(alice.id.clone(), acme.id.clone(), RelationshipType::WorksAt);
1936 backend.store_relationship(&rel).unwrap();
1937
1938 rel.confidence = 0.95;
1940 backend.store_relationship(&rel).unwrap();
1941
1942 let query = RelationshipQuery::new().from(alice.id.clone());
1943 let results = backend.query_relationships(&query).unwrap();
1944
1945 assert_eq!(results.len(), 1);
1946 assert!((results[0].confidence - 0.95).abs() < f32::EPSILON);
1947 }
1948
1949 #[test]
1950 fn test_query_relationships_by_type() {
1951 let backend = SqliteGraphBackend::in_memory().unwrap();
1952
1953 let alice = create_test_entity("Alice", EntityType::Person);
1954 let bob = create_test_entity("Bob", EntityType::Person);
1955 let acme = create_test_entity("Acme", EntityType::Organization);
1956
1957 backend.store_entity(&alice).unwrap();
1958 backend.store_entity(&bob).unwrap();
1959 backend.store_entity(&acme).unwrap();
1960
1961 backend
1962 .store_relationship(&Relationship::new(
1963 alice.id.clone(),
1964 acme.id.clone(),
1965 RelationshipType::WorksAt,
1966 ))
1967 .unwrap();
1968 backend
1969 .store_relationship(&Relationship::new(
1970 alice.id.clone(),
1971 bob.id.clone(),
1972 RelationshipType::RelatesTo,
1973 ))
1974 .unwrap();
1975
1976 let query = RelationshipQuery::new().with_type(RelationshipType::WorksAt);
1977 let results = backend.query_relationships(&query).unwrap();
1978
1979 assert_eq!(results.len(), 1);
1980 assert_eq!(results[0].to_entity, acme.id);
1981 }
1982
1983 #[test]
1984 fn test_delete_relationships() {
1985 let backend = SqliteGraphBackend::in_memory().unwrap();
1986
1987 let alice = create_test_entity("Alice", EntityType::Person);
1988 let bob = create_test_entity("Bob", EntityType::Person);
1989
1990 backend.store_entity(&alice).unwrap();
1991 backend.store_entity(&bob).unwrap();
1992
1993 backend
1994 .store_relationship(&Relationship::new(
1995 alice.id.clone(),
1996 bob.id.clone(),
1997 RelationshipType::RelatesTo,
1998 ))
1999 .unwrap();
2000
2001 let query = RelationshipQuery::new().from(alice.id.clone());
2002 let deleted = backend.delete_relationships(&query).unwrap();
2003
2004 assert_eq!(deleted, 1);
2005
2006 let remaining = backend.query_relationships(&query).unwrap();
2007 assert!(remaining.is_empty());
2008 }
2009
2010 #[test]
2011 fn test_get_relationship_types() {
2012 let backend = SqliteGraphBackend::in_memory().unwrap();
2013
2014 let alice = create_test_entity("Alice", EntityType::Person);
2015 let acme = create_test_entity("Acme", EntityType::Organization);
2016
2017 backend.store_entity(&alice).unwrap();
2018 backend.store_entity(&acme).unwrap();
2019
2020 backend
2022 .store_relationship(&Relationship::new(
2023 alice.id.clone(),
2024 acme.id.clone(),
2025 RelationshipType::WorksAt,
2026 ))
2027 .unwrap();
2028 backend
2029 .store_relationship(&Relationship::new(
2030 alice.id.clone(),
2031 acme.id.clone(),
2032 RelationshipType::Created,
2033 ))
2034 .unwrap();
2035
2036 let types = backend.get_relationship_types(&alice.id, &acme.id).unwrap();
2037
2038 assert_eq!(types.len(), 2);
2039 assert!(types.contains(&RelationshipType::WorksAt));
2040 assert!(types.contains(&RelationshipType::Created));
2041 }
2042
2043 #[test]
2048 fn test_get_entities_in_memory() {
2049 let backend = SqliteGraphBackend::in_memory().unwrap();
2050
2051 let alice = create_test_entity("Alice", EntityType::Person);
2052 let bob = create_test_entity("Bob", EntityType::Person);
2053
2054 backend.store_entity(&alice).unwrap();
2055 backend.store_entity(&bob).unwrap();
2056
2057 let memory_id = MemoryId::new("mem_123");
2058
2059 backend
2060 .store_mention(&EntityMention::new(alice.id.clone(), memory_id.clone()))
2061 .unwrap();
2062 backend
2063 .store_mention(&EntityMention::new(bob.id.clone(), memory_id.clone()))
2064 .unwrap();
2065
2066 let entities = backend.get_entities_in_memory(&memory_id).unwrap();
2067
2068 assert_eq!(entities.len(), 2);
2069 }
2070
2071 #[test]
2072 fn test_delete_mentions_for_memory() {
2073 let backend = SqliteGraphBackend::in_memory().unwrap();
2074
2075 let alice = create_test_entity("Alice", EntityType::Person);
2076 backend.store_entity(&alice).unwrap();
2077
2078 let mem1 = MemoryId::new("mem_1");
2079 let mem2 = MemoryId::new("mem_2");
2080
2081 backend
2082 .store_mention(&EntityMention::new(alice.id.clone(), mem1.clone()))
2083 .unwrap();
2084 backend
2085 .store_mention(&EntityMention::new(alice.id.clone(), mem2.clone()))
2086 .unwrap();
2087
2088 let deleted = backend.delete_mentions_for_memory(&mem1).unwrap();
2089 assert_eq!(deleted, 1);
2090
2091 let remaining = backend.get_mentions_for_entity(&alice.id).unwrap();
2092 assert_eq!(remaining.len(), 1);
2093 assert_eq!(remaining[0].memory_id, mem2);
2094 }
2095
2096 #[test]
2101 fn test_traverse_with_relationship_filter() {
2102 let backend = SqliteGraphBackend::in_memory().unwrap();
2103
2104 let alice = create_test_entity("Alice", EntityType::Person);
2105 let acme = create_test_entity("Acme", EntityType::Organization);
2106 let bob = create_test_entity("Bob", EntityType::Person);
2107
2108 backend.store_entity(&alice).unwrap();
2109 backend.store_entity(&acme).unwrap();
2110 backend.store_entity(&bob).unwrap();
2111
2112 backend
2113 .store_relationship(&Relationship::new(
2114 alice.id.clone(),
2115 acme.id.clone(),
2116 RelationshipType::WorksAt,
2117 ))
2118 .unwrap();
2119 backend
2120 .store_relationship(&Relationship::new(
2121 alice.id.clone(),
2122 bob.id.clone(),
2123 RelationshipType::RelatesTo,
2124 ))
2125 .unwrap();
2126
2127 let result = backend
2129 .traverse(&alice.id, 2, Some(&[RelationshipType::WorksAt]), None)
2130 .unwrap();
2131
2132 assert_eq!(result.entities.len(), 2);
2133 assert!(result.entities.iter().any(|e| e.name == "Alice"));
2134 assert!(result.entities.iter().any(|e| e.name == "Acme"));
2135 assert!(!result.entities.iter().any(|e| e.name == "Bob"));
2136 }
2137
2138 #[test]
2139 fn test_traverse_with_confidence_filter() {
2140 let backend = SqliteGraphBackend::in_memory().unwrap();
2141
2142 let alice = create_test_entity("Alice", EntityType::Person);
2143 let bob = create_test_entity("Bob", EntityType::Person);
2144 let charlie = create_test_entity("Charlie", EntityType::Person);
2145
2146 backend.store_entity(&alice).unwrap();
2147 backend.store_entity(&bob).unwrap();
2148 backend.store_entity(&charlie).unwrap();
2149
2150 backend
2151 .store_relationship(
2152 &Relationship::new(
2153 alice.id.clone(),
2154 bob.id.clone(),
2155 RelationshipType::RelatesTo,
2156 )
2157 .with_confidence(0.9),
2158 )
2159 .unwrap();
2160 backend
2161 .store_relationship(
2162 &Relationship::new(
2163 alice.id.clone(),
2164 charlie.id.clone(),
2165 RelationshipType::RelatesTo,
2166 )
2167 .with_confidence(0.3),
2168 )
2169 .unwrap();
2170
2171 let result = backend.traverse(&alice.id, 2, None, Some(0.5)).unwrap();
2173
2174 assert_eq!(result.entities.len(), 2);
2175 assert!(result.entities.iter().any(|e| e.name == "Bob"));
2176 assert!(!result.entities.iter().any(|e| e.name == "Charlie"));
2177 }
2178
2179 #[test]
2180 fn test_traverse_depth_limit() {
2181 let backend = SqliteGraphBackend::in_memory().unwrap();
2182
2183 let a = create_test_entity("A", EntityType::Concept);
2184 let b = create_test_entity("B", EntityType::Concept);
2185 let c = create_test_entity("C", EntityType::Concept);
2186 let d = create_test_entity("D", EntityType::Concept);
2187
2188 backend.store_entity(&a).unwrap();
2189 backend.store_entity(&b).unwrap();
2190 backend.store_entity(&c).unwrap();
2191 backend.store_entity(&d).unwrap();
2192
2193 backend
2195 .store_relationship(&Relationship::new(
2196 a.id.clone(),
2197 b.id.clone(),
2198 RelationshipType::RelatesTo,
2199 ))
2200 .unwrap();
2201 backend
2202 .store_relationship(&Relationship::new(
2203 b.id.clone(),
2204 c.id.clone(),
2205 RelationshipType::RelatesTo,
2206 ))
2207 .unwrap();
2208 backend
2209 .store_relationship(&Relationship::new(
2210 c.id.clone(),
2211 d.id.clone(),
2212 RelationshipType::RelatesTo,
2213 ))
2214 .unwrap();
2215
2216 let result = backend.traverse(&a.id, 1, None, None).unwrap();
2218 assert_eq!(result.entities.len(), 2);
2219
2220 let result = backend.traverse(&a.id, 2, None, None).unwrap();
2222 assert_eq!(result.entities.len(), 3);
2223 }
2224
2225 #[test]
2226 fn test_find_path_no_path() {
2227 let backend = SqliteGraphBackend::in_memory().unwrap();
2228
2229 let alice = create_test_entity("Alice", EntityType::Person);
2230 let bob = create_test_entity("Bob", EntityType::Person);
2231
2232 backend.store_entity(&alice).unwrap();
2233 backend.store_entity(&bob).unwrap();
2234
2235 let path = backend.find_path(&alice.id, &bob.id, 5).unwrap();
2237 assert!(path.is_none());
2238 }
2239
2240 #[test]
2241 fn test_find_path_direct() {
2242 let backend = SqliteGraphBackend::in_memory().unwrap();
2243
2244 let alice = create_test_entity("Alice", EntityType::Person);
2245 let bob = create_test_entity("Bob", EntityType::Person);
2246
2247 backend.store_entity(&alice).unwrap();
2248 backend.store_entity(&bob).unwrap();
2249
2250 backend
2251 .store_relationship(&Relationship::new(
2252 alice.id.clone(),
2253 bob.id.clone(),
2254 RelationshipType::RelatesTo,
2255 ))
2256 .unwrap();
2257
2258 let path = backend.find_path(&alice.id, &bob.id, 5).unwrap();
2259 assert!(path.is_some());
2260 let path = path.unwrap();
2261 assert_eq!(path.entities.len(), 2);
2262 assert_eq!(path.relationships.len(), 1);
2263 }
2264
2265 #[test]
2270 fn test_close_entity_valid_time() {
2271 let backend = SqliteGraphBackend::in_memory().unwrap();
2272
2273 let entity = Entity::new(EntityType::Person, "Alice", Domain::for_user())
2274 .with_valid_time(ValidTimeRange::from(100));
2275
2276 backend.store_entity(&entity).unwrap();
2277
2278 backend.close_entity_valid_time(&entity.id, 200).unwrap();
2280
2281 let point = BitemporalPoint::new(150, i64::MAX);
2283 let results = backend
2284 .query_entities_at(&EntityQuery::new(), &point)
2285 .unwrap();
2286 assert_eq!(results.len(), 1);
2287
2288 let point = BitemporalPoint::new(250, i64::MAX);
2290 let results = backend
2291 .query_entities_at(&EntityQuery::new(), &point)
2292 .unwrap();
2293 assert_eq!(results.len(), 0);
2294 }
2295
2296 #[test]
2297 fn test_close_relationship_valid_time() {
2298 let backend = SqliteGraphBackend::in_memory().unwrap();
2299
2300 let alice = create_test_entity("Alice", EntityType::Person);
2301 let acme = create_test_entity("Acme", EntityType::Organization);
2302
2303 backend.store_entity(&alice).unwrap();
2304 backend.store_entity(&acme).unwrap();
2305
2306 let rel = Relationship::new(alice.id.clone(), acme.id.clone(), RelationshipType::WorksAt)
2307 .with_valid_time(ValidTimeRange::from(100));
2308
2309 backend.store_relationship(&rel).unwrap();
2310
2311 backend
2313 .close_relationship_valid_time(&alice.id, &acme.id, RelationshipType::WorksAt, 200)
2314 .unwrap();
2315
2316 let point = BitemporalPoint::new(150, i64::MAX);
2318 let results = backend
2319 .query_relationships_at(&RelationshipQuery::new(), &point)
2320 .unwrap();
2321 assert_eq!(results.len(), 1);
2322
2323 let point = BitemporalPoint::new(250, i64::MAX);
2325 let results = backend
2326 .query_relationships_at(&RelationshipQuery::new(), &point)
2327 .unwrap();
2328 assert_eq!(results.len(), 0);
2329 }
2330
2331 #[test]
2332 fn test_relationship_temporal_queries() {
2333 let backend = SqliteGraphBackend::in_memory().unwrap();
2334
2335 let alice = create_test_entity("Alice", EntityType::Person);
2336 let acme = create_test_entity("Acme", EntityType::Organization);
2337
2338 backend.store_entity(&alice).unwrap();
2339 backend.store_entity(&acme).unwrap();
2340
2341 let rel = Relationship::new(alice.id.clone(), acme.id.clone(), RelationshipType::WorksAt)
2342 .with_valid_time(ValidTimeRange::between(100, 200));
2343
2344 backend.store_relationship(&rel).unwrap();
2345
2346 let point = BitemporalPoint::new(150, i64::MAX);
2348 let results = backend
2349 .query_relationships_at(&RelationshipQuery::new(), &point)
2350 .unwrap();
2351 assert_eq!(results.len(), 1);
2352
2353 let point = BitemporalPoint::new(50, i64::MAX);
2355 let results = backend
2356 .query_relationships_at(&RelationshipQuery::new(), &point)
2357 .unwrap();
2358 assert_eq!(results.len(), 0);
2359 }
2360}