// subcog/storage/index/sqlite.rs

1//! `SQLite` + FTS5 index backend.
2//!
3//! Provides full-text search using `SQLite`'s FTS5 extension.
4
5use crate::models::{Memory, MemoryId, SearchFilter};
6use crate::storage::traits::IndexBackend;
7use crate::{Error, Result};
8use chrono::{TimeZone, Utc};
9use rusqlite::{Connection, OptionalExtension, params};
10use std::path::{Path, PathBuf};
11use std::sync::{Mutex, MutexGuard};
12use std::time::{Duration, Instant};
13use tracing::instrument;
14
/// Timeout for acquiring the connection mutex (5 seconds).
///
/// Reserved for future use when upgrading to `parking_lot::Mutex`, which
/// supports timed lock acquisition natively (see `acquire_lock_with_timeout`).
#[allow(dead_code)]
const MUTEX_LOCK_TIMEOUT: Duration = Duration::from_secs(5);
19
20/// Helper to acquire mutex lock with poison recovery.
21///
22/// If the mutex is poisoned (due to a panic in a previous critical section),
23/// we recover the inner value and log a warning. This prevents cascading
24/// failures when one operation panics.
25fn acquire_lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
26    // First try to acquire lock normally
27    match mutex.lock() {
28        Ok(guard) => guard,
29        Err(poisoned) => {
30            // Recover from poison - this is safe because we log the issue
31            // and the connection state should still be valid
32            tracing::warn!("SQLite mutex was poisoned, recovering");
33            metrics::counter!("sqlite_mutex_poison_recovery_total").increment(1);
34            poisoned.into_inner()
35        },
36    }
37}
38
39/// Alternative lock acquisition with spin-wait timeout.
40///
41/// Note: Rust's `std::sync::Mutex` doesn't have a native `try_lock_for`,
42/// so we implement a spin-wait with sleep. For production, consider
43/// using `parking_lot::Mutex` which has proper timed locking.
44///
45/// Reserved for future use - currently using simpler `acquire_lock` with poison recovery.
46#[allow(dead_code)]
47fn acquire_lock_with_timeout<T>(mutex: &Mutex<T>, timeout: Duration) -> Result<MutexGuard<'_, T>> {
48    let start = Instant::now();
49    let sleep_duration = Duration::from_millis(10);
50
51    loop {
52        match mutex.try_lock() {
53            Ok(guard) => return Ok(guard),
54            Err(std::sync::TryLockError::Poisoned(poisoned)) => {
55                tracing::warn!("SQLite mutex was poisoned, recovering");
56                metrics::counter!("sqlite_mutex_poison_recovery_total").increment(1);
57                return Ok(poisoned.into_inner());
58            },
59            Err(std::sync::TryLockError::WouldBlock) => {
60                if start.elapsed() > timeout {
61                    metrics::counter!("sqlite_mutex_timeout_total").increment(1);
62                    return Err(Error::OperationFailed {
63                        operation: "acquire_lock".to_string(),
64                        cause: format!("Lock acquisition timed out after {timeout:?}"),
65                    });
66                }
67                std::thread::sleep(sleep_duration);
68            },
69        }
70    }
71}
72
/// Escapes SQL LIKE wildcards in a string (SEC-M4).
///
/// `SQLite` LIKE patterns treat `%` as "any characters" and `_` as "single
/// character". User input containing these characters (or the escape
/// character `\` itself) must be escaped so it matches literally. The
/// output is intended for a LIKE clause that declares `ESCAPE '\'`.
///
/// # Examples
///
/// ```ignore
/// assert_eq!(escape_like_wildcards("100%"), "100\\%");
/// assert_eq!(escape_like_wildcards("user_name"), "user\\_name");
/// assert_eq!(escape_like_wildcards("path\\file"), "path\\\\file");
/// ```
fn escape_like_wildcards(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut out, ch| {
        // Prefix each special character with the escape character.
        if matches!(ch, '%' | '_' | '\\') {
            out.push('\\');
        }
        out.push(ch);
        out
    })
}
99
/// Converts a glob pattern to a SQL LIKE pattern safely (HIGH-SEC-005).
///
/// First escapes existing SQL LIKE wildcards (`%`, `_`, `\`), then converts
/// glob wildcards (`*` → `%`, `?` → `_`). This prevents SQL injection via
/// patterns like `foo%bar` where `%` would otherwise be a SQL wildcard.
///
/// Note that `.` has no special meaning in glob or LIKE and passes through
/// unchanged.
///
/// # Examples
///
/// ```ignore
/// // Glob * is converted; the literal `.` passes through unchanged
/// assert_eq!(glob_to_like_pattern("src/*.rs"), "src/%.rs");
/// // Literal % is escaped
/// assert_eq!(glob_to_like_pattern("100%"), "100\\%");
/// // Combined: literal % escaped, glob * converted
/// assert_eq!(glob_to_like_pattern("foo%*bar"), "foo\\%%bar");
/// // Glob ? becomes the single-character wildcard _
/// assert_eq!(glob_to_like_pattern("file?.txt"), "file_.txt");
/// ```
fn glob_to_like_pattern(pattern: &str) -> String {
    let mut result = String::with_capacity(pattern.len() * 2);
    for c in pattern.chars() {
        match c {
            // Escape existing SQL LIKE wildcards (they're meant to be literal)
            '%' | '_' | '\\' => {
                result.push('\\');
                result.push(c);
            },
            // Convert glob wildcards to SQL LIKE wildcards
            '*' => result.push('%'),
            '?' => result.push('_'),
            _ => result.push(c),
        }
    }
    result
}
133
/// `SQLite`-based index backend with FTS5.
///
/// # Concurrency Model
///
/// Uses a `Mutex<Connection>` for thread-safe access. While this serializes
/// database operations, `SQLite`'s WAL mode and `busy_timeout` pragma mitigate
/// contention:
///
/// - **WAL mode**: Allows concurrent readers with a single writer
/// - **`busy_timeout`**: Waits up to 5 seconds for locks instead of failing immediately
/// - **NORMAL synchronous**: Balances durability with performance
///
/// The pragmas above are applied best-effort in `initialize`.
///
/// For high-throughput scenarios requiring true connection pooling, consider
/// using `r2d2-rusqlite` or `deadpool-sqlite`. This would require refactoring
/// to use `Pool<SqliteConnectionManager>` instead of `Mutex<Connection>`.
pub struct SqliteBackend {
    /// Connection to the `SQLite` database.
    ///
    /// Protected by Mutex because `rusqlite::Connection` is not `Sync`.
    /// WAL mode and `busy_timeout` handle concurrent access gracefully.
    /// Always acquire via `acquire_lock` (poison-recovering) rather than
    /// `.lock().unwrap()`.
    conn: Mutex<Connection>,
    /// Path to the `SQLite` database (None for in-memory).
    db_path: Option<PathBuf>,
}
158
/// Flat projection of one `memories` table row, used as an intermediate
/// when rehydrating a [`Memory`] from `SQLite`.
struct MemoryRow {
    // Primary key (memories.id).
    id: String,
    // Required namespace (NOT NULL column).
    namespace: String,
    domain: Option<String>,
    // Facet columns added by migration (ADR-0048/0049).
    project_id: Option<String>,
    branch: Option<String>,
    file_path: Option<String>,
    // Status string (NOT NULL column); 'tombstoned' is one known value.
    status: String,
    // Unix timestamp (INTEGER column) — presumably seconds; confirm against writer.
    created_at: i64,
    // Soft-delete timestamp (ADR-0053); NULL when the row is live.
    tombstoned_at: Option<i64>,
    // TTL expiration timestamp (Memory Expiration feature).
    expires_at: Option<i64>,
    // Comma-separated tag list (tag filters match on ',tag,' patterns).
    tags: Option<String>,
    source: Option<String>,
    content: String,
    // True for consolidation summary nodes (stored as INTEGER DEFAULT 0).
    is_summary: bool,
    // IDs of the memories a summary was built from — format not visible
    // here; presumably a delimited list, verify against the writer.
    source_memory_ids: Option<String>,
    consolidation_timestamp: Option<i64>,
    // Group-scoped memories (ADR-0057), only with the `group-scope` feature.
    #[cfg(feature = "group-scope")]
    group_id: Option<String>,
}
179
180impl SqliteBackend {
181    /// Creates a new `SQLite` backend.
182    ///
183    /// # Errors
184    ///
185    /// Returns an error if the database cannot be opened or initialized.
186    pub fn new(db_path: impl Into<PathBuf>) -> Result<Self> {
187        let db_path = db_path.into();
188        let conn = Connection::open(&db_path).map_err(|e| Error::OperationFailed {
189            operation: "open_sqlite".to_string(),
190            cause: e.to_string(),
191        })?;
192
193        let backend = Self {
194            conn: Mutex::new(conn),
195            db_path: Some(db_path),
196        };
197
198        backend.initialize()?;
199        Ok(backend)
200    }
201
202    /// Creates an in-memory `SQLite` backend (useful for testing).
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the database cannot be initialized.
207    pub fn in_memory() -> Result<Self> {
208        let conn = Connection::open_in_memory().map_err(|e| Error::OperationFailed {
209            operation: "open_sqlite_memory".to_string(),
210            cause: e.to_string(),
211        })?;
212
213        let backend = Self {
214            conn: Mutex::new(conn),
215            db_path: None,
216        };
217
218        backend.initialize()?;
219        Ok(backend)
220    }
221
222    /// Returns the database path.
223    #[must_use]
224    pub fn db_path(&self) -> Option<&Path> {
225        self.db_path.as_deref()
226    }
227
    /// Initializes the database schema.
    ///
    /// Idempotent: tables are created with `IF NOT EXISTS`, and column
    /// migrations use best-effort `ALTER TABLE` calls whose errors are
    /// deliberately ignored (the column may already exist). Index creation
    /// runs last, after every table exists.
    fn initialize(&self) -> Result<()> {
        let conn = acquire_lock(&self.conn);

        // Enable WAL mode for better concurrent read performance
        // Note: pragma_update returns the result which we ignore - journal_mode returns
        // a string like "wal" which would cause execute_batch to fail
        let _ = conn.pragma_update(None, "journal_mode", "WAL");
        let _ = conn.pragma_update(None, "synchronous", "NORMAL");
        // Set busy timeout to 5 seconds to handle lock contention gracefully
        // This prevents SQLITE_BUSY errors during high concurrent access
        let _ = conn.pragma_update(None, "busy_timeout", "5000");

        // Create the main table for memory metadata.
        // Note: facet/tombstone/consolidation columns also appear in the
        // ALTER TABLE migrations below for databases created before they
        // were added to this CREATE statement.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                namespace TEXT NOT NULL,
                domain TEXT,
                project_id TEXT,
                branch TEXT,
                file_path TEXT,
                status TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                tags TEXT,
                source TEXT
            )",
            [],
        )
        .map_err(|e| Error::OperationFailed {
            operation: "create_memories_table".to_string(),
            cause: e.to_string(),
        })?;

        // Add source column if it doesn't exist (for migration).
        // Errors ("duplicate column name") are expected and ignored.
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN source TEXT", []);

        // Add facet columns if they don't exist (ADR-0048/0049)
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN project_id TEXT", []);
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN branch TEXT", []);
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN file_path TEXT", []);

        // Add tombstoned_at column if it doesn't exist (ADR-0053)
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN tombstoned_at INTEGER", []);

        // Add expires_at column for TTL-based expiration (Memory Expiration feature)
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN expires_at INTEGER", []);

        // Add consolidation-related columns (summary nodes)
        let _ = conn.execute(
            "ALTER TABLE memories ADD COLUMN is_summary INTEGER DEFAULT 0",
            [],
        );
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN source_memory_ids TEXT", []);
        let _ = conn.execute(
            "ALTER TABLE memories ADD COLUMN consolidation_timestamp INTEGER",
            [],
        );

        // Add group_id column for group-scoped memories (ADR-0057: Group Memory Graphs)
        #[cfg(feature = "group-scope")]
        let _ = conn.execute("ALTER TABLE memories ADD COLUMN group_id TEXT", []);

        // Create FTS5 virtual table for full-text search (standalone, not synced with memories)
        // Note: FTS5 virtual tables use inverted indexes for MATCH queries and don't support
        // traditional B-tree indexes. Joins with the memories table use memories.id (PRIMARY KEY)
        // which is already indexed. The FTS5 MATCH operation returns a small result set first,
        // making the join efficient. See: https://sqlite.org/fts5.html
        conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
                id,
                content,
                tags
            )",
            [],
        )
        .map_err(|e| Error::OperationFailed {
            operation: "create_fts_table".to_string(),
            cause: e.to_string(),
        })?;

        // Create memory_edges table for relationship tracking (consolidation service)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS memory_edges (
                from_id TEXT NOT NULL,
                to_id TEXT NOT NULL,
                edge_type TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                PRIMARY KEY (from_id, to_id, edge_type),
                FOREIGN KEY (from_id) REFERENCES memories(id) ON DELETE CASCADE,
                FOREIGN KEY (to_id) REFERENCES memories(id) ON DELETE CASCADE
            )",
            [],
        )
        .map_err(|e| Error::OperationFailed {
            operation: "create_edges_table".to_string(),
            cause: e.to_string(),
        })?;

        // Create indexes for common query patterns (DB-H1)
        // NOTE: This must be called AFTER all tables are created (including memory_edges)
        Self::create_indexes(&conn);

        Ok(())
    }
333
334    /// Creates indexes for optimized queries.
335    fn create_indexes(conn: &Connection) {
336        // Index on namespace for filtered searches
337        let _ = conn.execute(
338            "CREATE INDEX IF NOT EXISTS idx_memories_namespace ON memories(namespace)",
339            [],
340        );
341
342        // Index on domain for domain-scoped searches
343        let _ = conn.execute(
344            "CREATE INDEX IF NOT EXISTS idx_memories_domain ON memories(domain)",
345            [],
346        );
347
348        // Index on status for filtered searches
349        let _ = conn.execute(
350            "CREATE INDEX IF NOT EXISTS idx_memories_status ON memories(status)",
351            [],
352        );
353
354        // Index on created_at for time-based queries
355        let _ = conn.execute(
356            "CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at DESC)",
357            [],
358        );
359
360        // Partial index on tombstoned_at for cleanup queries
361        let _ = conn.execute(
362            "CREATE INDEX IF NOT EXISTS idx_memories_tombstoned_at ON memories(tombstoned_at) WHERE tombstoned_at IS NOT NULL",
363            [],
364        );
365
366        // Partial index on expires_at for expiration queries (TTL feature)
367        // Only indexes non-NULL values since we query for expired memories
368        let _ = conn.execute(
369            "CREATE INDEX IF NOT EXISTS idx_memories_expires_at ON memories(expires_at) WHERE expires_at IS NOT NULL",
370            [],
371        );
372
373        // Composite index for common filter patterns
374        let _ = conn.execute(
375            "CREATE INDEX IF NOT EXISTS idx_memories_namespace_status ON memories(namespace, status)",
376            [],
377        );
378
379        // Compound index for time-filtered namespace queries (Phase 15 fix)
380        let _ = conn.execute(
381            "CREATE INDEX IF NOT EXISTS idx_memories_namespace_created ON memories(namespace, created_at DESC)",
382            [],
383        );
384
385        // Compound index for source filtering with status (Phase 15 fix)
386        let _ = conn.execute(
387            "CREATE INDEX IF NOT EXISTS idx_memories_source_status ON memories(source, status)",
388            [],
389        );
390
391        // Facet indexes (ADR-0049)
392        let _ = conn.execute(
393            "CREATE INDEX IF NOT EXISTS idx_memories_project_id ON memories(project_id)",
394            [],
395        );
396        let _ = conn.execute(
397            "CREATE INDEX IF NOT EXISTS idx_memories_project_branch ON memories(project_id, branch)",
398            [],
399        );
400        let _ = conn.execute(
401            "CREATE INDEX IF NOT EXISTS idx_memories_file_path ON memories(file_path)",
402            [],
403        );
404
405        // Partial index for tombstoned memories (ADR-0053)
406        let _ = conn.execute(
407            "CREATE INDEX IF NOT EXISTS idx_memories_tombstoned ON memories(tombstoned_at) WHERE tombstoned_at IS NOT NULL",
408            [],
409        );
410
411        // Memory edges indexes for consolidation service
412        // Index on from_id for finding edges from a memory
413        let _ = conn.execute(
414            "CREATE INDEX IF NOT EXISTS idx_memory_edges_from_id ON memory_edges(from_id)",
415            [],
416        );
417
418        // Index on to_id for finding edges to a memory
419        let _ = conn.execute(
420            "CREATE INDEX IF NOT EXISTS idx_memory_edges_to_id ON memory_edges(to_id)",
421            [],
422        );
423
424        // Index on edge_type for filtering by relationship type
425        let _ = conn.execute(
426            "CREATE INDEX IF NOT EXISTS idx_memory_edges_edge_type ON memory_edges(edge_type)",
427            [],
428        );
429
430        // Compound index for querying edges by type from a source
431        let _ = conn.execute(
432            "CREATE INDEX IF NOT EXISTS idx_memory_edges_from_type ON memory_edges(from_id, edge_type)",
433            [],
434        );
435
436        // Compound index for querying edges by type to a target
437        let _ = conn.execute(
438            "CREATE INDEX IF NOT EXISTS idx_memory_edges_to_type ON memory_edges(to_id, edge_type)",
439            [],
440        );
441    }
442
    /// Builds a WHERE clause from a search filter with numbered parameters.
    ///
    /// Returns `(clause, params, next_param_idx)` where:
    /// - `clause` is empty, or starts with " AND " so it can be appended to a
    ///   query that already has at least one WHERE condition;
    /// - `params` holds values in the same order as the `?{n}` placeholders
    ///   embedded in `clause` (all values are bound, never interpolated);
    /// - `next_param_idx` is the first unused parameter number, so callers can
    ///   append further placeholders.
    ///
    /// Placeholder numbering starts at `start_param`; the ordering of the
    /// condition groups below must match the ordering of pushes into `params`.
    ///
    /// NOTE: takes `&self` for call-site uniformity only; no instance state is read.
    fn build_filter_clause_numbered(
        &self,
        filter: &SearchFilter,
        start_param: usize,
    ) -> (String, Vec<String>, usize) {
        let mut conditions = Vec::new();
        let mut params = Vec::new();
        let mut param_idx = start_param;

        // Namespace membership: one placeholder per namespace, IN (...).
        if !filter.namespaces.is_empty() {
            let placeholders: Vec<String> = filter
                .namespaces
                .iter()
                .map(|_| {
                    let p = format!("?{param_idx}");
                    param_idx += 1;
                    p
                })
                .collect();
            conditions.push(format!("m.namespace IN ({})", placeholders.join(",")));
            for ns in &filter.namespaces {
                params.push(ns.as_str().to_string());
            }
        }

        // Status membership, same shape as namespaces above.
        if !filter.statuses.is_empty() {
            let placeholders: Vec<String> = filter
                .statuses
                .iter()
                .map(|_| {
                    let p = format!("?{param_idx}");
                    param_idx += 1;
                    p
                })
                .collect();
            conditions.push(format!("m.status IN ({})", placeholders.join(",")));
            for s in &filter.statuses {
                params.push(s.as_str().to_string());
            }
        }

        // Tag filtering (AND logic - must have ALL tags)
        // Use ',tag,' pattern with wrapped column to match whole tags only
        // Escape LIKE wildcards in tags to prevent SQL injection (SEC-M4)
        for tag in &filter.tags {
            conditions.push(format!(
                "(',' || m.tags || ',') LIKE ?{param_idx} ESCAPE '\\'"
            ));
            param_idx += 1;
            params.push(format!("%,{},%", escape_like_wildcards(tag)));
        }

        // Tag filtering (OR logic - must have ANY tag)
        // NOTE: the closure increments param_idx and pushes into params as a
        // side effect so placeholders and values stay aligned.
        if !filter.tags_any.is_empty() {
            let or_conditions: Vec<String> = filter
                .tags_any
                .iter()
                .map(|tag| {
                    let cond = format!("(',' || m.tags || ',') LIKE ?{param_idx} ESCAPE '\\'");
                    param_idx += 1;
                    params.push(format!("%,{},%", escape_like_wildcards(tag)));
                    cond
                })
                .collect();
            conditions.push(format!("({})", or_conditions.join(" OR ")));
        }

        // Excluded tags (NOT LIKE) - match whole tags only
        // Escape LIKE wildcards (SEC-M4)
        for tag in &filter.excluded_tags {
            conditions.push(format!(
                "(',' || m.tags || ',') NOT LIKE ?{param_idx} ESCAPE '\\'"
            ));
            param_idx += 1;
            params.push(format!("%,{},%", escape_like_wildcards(tag)));
        }

        // Source pattern (glob-style converted to SQL LIKE)
        // HIGH-SEC-005: Use glob_to_like_pattern to escape SQL wildcards before conversion
        if let Some(ref pattern) = filter.source_pattern {
            conditions.push(format!("m.source LIKE ?{param_idx} ESCAPE '\\'"));
            param_idx += 1;
            params.push(glob_to_like_pattern(pattern));
        }

        // Exact-match facet filters (ADR-0048/0049).
        if let Some(ref project_id) = filter.project_id {
            conditions.push(format!("m.project_id = ?{param_idx}"));
            param_idx += 1;
            params.push(project_id.clone());
        }

        if let Some(ref branch) = filter.branch {
            conditions.push(format!("m.branch = ?{param_idx}"));
            param_idx += 1;
            params.push(branch.clone());
        }

        if let Some(ref file_path) = filter.file_path {
            conditions.push(format!("m.file_path = ?{param_idx}"));
            param_idx += 1;
            params.push(file_path.clone());
        }

        // Inclusive created_at range bounds; values bound as strings and
        // compared against the INTEGER column by SQLite.
        if let Some(after) = filter.created_after {
            conditions.push(format!("m.created_at >= ?{param_idx}"));
            param_idx += 1;
            params.push(after.to_string());
        }

        if let Some(before) = filter.created_before {
            conditions.push(format!("m.created_at <= ?{param_idx}"));
            param_idx += 1;
            params.push(before.to_string());
        }

        // Exclude tombstoned memories by default (ADR-0053)
        if !filter.include_tombstoned {
            conditions.push("m.status != 'tombstoned'".to_string());
        }

        // Leading " AND " lets the caller append this to an existing WHERE.
        let clause = if conditions.is_empty() {
            String::new()
        } else {
            format!(" AND {}", conditions.join(" AND "))
        };

        (clause, params, param_idx)
    }
573
574    fn record_operation_metrics(
575        &self,
576        operation: &'static str,
577        start: Instant,
578        status: &'static str,
579    ) {
580        metrics::counter!(
581            "storage_operations_total",
582            "backend" => "sqlite",
583            "operation" => operation,
584            "status" => status
585        )
586        .increment(1);
587        metrics::histogram!(
588            "storage_operation_duration_ms",
589            "backend" => "sqlite",
590            "operation" => operation,
591            "status" => status
592        )
593        .record(start.elapsed().as_secs_f64() * 1000.0);
594    }
595
596    /// Performs a WAL checkpoint to merge WAL file into main database (RES-M3).
597    ///
598    /// This is useful for:
599    /// - Graceful shutdown (ensure WAL is flushed)
600    /// - Periodic maintenance (prevent WAL file growth)
601    /// - Before backup operations
602    ///
603    /// Uses TRUNCATE mode which blocks briefly but ensures WAL is fully merged
604    /// and then truncated to zero bytes.
605    ///
606    /// # Returns
607    ///
608    /// Returns a tuple of (`pages_written`, `pages_remaining`) on success.
609    /// `pages_remaining` should be 0 if checkpoint completed fully.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error if the checkpoint operation fails.
614    #[instrument(skip(self), fields(operation = "checkpoint", backend = "sqlite"))]
615    pub fn checkpoint(&self) -> Result<(u32, u32)> {
616        let start = Instant::now();
617        let conn = acquire_lock(&self.conn);
618
619        // PRAGMA wal_checkpoint(TRUNCATE) checkpoints and truncates the WAL file
620        // Returns: (busy, log_pages, checkpointed_pages)
621        // - busy: 0 if not blocked, 1 if another connection blocked us
622        // - log_pages: total pages in WAL
623        // - checkpointed_pages: pages successfully written to main database
624        let result: std::result::Result<(i32, i32, i32), _> =
625            conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |row| {
626                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
627            });
628
629        match result {
630            Ok((busy, log_pages, checkpointed_pages)) => {
631                #[allow(clippy::cast_sign_loss)]
632                let (log, checkpointed) = (log_pages as u32, checkpointed_pages as u32);
633
634                tracing::info!(
635                    busy = busy,
636                    log_pages = log,
637                    checkpointed_pages = checkpointed,
638                    duration_ms = start.elapsed().as_millis(),
639                    "WAL checkpoint completed"
640                );
641
642                metrics::counter!(
643                    "sqlite_checkpoint_total",
644                    "status" => "success"
645                )
646                .increment(1);
647                metrics::gauge!("sqlite_wal_pages_checkpointed").set(f64::from(checkpointed));
648                metrics::histogram!("sqlite_checkpoint_duration_ms")
649                    .record(start.elapsed().as_secs_f64() * 1000.0);
650
651                Ok((checkpointed, log.saturating_sub(checkpointed)))
652            },
653            Err(e) => {
654                tracing::warn!(
655                    error = %e,
656                    duration_ms = start.elapsed().as_millis(),
657                    "WAL checkpoint failed"
658                );
659
660                metrics::counter!(
661                    "sqlite_checkpoint_total",
662                    "status" => "error"
663                )
664                .increment(1);
665
666                Err(Error::OperationFailed {
667                    operation: "wal_checkpoint".to_string(),
668                    cause: e.to_string(),
669                })
670            },
671        }
672    }
673
    /// Returns the current WAL file size in pages.
    ///
    /// Useful for monitoring and deciding when to trigger a checkpoint.
    /// Runs a PASSIVE checkpoint probe, which reports WAL state without
    /// blocking other connections.
    ///
    /// Returns `None` if the pragma query fails (e.g. the database is not
    /// in WAL mode).
    #[must_use]
    pub fn wal_size(&self) -> Option<u32> {
        let conn = acquire_lock(&self.conn);

        // PRAGMA wal_checkpoint(PASSIVE) returns current state without blocking
        let result: std::result::Result<(i32, i32, i32), _> =
            conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |row| {
                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
            });

        // Second column of the pragma row is the total WAL page count.
        result.ok().map(|(_, log_pages, _)| {
            #[allow(clippy::cast_sign_loss)]
            let pages = log_pages as u32;
            pages
        })
    }
697
698    /// Checkpoints the WAL if it exceeds the given threshold in pages.
699    ///
700    /// Default `SQLite` auto-checkpoint threshold is 1000 pages (~4MB with 4KB pages).
701    /// This method allows explicit control over checkpointing.
702    ///
703    /// # Arguments
704    ///
705    /// * `threshold_pages` - Checkpoint if WAL size exceeds this number of pages.
706    ///
707    /// # Returns
708    ///
709    /// Returns `Some((pages_written, pages_remaining))` if checkpoint was performed,
710    /// `None` if WAL was below threshold.
711    ///
712    /// # Errors
713    ///
714    /// Returns an error if the checkpoint operation fails.
715    pub fn checkpoint_if_needed(&self, threshold_pages: u32) -> Result<Option<(u32, u32)>> {
716        if let Some(current_size) = self.wal_size()
717            && current_size > threshold_pages
718        {
719            tracing::debug!(
720                current_pages = current_size,
721                threshold = threshold_pages,
722                "WAL exceeds threshold, triggering checkpoint"
723            );
724            return self.checkpoint().map(Some);
725        }
726        Ok(None)
727    }
728
729    /// Stores a memory edge relationship in the database.
730    ///
731    /// Creates a directed edge from one memory to another with the specified edge type.
732    /// Used by the consolidation service to link original memories to their summaries.
733    ///
734    /// # Arguments
735    ///
736    /// * `from_id` - The source memory ID
737    /// * `to_id` - The target memory ID
738    /// * `edge_type` - The type of relationship
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the database operation fails or if the memory IDs don't exist.
743    ///
744    /// # Examples
745    ///
746    /// ```rust,ignore
747    /// use subcog::storage::index::SqliteBackend;
748    /// use subcog::models::{MemoryId, EdgeType};
749    ///
750    /// let backend = SqliteBackend::new("./index.db")?;
751    /// let from_id = MemoryId::new("original_memory");
752    /// let to_id = MemoryId::new("summary_memory");
753    ///
754    /// backend.store_edge(&from_id, &to_id, EdgeType::SummarizedBy)?;
755    /// # Ok::<(), subcog::Error>(())
756    /// ```
757    #[instrument(
758        skip(self),
759        fields(
760            operation = "store_edge",
761            from_id = %from_id.as_str(),
762            to_id = %to_id.as_str(),
763            edge_type = %edge_type.as_str()
764        )
765    )]
766    pub fn store_edge(
767        &self,
768        from_id: &MemoryId,
769        to_id: &MemoryId,
770        edge_type: crate::models::EdgeType,
771    ) -> Result<()> {
772        let conn = acquire_lock(&self.conn);
773
774        let created_at = crate::current_timestamp();
775        #[allow(clippy::cast_possible_wrap)]
776        let created_at_i64 = created_at as i64;
777
778        conn.execute(
779            "INSERT INTO memory_edges (from_id, to_id, edge_type, created_at)
780             VALUES (?1, ?2, ?3, ?4)
781             ON CONFLICT(from_id, to_id, edge_type) DO UPDATE SET created_at = ?4",
782            params![
783                from_id.as_str(),
784                to_id.as_str(),
785                edge_type.as_str(),
786                created_at_i64
787            ],
788        )
789        .map_err(|e| {
790            tracing::error!(
791                error = %e,
792                from_id = %from_id.as_str(),
793                to_id = %to_id.as_str(),
794                edge_type = %edge_type.as_str(),
795                "Failed to store memory edge"
796            );
797            Error::OperationFailed {
798                operation: "store_edge".to_string(),
799                cause: format!("Failed to insert edge: {e}"),
800            }
801        })?;
802
803        tracing::debug!(
804            from_id = %from_id.as_str(),
805            to_id = %to_id.as_str(),
806            edge_type = %edge_type.as_str(),
807            "Successfully stored memory edge"
808        );
809
810        Ok(())
811    }
812
813    /// Queries edges for a given memory ID and edge type.
814    ///
815    /// Returns a list of target memory IDs that have the specified edge type
816    /// relationship with the source memory ID.
817    ///
818    /// # Arguments
819    ///
820    /// * `from_id` - The source memory ID
821    /// * `edge_type` - The type of relationship to query
822    ///
823    /// # Errors
824    ///
825    /// Returns an error if the database query fails.
826    ///
827    /// # Examples
828    ///
829    /// ```rust,ignore
830    /// use subcog::storage::index::SqliteBackend;
831    /// use subcog::models::{MemoryId, EdgeType};
832    ///
833    /// let backend = SqliteBackend::new("./index.db")?;
834    /// let from_id = MemoryId::new("original_memory");
835    ///
836    /// let targets = backend.query_edges(&from_id, EdgeType::SummarizedBy)?;
837    /// # Ok::<(), subcog::Error>(())
838    /// ```
839    pub fn query_edges(
840        &self,
841        from_id: &MemoryId,
842        edge_type: crate::models::EdgeType,
843    ) -> Result<Vec<MemoryId>> {
844        let conn = acquire_lock(&self.conn);
845
846        let mut stmt = conn
847            .prepare("SELECT to_id FROM memory_edges WHERE from_id = ?1 AND edge_type = ?2")
848            .map_err(|e| Error::OperationFailed {
849                operation: "query_edges_prepare".to_string(),
850                cause: e.to_string(),
851            })?;
852
853        let results = stmt
854            .query_map(params![from_id.as_str(), edge_type.as_str()], |row| {
855                row.get::<_, String>(0)
856            })
857            .map_err(|e| Error::OperationFailed {
858                operation: "query_edges_map".to_string(),
859                cause: e.to_string(),
860            })?
861            .collect::<std::result::Result<Vec<String>, _>>()
862            .map_err(|e| Error::OperationFailed {
863                operation: "query_edges_collect".to_string(),
864                cause: e.to_string(),
865            })?;
866
867        Ok(results.into_iter().map(MemoryId::new).collect())
868    }
869}
870
/// Fetches one memory's raw database row by ID.
///
/// Joins `memories` (metadata) with `memories_fts` (content) so a single
/// query yields everything `build_memory_from_row` needs. Returns
/// `Ok(None)` when the ID is not indexed.
///
/// NOTE: the SELECT column order must stay in sync with the positional
/// `row.get(N)` indices below, and with the batch query in
/// `get_memories_batch`.
fn fetch_memory_row(conn: &Connection, id: &MemoryId) -> Result<Option<MemoryRow>> {
    // The `group-scope` build appends a trailing `group_id` column (index 16).
    #[cfg(feature = "group-scope")]
    let query = "SELECT m.id, m.namespace, m.domain, m.project_id, m.branch, m.file_path, m.status, m.created_at,
                    m.tombstoned_at, m.expires_at, m.tags, m.source, f.content, m.is_summary, m.source_memory_ids, m.consolidation_timestamp, m.group_id
             FROM memories m
             JOIN memories_fts f ON m.id = f.id
             WHERE m.id = ?1";
    #[cfg(not(feature = "group-scope"))]
    let query = "SELECT m.id, m.namespace, m.domain, m.project_id, m.branch, m.file_path, m.status, m.created_at,
                    m.tombstoned_at, m.expires_at, m.tags, m.source, f.content, m.is_summary, m.source_memory_ids, m.consolidation_timestamp
             FROM memories m
             JOIN memories_fts f ON m.id = f.id
             WHERE m.id = ?1";

    let mut stmt = conn.prepare(query).map_err(|e| Error::OperationFailed {
        operation: "prepare_get_memory".to_string(),
        cause: e.to_string(),
    })?;

    // `.optional()` turns rusqlite's "no rows returned" error into Ok(None).
    let result: std::result::Result<Option<_>, _> = stmt
        .query_row(params![id.as_str()], |row| {
            Ok(MemoryRow {
                id: row.get(0)?,
                namespace: row.get(1)?,
                domain: row.get(2)?,
                project_id: row.get(3)?,
                branch: row.get(4)?,
                file_path: row.get(5)?,
                status: row.get(6)?,
                created_at: row.get(7)?,
                tombstoned_at: row.get(8)?,
                expires_at: row.get(9)?,
                tags: row.get(10)?,
                source: row.get(11)?,
                content: row.get(12)?,
                // A NULL is_summary is treated as "not a summary".
                is_summary: row.get::<_, Option<bool>>(13)?.unwrap_or(false),
                source_memory_ids: row.get(14)?,
                consolidation_timestamp: row.get(15)?,
                #[cfg(feature = "group-scope")]
                group_id: row.get(16)?,
            })
        })
        .optional();

    result.map_err(|e| Error::OperationFailed {
        operation: "get_memory".to_string(),
        cause: e.to_string(),
    })
}
920
921fn build_memory_from_row(row: MemoryRow) -> Memory {
922    use crate::models::{Domain, MemoryStatus, Namespace};
923
924    let namespace = Namespace::parse(&row.namespace).unwrap_or_default();
925    let domain = row.domain.map_or_else(Domain::new, |d: String| {
926        if d.is_empty() || d == "project" {
927            Domain::new()
928        } else {
929            let parts: Vec<&str> = d.split('/').collect();
930            match parts.len() {
931                1 => Domain {
932                    organization: Some(parts[0].to_string()),
933                    project: None,
934                    repository: None,
935                },
936                2 => Domain {
937                    organization: Some(parts[0].to_string()),
938                    project: None,
939                    repository: Some(parts[1].to_string()),
940                },
941                _ => Domain::new(),
942            }
943        }
944    });
945
946    let status = match row.status.to_lowercase().as_str() {
947        "active" => MemoryStatus::Active,
948        "archived" => MemoryStatus::Archived,
949        "superseded" => MemoryStatus::Superseded,
950        "pending" => MemoryStatus::Pending,
951        "deleted" => MemoryStatus::Deleted,
952        "tombstoned" => MemoryStatus::Tombstoned,
953        "consolidated" => MemoryStatus::Consolidated,
954        _ => MemoryStatus::Active,
955    };
956
957    let tags: Vec<String> = row
958        .tags
959        .map(|t| {
960            t.split(',')
961                .map(|s| s.trim().to_string())
962                .filter(|s| !s.is_empty())
963                .collect()
964        })
965        .unwrap_or_default();
966
967    #[allow(clippy::cast_sign_loss)]
968    let created_at_u64 = row.created_at as u64;
969    let tombstoned_at = row
970        .tombstoned_at
971        .and_then(|ts| Utc.timestamp_opt(ts, 0).single());
972    #[allow(clippy::cast_sign_loss)]
973    let expires_at = row.expires_at.map(|t| t as u64);
974
975    Memory {
976        id: MemoryId::new(row.id),
977        content: row.content,
978        namespace,
979        domain,
980        project_id: row.project_id,
981        branch: row.branch,
982        file_path: row.file_path,
983        status,
984        created_at: created_at_u64,
985        updated_at: created_at_u64,
986        tombstoned_at,
987        expires_at,
988        embedding: None,
989        tags,
990        #[cfg(feature = "group-scope")]
991        group_id: row.group_id,
992        source: row.source,
993        is_summary: row.is_summary,
994        source_memory_ids: row.source_memory_ids.as_ref().map(|json| {
995            serde_json::from_str::<Vec<String>>(json)
996                .unwrap_or_default()
997                .into_iter()
998                .map(|s| MemoryId::new(&s))
999                .collect()
1000        }),
1001        #[allow(clippy::cast_sign_loss)]
1002        consolidation_timestamp: row.consolidation_timestamp.map(|t| t as u64),
1003    }
1004}
1005
1006impl IndexBackend for SqliteBackend {
    /// Indexes a memory: upserts metadata into `memories` and refreshes the
    /// corresponding row in `memories_fts`, all inside one IMMEDIATE
    /// transaction so the two tables never diverge.
    #[instrument(
        skip(self, memory),
        fields(
            operation = "index",
            backend = "sqlite",
            memory.id = %memory.id.as_str(),
            namespace = %memory.namespace.as_str(),
            domain = %memory.domain.to_string()
        )
    )]
    #[allow(clippy::too_many_lines)]
    fn index(&self, memory: &Memory) -> Result<()> {
        let start = Instant::now();
        // Outer closure captures success/failure so operation metrics are
        // recorded on every exit path.
        let result = (|| {
            let conn = acquire_lock(&self.conn);

            let tags_str = memory.tags.join(",");
            let domain_str = memory.domain.to_string();

            // Use transaction for atomicity (DB-H2)
            conn.execute("BEGIN IMMEDIATE", [])
                .map_err(|e| Error::OperationFailed {
                    operation: "begin_transaction".to_string(),
                    cause: e.to_string(),
                })?;

            // Inner closure so any `?` failure falls through to the
            // commit-or-rollback decision below.
            let result = (|| {
                // Insert or replace in main table
                // Note: Cast u64 to i64 for SQLite compatibility (rusqlite doesn't impl ToSql for u64)
                #[allow(clippy::cast_possible_wrap)]
                let created_at_i64 = memory.created_at as i64;
                let tombstoned_at_i64 = memory.tombstoned_at.map(|t| t.timestamp());
                #[allow(clippy::cast_possible_wrap)]
                let consolidation_ts_i64 = memory.consolidation_timestamp.map(|t| t as i64);
                // Serialization failure degrades to an empty string rather
                // than aborting the whole index operation.
                let source_ids_json = memory
                    .source_memory_ids
                    .as_ref()
                    .map(|ids| serde_json::to_string(ids).unwrap_or_default());
                let expires_at_i64 = memory.expires_at.map(u64::cast_signed);
                // The `group-scope` build persists one extra column
                // (group_id); otherwise the 15-column variant below is used.
                #[cfg(feature = "group-scope")]
                let group_id = memory.group_id.as_deref();
                #[cfg(feature = "group-scope")]
                conn.execute(
                    "INSERT OR REPLACE INTO memories (id, namespace, domain, project_id, branch, file_path, status, created_at, tags, source, tombstoned_at, expires_at, is_summary, source_memory_ids, consolidation_timestamp, group_id)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
                    params![
                        memory.id.as_str(),
                        memory.namespace.as_str(),
                        domain_str,
                        memory.project_id.as_deref(),
                        memory.branch.as_deref(),
                        memory.file_path.as_deref(),
                        memory.status.as_str(),
                        created_at_i64,
                        tags_str,
                        memory.source.as_deref(),
                        tombstoned_at_i64,
                        expires_at_i64,
                        memory.is_summary,
                        source_ids_json,
                        consolidation_ts_i64,
                        group_id
                    ],
                )
                .map_err(|e| Error::OperationFailed {
                    operation: "insert_memory".to_string(),
                    cause: e.to_string(),
                })?;
                #[cfg(not(feature = "group-scope"))]
                conn.execute(
                    "INSERT OR REPLACE INTO memories (id, namespace, domain, project_id, branch, file_path, status, created_at, tags, source, tombstoned_at, expires_at, is_summary, source_memory_ids, consolidation_timestamp)
                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
                    params![
                        memory.id.as_str(),
                        memory.namespace.as_str(),
                        domain_str,
                        memory.project_id.as_deref(),
                        memory.branch.as_deref(),
                        memory.file_path.as_deref(),
                        memory.status.as_str(),
                        created_at_i64,
                        tags_str,
                        memory.source.as_deref(),
                        tombstoned_at_i64,
                        expires_at_i64,
                        memory.is_summary,
                        source_ids_json,
                        consolidation_ts_i64
                    ],
                )
                .map_err(|e| Error::OperationFailed {
                    operation: "insert_memory".to_string(),
                    cause: e.to_string(),
                })?;

                // Delete from FTS if exists (FTS5 uses rowid internally for matching)
                // The FTS row is refreshed via delete-then-insert rather than
                // REPLACE, keeping it in lockstep with the metadata upsert.
                conn.execute(
                    "DELETE FROM memories_fts WHERE id = ?1",
                    params![memory.id.as_str()],
                )
                .map_err(|e| Error::OperationFailed {
                    operation: "delete_fts".to_string(),
                    cause: e.to_string(),
                })?;

                // Insert into FTS table
                conn.execute(
                    "INSERT INTO memories_fts (id, content, tags) VALUES (?1, ?2, ?3)",
                    params![memory.id.as_str(), memory.content, tags_str],
                )
                .map_err(|e| Error::OperationFailed {
                    operation: "insert_fts".to_string(),
                    cause: e.to_string(),
                })?;

                Ok(())
            })();

            if result.is_ok() {
                conn.execute("COMMIT", [])
                    .map_err(|e| Error::OperationFailed {
                        operation: "commit_transaction".to_string(),
                        cause: e.to_string(),
                    })?;
            } else {
                // Rollback is best-effort; the original error is what callers
                // need to see.
                let _ = conn.execute("ROLLBACK", []);
            }

            result
        })();

        let status = if result.is_ok() { "success" } else { "error" };
        self.record_operation_metrics("index", start, status);
        result
    }
1142
1143    #[instrument(skip(self), fields(operation = "remove", backend = "sqlite", memory.id = %id.as_str()))]
1144    fn remove(&self, id: &MemoryId) -> Result<bool> {
1145        let start = Instant::now();
1146        let result = (|| {
1147            let conn = acquire_lock(&self.conn);
1148
1149            // Use transaction for atomicity (DB-H2)
1150            conn.execute("BEGIN IMMEDIATE", [])
1151                .map_err(|e| Error::OperationFailed {
1152                    operation: "begin_transaction".to_string(),
1153                    cause: e.to_string(),
1154                })?;
1155
1156            let result = (|| {
1157                // Delete from FTS
1158                conn.execute(
1159                    "DELETE FROM memories_fts WHERE id = ?1",
1160                    params![id.as_str()],
1161                )
1162                .map_err(|e| Error::OperationFailed {
1163                    operation: "delete_fts".to_string(),
1164                    cause: e.to_string(),
1165                })?;
1166
1167                // Delete from main table
1168                let deleted = conn
1169                    .execute("DELETE FROM memories WHERE id = ?1", params![id.as_str()])
1170                    .map_err(|e| Error::OperationFailed {
1171                        operation: "delete_memory".to_string(),
1172                        cause: e.to_string(),
1173                    })?;
1174
1175                Ok(deleted > 0)
1176            })();
1177
1178            if result.is_ok() {
1179                conn.execute("COMMIT", [])
1180                    .map_err(|e| Error::OperationFailed {
1181                        operation: "commit_transaction".to_string(),
1182                        cause: e.to_string(),
1183                    })?;
1184            } else {
1185                let _ = conn.execute("ROLLBACK", []);
1186            }
1187
1188            result
1189        })();
1190
1191        let status = if result.is_ok() { "success" } else { "error" };
1192        self.record_operation_metrics("remove", start, status);
1193        result
1194    }
1195
1196    #[instrument(
1197        skip(self, query, filter),
1198        fields(operation = "search", backend = "sqlite", query_length = query.len(), limit = limit)
1199    )]
1200    fn search(
1201        &self,
1202        query: &str,
1203        filter: &SearchFilter,
1204        limit: usize,
1205    ) -> Result<Vec<(MemoryId, f32)>> {
1206        let start = Instant::now();
1207        let result = (|| {
1208            let conn = acquire_lock(&self.conn);
1209
1210            // Build filter clause with numbered parameters starting from ?2
1211            // ?1 is the FTS query
1212            let (filter_clause, filter_params, next_param) =
1213                self.build_filter_clause_numbered(filter, 2);
1214
1215            // Use FTS5 MATCH for search with BM25 ranking
1216            // Limit parameter comes after all filter parameters
1217            let sql = format!(
1218                "SELECT f.id, bm25(memories_fts) as score
1219                 FROM memories_fts f
1220                 JOIN memories m ON f.id = m.id
1221                 WHERE memories_fts MATCH ?1 {filter_clause}
1222                 ORDER BY score
1223                 LIMIT ?{next_param}"
1224            );
1225
1226            let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
1227                operation: "prepare_search".to_string(),
1228                cause: e.to_string(),
1229            })?;
1230
1231            // Build parameters: query, filter params, limit
1232            let mut results = Vec::new();
1233
1234            // FTS5 query - escape special characters and wrap terms in quotes
1235            // FTS5 special chars: - (NOT), * (prefix), " (phrase), : (column)
1236            // Pre-allocate: each term becomes ~term.len() + 6 chars ("term" OR )
1237            let terms: Vec<_> = query.split_whitespace().collect();
1238            let estimated_len = terms.iter().map(|t| t.len() + 8).sum::<usize>();
1239            let mut fts_query = String::with_capacity(estimated_len);
1240            for (i, term) in terms.iter().enumerate() {
1241                if i > 0 {
1242                    fts_query.push_str(" OR ");
1243                }
1244                fts_query.push('"');
1245                // Escape double quotes for literal matching
1246                for c in term.chars() {
1247                    if c == '"' {
1248                        fts_query.push_str("\"\"");
1249                    } else {
1250                        fts_query.push(c);
1251                    }
1252                }
1253                fts_query.push('"');
1254            }
1255
1256            let rows = stmt
1257                .query_map(
1258                    rusqlite::params_from_iter(
1259                        std::iter::once(fts_query)
1260                            .chain(filter_params.into_iter())
1261                            .chain(std::iter::once(limit.to_string())),
1262                    ),
1263                    |row| {
1264                        let id: String = row.get(0)?;
1265                        let score: f64 = row.get(1)?;
1266                        Ok((id, score))
1267                    },
1268                )
1269                .map_err(|e| Error::OperationFailed {
1270                    operation: "execute_search".to_string(),
1271                    cause: e.to_string(),
1272                })?;
1273
1274            for row in rows {
1275                let (id, score) = row.map_err(|e| Error::OperationFailed {
1276                    operation: "read_search_row".to_string(),
1277                    cause: e.to_string(),
1278                })?;
1279
1280                // Normalize BM25 score (DB-H3 fix)
1281                // SQLite FTS5 bm25() returns negative values where MORE NEGATIVE = BETTER MATCH
1282                // For example: -10.0 is a better match than -2.0
1283                //
1284                // We negate and apply sigmoid normalization to map to 0-1 range:
1285                // - Negate: makes higher values = better matches
1286                // - Sigmoid: 1.0 / (1.0 + e^(-k*x)) where k controls steepness
1287                // - This gives values in (0, 1) with ~0.5 for score=0
1288                #[allow(clippy::cast_possible_truncation)]
1289                let normalized_score = {
1290                    // Negate so higher = better, apply sigmoid with k=0.5 for gentle curve
1291                    let positive_score = -score;
1292                    let sigmoid = 1.0 / (1.0 + (-0.5 * positive_score).exp());
1293                    sigmoid.clamp(0.0, 1.0) as f32
1294                };
1295
1296                results.push((MemoryId::new(id), normalized_score));
1297            }
1298
1299            // Apply minimum score filter if specified
1300            if let Some(min_score) = filter.min_score {
1301                results.retain(|(_, score)| *score >= min_score);
1302            }
1303
1304            Ok(results)
1305        })();
1306
1307        let status = if result.is_ok() { "success" } else { "error" };
1308        self.record_operation_metrics("search", start, status);
1309        result
1310    }
1311
1312    #[instrument(skip(self), fields(operation = "clear", backend = "sqlite"))]
1313    fn clear(&self) -> Result<()> {
1314        let start = Instant::now();
1315        let result = (|| {
1316            let conn = acquire_lock(&self.conn);
1317
1318            // Use transaction for atomicity (DB-H2)
1319            conn.execute("BEGIN IMMEDIATE", [])
1320                .map_err(|e| Error::OperationFailed {
1321                    operation: "begin_transaction".to_string(),
1322                    cause: e.to_string(),
1323                })?;
1324
1325            let result = (|| {
1326                conn.execute("DELETE FROM memories_fts", []).map_err(|e| {
1327                    Error::OperationFailed {
1328                        operation: "clear_fts".to_string(),
1329                        cause: e.to_string(),
1330                    }
1331                })?;
1332
1333                conn.execute("DELETE FROM memories", [])
1334                    .map_err(|e| Error::OperationFailed {
1335                        operation: "clear_memories".to_string(),
1336                        cause: e.to_string(),
1337                    })?;
1338
1339                Ok(())
1340            })();
1341
1342            if result.is_ok() {
1343                conn.execute("COMMIT", [])
1344                    .map_err(|e| Error::OperationFailed {
1345                        operation: "commit_transaction".to_string(),
1346                        cause: e.to_string(),
1347                    })?;
1348            } else {
1349                let _ = conn.execute("ROLLBACK", []);
1350            }
1351
1352            result
1353        })();
1354
1355        let status = if result.is_ok() { "success" } else { "error" };
1356        self.record_operation_metrics("clear", start, status);
1357        result
1358    }
1359
1360    #[instrument(
1361        skip(self, filter),
1362        fields(operation = "list_all", backend = "sqlite", limit = limit)
1363    )]
1364    fn list_all(&self, filter: &SearchFilter, limit: usize) -> Result<Vec<(MemoryId, f32)>> {
1365        let start = Instant::now();
1366        let result = (|| {
1367            let conn = acquire_lock(&self.conn);
1368            let max_limit = usize::try_from(i64::MAX).unwrap_or(usize::MAX);
1369            let limit = limit.min(max_limit);
1370
1371            // Build filter clause (starting at parameter 1, no FTS query)
1372            let (filter_clause, filter_params, next_param) =
1373                self.build_filter_clause_numbered(filter, 1);
1374
1375            // Query all memories without FTS MATCH, ordered by created_at desc
1376            let sql = format!(
1377                "SELECT m.id, 1.0 as score
1378                 FROM memories m
1379                 WHERE 1=1 {filter_clause}
1380                 ORDER BY m.created_at DESC
1381                 LIMIT ?{next_param}"
1382            );
1383
1384            let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
1385                operation: "prepare_list_all".to_string(),
1386                cause: e.to_string(),
1387            })?;
1388
1389            let mut results = Vec::new();
1390
1391            let rows = stmt
1392                .query_map(
1393                    rusqlite::params_from_iter(
1394                        filter_params
1395                            .into_iter()
1396                            .chain(std::iter::once(limit.to_string())),
1397                    ),
1398                    |row| {
1399                        let id: String = row.get(0)?;
1400                        let score: f64 = row.get(1)?;
1401                        Ok((id, score))
1402                    },
1403                )
1404                .map_err(|e| Error::OperationFailed {
1405                    operation: "list_all".to_string(),
1406                    cause: e.to_string(),
1407                })?;
1408
1409            for row in rows {
1410                let (id, score) = row.map_err(|e| Error::OperationFailed {
1411                    operation: "read_list_row".to_string(),
1412                    cause: e.to_string(),
1413                })?;
1414
1415                #[allow(clippy::cast_possible_truncation)]
1416                results.push((MemoryId::new(id), score as f32));
1417            }
1418
1419            Ok(results)
1420        })();
1421
1422        let status = if result.is_ok() { "success" } else { "error" };
1423        self.record_operation_metrics("list_all", start, status);
1424        result
1425    }
1426
1427    #[instrument(skip(self), fields(operation = "get_memory", backend = "sqlite", memory.id = %id.as_str()))]
1428    fn get_memory(&self, id: &MemoryId) -> Result<Option<Memory>> {
1429        let start = Instant::now();
1430        let result = (|| {
1431            let conn = acquire_lock(&self.conn);
1432            let row = fetch_memory_row(&conn, id)?;
1433            Ok(row.map(build_memory_from_row))
1434        })();
1435
1436        let status = if result.is_ok() { "success" } else { "error" };
1437        self.record_operation_metrics("get_memory", start, status);
1438        result
1439    }
1440
    /// Retrieves multiple memories in a single batch query (PERF-C1 fix).
    ///
    /// Uses a single SQL query with IN clause instead of N individual queries.
    ///
    /// Results come back in the same order as the input `ids`; an ID that is
    /// not indexed yields `None` at its position. A duplicated input ID is
    /// satisfied only once: the first occurrence consumes the fetched memory
    /// (via `HashMap::remove`) and later duplicates get `None`.
    #[instrument(skip(self, ids), fields(operation = "get_memories_batch", backend = "sqlite", count = ids.len()))]
    fn get_memories_batch(&self, ids: &[MemoryId]) -> Result<Vec<Option<Memory>>> {
        let start = Instant::now();

        // Short-circuit before taking the lock or recording metrics.
        if ids.is_empty() {
            return Ok(Vec::new());
        }

        let result = (|| {
            let conn = acquire_lock(&self.conn);

            // Build placeholders for IN clause
            // One positional placeholder per ID: ?1, ?2, ...
            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{i}")).collect();

            // Column order must stay in sync with the positional `row.get(N)`
            // indices below and with `fetch_memory_row`; the `group-scope`
            // build appends a trailing group_id column (index 16).
            #[cfg(feature = "group-scope")]
            let sql = format!(
                "SELECT m.id, m.namespace, m.domain, m.project_id, m.branch, m.file_path, m.status, m.created_at,
                        m.tombstoned_at, m.expires_at, m.tags, m.source, f.content, m.is_summary, m.source_memory_ids, m.consolidation_timestamp, m.group_id
                 FROM memories m
                 JOIN memories_fts f ON m.id = f.id
                 WHERE m.id IN ({})",
                placeholders.join(", ")
            );
            #[cfg(not(feature = "group-scope"))]
            let sql = format!(
                "SELECT m.id, m.namespace, m.domain, m.project_id, m.branch, m.file_path, m.status, m.created_at,
                        m.tombstoned_at, m.expires_at, m.tags, m.source, f.content, m.is_summary, m.source_memory_ids, m.consolidation_timestamp
                 FROM memories m
                 JOIN memories_fts f ON m.id = f.id
                 WHERE m.id IN ({})",
                placeholders.join(", ")
            );

            let mut stmt = conn.prepare(&sql).map_err(|e| Error::OperationFailed {
                operation: "prepare_get_memories_batch".to_string(),
                cause: e.to_string(),
            })?;

            // Collect results into a HashMap for O(1) lookup
            let id_strs: Vec<&str> = ids.iter().map(MemoryId::as_str).collect();
            let mut memory_map: std::collections::HashMap<String, Memory> =
                std::collections::HashMap::with_capacity(ids.len());

            let rows = stmt
                .query_map(rusqlite::params_from_iter(id_strs.iter()), |row| {
                    Ok(MemoryRow {
                        id: row.get(0)?,
                        namespace: row.get(1)?,
                        domain: row.get(2)?,
                        project_id: row.get(3)?,
                        branch: row.get(4)?,
                        file_path: row.get(5)?,
                        status: row.get(6)?,
                        created_at: row.get(7)?,
                        tombstoned_at: row.get(8)?,
                        expires_at: row.get(9)?,
                        tags: row.get(10)?,
                        source: row.get(11)?,
                        content: row.get(12)?,
                        // A NULL is_summary is treated as "not a summary".
                        is_summary: row.get::<_, Option<bool>>(13)?.unwrap_or(false),
                        source_memory_ids: row.get(14)?,
                        consolidation_timestamp: row.get(15)?,
                        #[cfg(feature = "group-scope")]
                        group_id: row.get(16)?,
                    })
                })
                .map_err(|e| Error::OperationFailed {
                    operation: "execute_get_memories_batch".to_string(),
                    cause: e.to_string(),
                })?;

            for row in rows {
                let memory_row = row.map_err(|e| Error::OperationFailed {
                    operation: "read_batch_row".to_string(),
                    cause: e.to_string(),
                })?;
                let id = memory_row.id.clone();
                memory_map.insert(id, build_memory_from_row(memory_row));
            }

            // Return memories in the same order as input IDs
            Ok(ids
                .iter()
                .map(|id| memory_map.remove(id.as_str()))
                .collect())
        })();

        let status = if result.is_ok() { "success" } else { "error" };
        self.record_operation_metrics("get_memories_batch", start, status);
        result
    }
1535
    /// Re-indexes all memories in a single transaction (DB-H2).
    ///
    /// This is more efficient than the default implementation which creates
    /// a transaction per memory. For each memory the main `memories` row is
    /// upserted, then the FTS entry is refreshed via delete-then-insert
    /// (`memories_fts` is a separate table, so `INSERT OR REPLACE` on
    /// `memories` does not update it).
    ///
    /// # Errors
    ///
    /// Returns `Error::OperationFailed` if the transaction cannot be started,
    /// any per-memory statement fails, or the commit fails. On a statement
    /// failure the transaction is rolled back (rollback errors are ignored).
    #[instrument(skip(self, memories), fields(operation = "reindex", backend = "sqlite", count = memories.len()))]
    fn reindex(&self, memories: &[Memory]) -> Result<()> {
        let start = Instant::now();

        // Nothing to do — return before opening a transaction. Note this
        // early exit also skips the operation-metrics recording below.
        if memories.is_empty() {
            return Ok(());
        }

        let result = (|| {
            let conn = acquire_lock(&self.conn);

            // Use a single transaction for all operations.
            // BEGIN IMMEDIATE takes the write lock up front so we fail fast
            // here rather than mid-batch.
            conn.execute("BEGIN IMMEDIATE", [])
                .map_err(|e| Error::OperationFailed {
                    operation: "begin_transaction".to_string(),
                    cause: e.to_string(),
                })?;

            // Inner closure so any `?` failure falls through to the
            // commit/rollback decision below instead of leaving the
            // transaction open.
            let result = (|| {
                for memory in memories {
                    let tags_str = memory.tags.join(",");
                    let domain_str = memory.domain.to_string();

                    // Insert or replace in main table
                    // Note: Cast u64 to i64 for SQLite compatibility (rusqlite doesn't impl ToSql for u64)
                    #[allow(clippy::cast_possible_wrap)]
                    let created_at_i64 = memory.created_at as i64;
                    let tombstoned_at_i64 = memory.tombstoned_at.map(|t| t.timestamp());
                    #[allow(clippy::cast_possible_wrap)]
                    let consolidation_ts_i64 = memory.consolidation_timestamp.map(|t| t as i64);
                    // Source IDs stored as JSON; serialization failure degrades
                    // to an empty string rather than aborting the batch.
                    let source_ids_json = memory
                        .source_memory_ids
                        .as_ref()
                        .map(|ids| serde_json::to_string(ids).unwrap_or_default());
                    let expires_at_i64 = memory.expires_at.map(u64::cast_signed);
                    // NOTE(review): `updated_at` is not persisted here — confirm
                    // the `memories` schema intentionally omits it.
                    conn.execute(
                        "INSERT OR REPLACE INTO memories (id, namespace, domain, project_id, branch, file_path, status, created_at, tags, source, tombstoned_at, expires_at, is_summary, source_memory_ids, consolidation_timestamp)
                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
                        params![
                            memory.id.as_str(),
                            memory.namespace.as_str(),
                            domain_str,
                            memory.project_id.as_deref(),
                            memory.branch.as_deref(),
                            memory.file_path.as_deref(),
                            memory.status.as_str(),
                            created_at_i64,
                            tags_str,
                            memory.source.as_deref(),
                            tombstoned_at_i64,
                            expires_at_i64,
                            memory.is_summary,
                            source_ids_json,
                            consolidation_ts_i64
                        ],
                    )
                    .map_err(|e| Error::OperationFailed {
                        operation: "insert_memory".to_string(),
                        cause: e.to_string(),
                    })?;

                    // Delete any stale FTS row first; the FTS table is not
                    // covered by the INSERT OR REPLACE above.
                    conn.execute(
                        "DELETE FROM memories_fts WHERE id = ?1",
                        params![memory.id.as_str()],
                    )
                    .map_err(|e| Error::OperationFailed {
                        operation: "delete_fts".to_string(),
                        cause: e.to_string(),
                    })?;

                    // Insert the fresh FTS entry (content + tags are searchable).
                    conn.execute(
                        "INSERT INTO memories_fts (id, content, tags) VALUES (?1, ?2, ?3)",
                        params![memory.id.as_str(), memory.content, tags_str],
                    )
                    .map_err(|e| Error::OperationFailed {
                        operation: "insert_fts".to_string(),
                        cause: e.to_string(),
                    })?;
                }
                Ok(())
            })();

            // Commit on success; best-effort rollback on failure (a rollback
            // error would mask the original error, so it is deliberately ignored).
            if result.is_ok() {
                conn.execute("COMMIT", [])
                    .map_err(|e| Error::OperationFailed {
                        operation: "commit_transaction".to_string(),
                        cause: e.to_string(),
                    })?;
            } else {
                let _ = conn.execute("ROLLBACK", []);
            }

            result
        })();

        let status = if result.is_ok() { "success" } else { "error" };
        self.record_operation_metrics("reindex", start, status);
        result
    }
1641}
1642
1643// Implement PersistenceBackend for SqliteBackend so it can be used with ConsolidationService
1644impl crate::storage::traits::PersistenceBackend for SqliteBackend {
1645    fn store(&self, memory: &Memory) -> Result<()> {
1646        // Delegate to index() which stores the full memory
1647        self.index(memory)
1648    }
1649
1650    fn get(&self, id: &MemoryId) -> Result<Option<Memory>> {
1651        self.get_memory(id)
1652    }
1653
1654    fn delete(&self, id: &MemoryId) -> Result<bool> {
1655        self.remove(id)
1656    }
1657
1658    fn list_ids(&self) -> Result<Vec<MemoryId>> {
1659        let start = Instant::now();
1660        let result = (|| {
1661            let conn = self.conn.lock().map_err(|e| Error::OperationFailed {
1662                operation: "lock_connection".to_string(),
1663                cause: e.to_string(),
1664            })?;
1665
1666            let mut stmt =
1667                conn.prepare("SELECT id FROM memories")
1668                    .map_err(|e| Error::OperationFailed {
1669                        operation: "prepare_list_ids".to_string(),
1670                        cause: e.to_string(),
1671                    })?;
1672
1673            let ids: Vec<MemoryId> = stmt
1674                .query_map([], |row| {
1675                    let id: String = row.get(0)?;
1676                    Ok(MemoryId::new(&id))
1677                })
1678                .map_err(|e| Error::OperationFailed {
1679                    operation: "list_ids".to_string(),
1680                    cause: e.to_string(),
1681                })?
1682                .filter_map(std::result::Result::ok)
1683                .collect();
1684
1685            Ok(ids)
1686        })();
1687
1688        let status = if result.is_ok() { "success" } else { "error" };
1689        self.record_operation_metrics("list_ids", start, status);
1690        result
1691    }
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696    use super::*;
1697    use crate::models::{Domain, MemoryStatus, Namespace};
1698
    /// Builds a minimal active `Memory` with fixed timestamps, no facets,
    /// and a single "test" tag — the shared fixture for the tests below.
    fn create_test_memory(id: &str, content: &str, namespace: Namespace) -> Memory {
        Memory {
            id: MemoryId::new(id),
            content: content.to_string(),
            namespace,
            domain: Domain::new(),
            project_id: None,
            branch: None,
            file_path: None,
            status: MemoryStatus::Active,
            created_at: 1_234_567_890,
            updated_at: 1_234_567_890,
            tombstoned_at: None,
            expires_at: None,
            embedding: None,
            tags: vec!["test".to_string()],
            // Field only exists when the group-scope feature is compiled in.
            #[cfg(feature = "group-scope")]
            group_id: None,
            source: None,
            is_summary: false,
            source_memory_ids: None,
            consolidation_timestamp: None,
        }
    }
1723
1724    #[test]
1725    fn test_index_and_search() {
1726        let backend = SqliteBackend::in_memory().unwrap();
1727
1728        let memory1 = create_test_memory("id1", "Rust programming language", Namespace::Decisions);
1729        let memory2 = create_test_memory("id2", "Python scripting", Namespace::Learnings);
1730        let memory3 =
1731            create_test_memory("id3", "Rust ownership and borrowing", Namespace::Patterns);
1732
1733        backend.index(&memory1).unwrap();
1734        backend.index(&memory2).unwrap();
1735        backend.index(&memory3).unwrap();
1736
1737        // Search for "Rust"
1738        let results = backend.search("Rust", &SearchFilter::new(), 10).unwrap();
1739
1740        assert_eq!(results.len(), 2);
1741        let ids: Vec<_> = results.iter().map(|(id, _)| id.as_str()).collect();
1742        assert!(ids.contains(&"id1"));
1743        assert!(ids.contains(&"id3"));
1744    }
1745
1746    #[test]
1747    fn test_search_with_namespace_filter() {
1748        let backend = SqliteBackend::in_memory().unwrap();
1749
1750        let memory1 = create_test_memory("id1", "Rust programming", Namespace::Decisions);
1751        let memory2 = create_test_memory("id2", "Rust patterns", Namespace::Patterns);
1752
1753        backend.index(&memory1).unwrap();
1754        backend.index(&memory2).unwrap();
1755
1756        // Search with namespace filter
1757        let filter = SearchFilter::new().with_namespace(Namespace::Patterns);
1758        let results = backend.search("Rust", &filter, 10).unwrap();
1759
1760        assert_eq!(results.len(), 1);
1761        assert_eq!(results[0].0.as_str(), "id2");
1762    }
1763
1764    #[test]
1765    fn test_search_with_facet_filters() {
1766        let backend = SqliteBackend::in_memory().unwrap();
1767
1768        let mut memory = create_test_memory("id1", "Rust facets", Namespace::Decisions);
1769        memory.project_id = Some("github.com/org/repo".to_string());
1770        memory.branch = Some("main".to_string());
1771        memory.file_path = Some("src/lib.rs".to_string());
1772
1773        backend.index(&memory).unwrap();
1774
1775        let filter = SearchFilter::new()
1776            .with_project_id("github.com/org/repo")
1777            .with_branch("main")
1778            .with_file_path("src/lib.rs");
1779
1780        let results = backend.search("Rust", &filter, 10).unwrap();
1781        assert_eq!(results.len(), 1);
1782        assert_eq!(results[0].0.as_str(), "id1");
1783    }
1784
1785    #[test]
1786    fn test_remove() {
1787        let backend = SqliteBackend::in_memory().unwrap();
1788
1789        let memory = create_test_memory("to_remove", "Test content", Namespace::Decisions);
1790        backend.index(&memory).unwrap();
1791
1792        // Verify it exists
1793        let results = backend.search("content", &SearchFilter::new(), 10).unwrap();
1794        assert_eq!(results.len(), 1);
1795
1796        // Remove it
1797        let removed = backend.remove(&MemoryId::new("to_remove")).unwrap();
1798        assert!(removed);
1799
1800        // Verify it's gone
1801        let results = backend.search("content", &SearchFilter::new(), 10).unwrap();
1802        assert!(results.is_empty());
1803    }
1804
1805    #[test]
1806    fn test_clear() {
1807        let backend = SqliteBackend::in_memory().unwrap();
1808
1809        backend
1810            .index(&create_test_memory("id1", "content1", Namespace::Decisions))
1811            .unwrap();
1812        backend
1813            .index(&create_test_memory("id2", "content2", Namespace::Decisions))
1814            .unwrap();
1815
1816        backend.clear().unwrap();
1817
1818        let results = backend.search("content", &SearchFilter::new(), 10).unwrap();
1819        assert!(results.is_empty());
1820    }
1821
1822    #[test]
1823    fn test_reindex() {
1824        let backend = SqliteBackend::in_memory().unwrap();
1825
1826        let memories = vec![
1827            create_test_memory("id1", "memory one", Namespace::Decisions),
1828            create_test_memory("id2", "memory two", Namespace::Learnings),
1829            create_test_memory("id3", "memory three", Namespace::Patterns),
1830        ];
1831
1832        backend.reindex(&memories).unwrap();
1833
1834        let results = backend.search("memory", &SearchFilter::new(), 10).unwrap();
1835        assert_eq!(results.len(), 3);
1836    }
1837
1838    #[test]
1839    fn test_list_all_with_max_limit() {
1840        let backend = SqliteBackend::in_memory().unwrap();
1841
1842        backend
1843            .index(&create_test_memory(
1844                "id1",
1845                "memory one",
1846                Namespace::Decisions,
1847            ))
1848            .unwrap();
1849
1850        let results = backend.list_all(&SearchFilter::new(), usize::MAX).unwrap();
1851
1852        assert_eq!(results.len(), 1);
1853        assert_eq!(results[0].0.as_str(), "id1");
1854    }
1855
1856    #[test]
1857    fn test_update_index() {
1858        let backend = SqliteBackend::in_memory().unwrap();
1859
1860        let mut memory =
1861            create_test_memory("update_test", "original content", Namespace::Decisions);
1862        backend.index(&memory).unwrap();
1863
1864        // Update the memory
1865        memory.content = "updated content completely different".to_string();
1866        backend.index(&memory).unwrap();
1867
1868        // Search for old content should not find it
1869        let old_results = backend
1870            .search("original", &SearchFilter::new(), 10)
1871            .unwrap();
1872        assert!(old_results.is_empty());
1873
1874        // Search for new content should find it
1875        let new_results = backend
1876            .search("different", &SearchFilter::new(), 10)
1877            .unwrap();
1878        assert_eq!(new_results.len(), 1);
1879    }
1880
1881    #[test]
1882    fn test_get_memories_batch() {
1883        let backend = SqliteBackend::in_memory().unwrap();
1884
1885        let memory1 = create_test_memory("batch1", "First memory", Namespace::Decisions);
1886        let memory2 = create_test_memory("batch2", "Second memory", Namespace::Learnings);
1887        let memory3 = create_test_memory("batch3", "Third memory", Namespace::Patterns);
1888
1889        backend.index(&memory1).unwrap();
1890        backend.index(&memory2).unwrap();
1891        backend.index(&memory3).unwrap();
1892
1893        // Fetch all three in a batch
1894        let ids = vec![
1895            MemoryId::new("batch1"),
1896            MemoryId::new("batch2"),
1897            MemoryId::new("batch3"),
1898        ];
1899        let results = backend.get_memories_batch(&ids).unwrap();
1900
1901        assert_eq!(results.len(), 3);
1902        assert!(results[0].is_some());
1903        assert!(results[1].is_some());
1904        assert!(results[2].is_some());
1905
1906        // Verify order is preserved
1907        assert_eq!(results[0].as_ref().unwrap().id.as_str(), "batch1");
1908        assert_eq!(results[1].as_ref().unwrap().id.as_str(), "batch2");
1909        assert_eq!(results[2].as_ref().unwrap().id.as_str(), "batch3");
1910    }
1911
1912    #[test]
1913    fn test_get_memories_batch_with_missing() {
1914        let backend = SqliteBackend::in_memory().unwrap();
1915
1916        let memory1 = create_test_memory("exists1", "Memory one", Namespace::Decisions);
1917        backend.index(&memory1).unwrap();
1918
1919        // Request both existing and non-existing
1920        let ids = vec![
1921            MemoryId::new("exists1"),
1922            MemoryId::new("does_not_exist"),
1923            MemoryId::new("also_missing"),
1924        ];
1925        let results = backend.get_memories_batch(&ids).unwrap();
1926
1927        assert_eq!(results.len(), 3);
1928        assert!(results[0].is_some());
1929        assert!(results[1].is_none());
1930        assert!(results[2].is_none());
1931    }
1932
1933    #[test]
1934    fn test_get_memories_batch_empty() {
1935        let backend = SqliteBackend::in_memory().unwrap();
1936        let results = backend.get_memories_batch(&[]).unwrap();
1937        assert!(results.is_empty());
1938    }
1939
1940    #[test]
1941    fn test_search_with_status_filter() {
1942        let backend = SqliteBackend::in_memory().unwrap();
1943
1944        // Create memories with different statuses
1945        let mut memory1 = create_test_memory("id1", "Rust programming", Namespace::Decisions);
1946        memory1.status = MemoryStatus::Active;
1947
1948        let mut memory2 = create_test_memory("id2", "Rust patterns", Namespace::Decisions);
1949        memory2.status = MemoryStatus::Archived;
1950
1951        backend.index(&memory1).unwrap();
1952        backend.index(&memory2).unwrap();
1953
1954        // Search with status filter
1955        let filter = SearchFilter::new().with_status(MemoryStatus::Active);
1956        let results = backend.search("Rust", &filter, 10).unwrap();
1957
1958        assert_eq!(results.len(), 1);
1959        assert_eq!(results[0].0.as_str(), "id1");
1960    }
1961
1962    #[test]
1963    fn test_search_with_tag_filter() {
1964        let backend = SqliteBackend::in_memory().unwrap();
1965
1966        let mut memory1 = create_test_memory("id1", "Rust guide", Namespace::Decisions);
1967        memory1.tags = vec!["rust".to_string(), "guide".to_string()];
1968
1969        let mut memory2 = create_test_memory("id2", "Rust tutorial", Namespace::Decisions);
1970        memory2.tags = vec!["rust".to_string(), "tutorial".to_string()];
1971
1972        backend.index(&memory1).unwrap();
1973        backend.index(&memory2).unwrap();
1974
1975        // Search with tag filter (using with_tag for single tag)
1976        let filter = SearchFilter::new().with_tag("guide");
1977        let results = backend.search("Rust", &filter, 10).unwrap();
1978
1979        assert_eq!(results.len(), 1);
1980        assert_eq!(results[0].0.as_str(), "id1");
1981    }
1982
1983    #[test]
1984    fn test_search_fts_special_characters() {
1985        let backend = SqliteBackend::in_memory().unwrap();
1986
1987        let memory = create_test_memory(
1988            "special",
1989            "Error: unexpected 'syntax' in /path/to/file.rs:42",
1990            Namespace::Learnings,
1991        );
1992        backend.index(&memory).unwrap();
1993
1994        // Search with special characters should be escaped properly
1995        let results = backend
1996            .search("Error syntax", &SearchFilter::new(), 10)
1997            .unwrap();
1998        assert_eq!(results.len(), 1);
1999
2000        // Search for path-like content
2001        let results = backend.search("file.rs", &SearchFilter::new(), 10).unwrap();
2002        assert_eq!(results.len(), 1);
2003    }
2004
2005    #[test]
2006    fn test_get_memory_single() {
2007        let backend = SqliteBackend::in_memory().unwrap();
2008
2009        let memory = create_test_memory(
2010            "single_get",
2011            "Fetching a single memory",
2012            Namespace::Decisions,
2013        );
2014        backend.index(&memory).unwrap();
2015
2016        // Fetch the memory
2017        let result = backend.get_memory(&MemoryId::new("single_get")).unwrap();
2018        assert!(result.is_some());
2019
2020        let fetched = result.unwrap();
2021        assert_eq!(fetched.id.as_str(), "single_get");
2022        assert_eq!(fetched.content, "Fetching a single memory");
2023        assert_eq!(fetched.namespace, Namespace::Decisions);
2024    }
2025
2026    #[test]
2027    fn test_get_memory_not_found() {
2028        let backend = SqliteBackend::in_memory().unwrap();
2029
2030        let result = backend.get_memory(&MemoryId::new("nonexistent")).unwrap();
2031        assert!(result.is_none());
2032    }
2033
2034    #[test]
2035    fn test_remove_nonexistent() {
2036        let backend = SqliteBackend::in_memory().unwrap();
2037
2038        // Removing a non-existent memory should return false
2039        let removed = backend.remove(&MemoryId::new("does_not_exist")).unwrap();
2040        assert!(!removed);
2041    }
2042
2043    #[test]
2044    fn test_search_whitespace_only_query() {
2045        let backend = SqliteBackend::in_memory().unwrap();
2046
2047        let memory = create_test_memory("id1", "Some content", Namespace::Decisions);
2048        backend.index(&memory).unwrap();
2049
2050        // Whitespace-only query should be handled gracefully
2051        // The search function should either return empty or handle the error
2052        let results = backend.search("   ", &SearchFilter::new(), 10);
2053        // Either empty results or an error is acceptable for whitespace-only queries
2054        assert!(results.is_ok() || results.is_err());
2055    }
2056
2057    #[test]
2058    fn test_search_limit() {
2059        let backend = SqliteBackend::in_memory().unwrap();
2060
2061        // Index 5 memories all containing "test"
2062        for i in 0..5 {
2063            let memory = create_test_memory(
2064                &format!("id{i}"),
2065                &format!("test content {i}"),
2066                Namespace::Decisions,
2067            );
2068            backend.index(&memory).unwrap();
2069        }
2070
2071        // Search with limit of 3
2072        let results = backend.search("test", &SearchFilter::new(), 3).unwrap();
2073        assert_eq!(results.len(), 3);
2074
2075        // Search with limit of 10 (more than available)
2076        let results = backend.search("test", &SearchFilter::new(), 10).unwrap();
2077        assert_eq!(results.len(), 5);
2078    }
2079
2080    #[test]
2081    fn test_index_and_search_with_unicode() {
2082        let backend = SqliteBackend::in_memory().unwrap();
2083
2084        let memory = create_test_memory(
2085            "unicode",
2086            "Testing Unicode support with accents: cafe naive resume",
2087            Namespace::Learnings,
2088        );
2089        backend.index(&memory).unwrap();
2090
2091        // Search for English content
2092        let results = backend
2093            .search("Testing Unicode", &SearchFilter::new(), 10)
2094            .unwrap();
2095        assert_eq!(results.len(), 1);
2096
2097        // Search for accented content (should work with FTS5's unicode tokenizer)
2098        let results = backend.search("cafe", &SearchFilter::new(), 10).unwrap();
2099        // Note: FTS5's default tokenizer may or may not match accented words
2100        // This test validates that unicode content doesn't break indexing
2101        assert!(results.is_empty() || results.len() == 1);
2102    }
2103
2104    #[test]
2105    fn test_db_path() {
2106        let backend = SqliteBackend::in_memory().unwrap();
2107        assert!(backend.db_path().is_none());
2108    }
2109
2110    #[test]
2111    fn test_escape_like_wildcards() {
2112        // No special characters
2113        assert_eq!(escape_like_wildcards("normal"), "normal");
2114        assert_eq!(escape_like_wildcards("test-tag"), "test-tag");
2115
2116        // Percent sign (LIKE wildcard for "any characters")
2117        assert_eq!(escape_like_wildcards("100%"), "100\\%");
2118        assert_eq!(escape_like_wildcards("%prefix"), "\\%prefix");
2119
2120        // Underscore (LIKE wildcard for "single character")
2121        assert_eq!(escape_like_wildcards("user_name"), "user\\_name");
2122        assert_eq!(escape_like_wildcards("_private"), "\\_private");
2123
2124        // Backslash (the escape character itself)
2125        assert_eq!(escape_like_wildcards("path\\file"), "path\\\\file");
2126
2127        // Multiple special characters
2128        assert_eq!(escape_like_wildcards("100%_test\\"), "100\\%\\_test\\\\");
2129
2130        // Empty string
2131        assert_eq!(escape_like_wildcards(""), "");
2132    }
2133
2134    #[test]
2135    fn test_glob_to_like_pattern() {
2136        // Glob wildcards are converted
2137        assert_eq!(glob_to_like_pattern("*"), "%");
2138        assert_eq!(glob_to_like_pattern("?"), "_");
2139        assert_eq!(glob_to_like_pattern("src/*.rs"), "src/%.rs");
2140        assert_eq!(glob_to_like_pattern("test?.txt"), "test_.txt");
2141
2142        // Literal SQL LIKE wildcards are escaped
2143        assert_eq!(glob_to_like_pattern("100%"), "100\\%");
2144        assert_eq!(glob_to_like_pattern("user_name"), "user\\_name");
2145
2146        // Combined: literal % escaped, glob * converted
2147        assert_eq!(glob_to_like_pattern("foo%*bar"), "foo\\%%bar");
2148        assert_eq!(glob_to_like_pattern("*_test%?"), "%\\_test\\%_");
2149
2150        // Backslash is escaped
2151        assert_eq!(glob_to_like_pattern("path\\file*"), "path\\\\file%");
2152
2153        // Complex pattern (** becomes %%, each * is a separate wildcard)
2154        assert_eq!(
2155            glob_to_like_pattern("src/**/test_*.rs"),
2156            "src/%%/test\\_%.rs"
2157        );
2158
2159        // Empty string
2160        assert_eq!(glob_to_like_pattern(""), "");
2161
2162        // No special characters
2163        assert_eq!(glob_to_like_pattern("normal"), "normal");
2164    }
2165
2166    #[test]
2167    fn test_source_pattern_with_sql_wildcards() {
2168        let backend = SqliteBackend::in_memory().unwrap();
2169
2170        // Create memories with various source paths
2171        let mut memory1 = create_test_memory("id1", "Test 100% pass", Namespace::Decisions);
2172        memory1.source = Some("src/100%_file.rs".to_string());
2173        backend.index(&memory1).unwrap();
2174
2175        let mut memory2 = create_test_memory("id2", "Test content", Namespace::Decisions);
2176        memory2.source = Some("src/other_file.rs".to_string());
2177        backend.index(&memory2).unwrap();
2178
2179        // Search with source pattern containing literal % (should be escaped)
2180        let filter = SearchFilter::new().with_source_pattern("src/100%*");
2181        let results = backend.search("Test", &filter, 10).unwrap();
2182        assert_eq!(
2183            results.len(),
2184            1,
2185            "Should find only file with literal 100% in name"
2186        );
2187        assert_eq!(results[0].0.as_str(), "id1");
2188
2189        // Search with glob wildcard only (should match both)
2190        let filter2 = SearchFilter::new().with_source_pattern("src/*");
2191        let results2 = backend.search("Test", &filter2, 10).unwrap();
2192        assert_eq!(
2193            results2.len(),
2194            2,
2195            "Should find both files with glob wildcard"
2196        );
2197    }
2198
2199    #[test]
2200    fn test_tag_filtering_with_special_characters() {
2201        let backend = SqliteBackend::in_memory().unwrap();
2202
2203        // Create memory with tag containing LIKE wildcard characters
2204        let mut memory = create_test_memory("id1", "Test content", Namespace::Decisions);
2205        memory.tags = vec!["100%_complete".to_string(), "normal-tag".to_string()];
2206        backend.index(&memory).unwrap();
2207
2208        // Search with exact tag match (should find it)
2209        let mut filter = SearchFilter::new();
2210        filter.tags.push("100%_complete".to_string());
2211        let results = backend.search("Test", &filter, 10).unwrap();
2212        assert_eq!(
2213            results.len(),
2214            1,
2215            "Should find memory with escaped wildcards"
2216        );
2217
2218        // Search with partial match that would work without escaping (should NOT find)
2219        let mut filter2 = SearchFilter::new();
2220        filter2.tags.push("100".to_string()); // Without escaping, % would match anything
2221        let results2 = backend.search("Test", &filter2, 10).unwrap();
2222        assert_eq!(
2223            results2.len(),
2224            0,
2225            "Should not match partial tag due to proper escaping"
2226        );
2227    }
2228
2229    #[test]
2230    fn test_checkpoint() {
2231        let backend = SqliteBackend::in_memory().unwrap();
2232
2233        // Index some data to create WAL entries
2234        let memory = create_test_memory("checkpoint-test", "Test checkpoint", Namespace::Decisions);
2235        backend.index(&memory).unwrap();
2236
2237        // Checkpoint should succeed (even on empty WAL)
2238        let result = backend.checkpoint();
2239        assert!(result.is_ok(), "Checkpoint should succeed");
2240
2241        let (written, remaining) = result.unwrap();
2242        // In-memory databases may have 0 pages since WAL behavior differs
2243        assert_eq!(
2244            remaining, 0,
2245            "No pages should remain after TRUNCATE checkpoint"
2246        );
2247        // written can be 0 for in-memory DBs
2248        let _ = written;
2249    }
2250
2251    #[test]
2252    fn test_wal_size() {
2253        let backend = SqliteBackend::in_memory().unwrap();
2254
2255        // WAL size should be queryable
2256        let size = backend.wal_size();
2257        // In-memory databases might return Some(0) or None depending on WAL mode
2258        // The important thing is it doesn't crash
2259        let _ = size;
2260    }
2261
2262    #[test]
2263    fn test_checkpoint_if_needed_below_threshold() {
2264        let backend = SqliteBackend::in_memory().unwrap();
2265
2266        // First check current WAL size
2267        let wal_size = backend.wal_size().unwrap_or(0);
2268
2269        // With threshold above current size, checkpoint shouldn't trigger
2270        let result = backend.checkpoint_if_needed(wal_size.saturating_add(1000));
2271        assert!(result.is_ok());
2272        // In-memory databases have minimal WAL, so this should not checkpoint
2273        // unless WAL is already above threshold
2274        let _ = result.unwrap(); // Just verify it doesn't error
2275    }
2276
2277    #[test]
2278    fn test_checkpoint_if_needed_above_threshold() {
2279        let backend = SqliteBackend::in_memory().unwrap();
2280
2281        // With threshold of 0, checkpoint should always trigger (if WAL exists)
2282        let result = backend.checkpoint_if_needed(0);
2283        assert!(result.is_ok());
2284        // Result may be Some or None depending on WAL state
2285    }
2286
2287    #[test]
2288    fn test_memory_edges_table_exists() {
2289        let backend = SqliteBackend::in_memory().unwrap();
2290        let conn = acquire_lock(&backend.conn);
2291
2292        // Verify memory_edges table exists
2293        let result: std::result::Result<i64, _> = conn.query_row(
2294            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='memory_edges'",
2295            [],
2296            |row| row.get(0),
2297        );
2298
2299        assert!(result.is_ok());
2300        assert_eq!(result.unwrap(), 1, "memory_edges table should exist");
2301    }
2302
2303    #[test]
2304    fn test_memory_edges_schema() {
2305        let backend = SqliteBackend::in_memory().unwrap();
2306        let conn = acquire_lock(&backend.conn);
2307
2308        // Verify table has correct columns
2309        let result: std::result::Result<Vec<String>, _> = conn
2310            .prepare("PRAGMA table_info(memory_edges)")
2311            .and_then(|mut stmt| {
2312                let columns = stmt
2313                    .query_map([], |row| row.get::<_, String>(1))?
2314                    .collect::<std::result::Result<Vec<_>, _>>()?;
2315                Ok(columns)
2316            });
2317
2318        assert!(result.is_ok());
2319        let columns = result.unwrap();
2320        assert!(columns.contains(&"from_id".to_string()));
2321        assert!(columns.contains(&"to_id".to_string()));
2322        assert!(columns.contains(&"edge_type".to_string()));
2323        assert!(columns.contains(&"created_at".to_string()));
2324        assert_eq!(
2325            columns.len(),
2326            4,
2327            "memory_edges should have exactly 4 columns"
2328        );
2329    }
2330
2331    #[test]
2332    fn test_memory_edges_indexes_exist() {
2333        let backend = SqliteBackend::in_memory().unwrap();
2334        let conn = acquire_lock(&backend.conn);
2335
2336        // Verify indexes were created
2337        let indexes: std::result::Result<Vec<String>, _> = conn
2338            .prepare(
2339                "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='memory_edges'",
2340            )
2341            .and_then(|mut stmt| {
2342                let names = stmt
2343                    .query_map([], |row| row.get::<_, String>(0))?
2344                    .collect::<std::result::Result<Vec<_>, _>>()?;
2345                Ok(names)
2346            });
2347
2348        assert!(indexes.is_ok());
2349        let index_names = indexes.unwrap();
2350
2351        // Check for expected indexes
2352        assert!(
2353            index_names.iter().any(|name| name.contains("from_id")),
2354            "Should have index on from_id"
2355        );
2356        assert!(
2357            index_names.iter().any(|name| name.contains("to_id")),
2358            "Should have index on to_id"
2359        );
2360        assert!(
2361            index_names.iter().any(|name| name.contains("edge_type")),
2362            "Should have index on edge_type"
2363        );
2364    }
2365
2366    #[test]
2367    fn test_memory_edges_can_insert_and_query() {
2368        let backend = SqliteBackend::in_memory().unwrap();
2369
2370        // First create some memories to reference
2371        let memory1 = create_test_memory("edge_from", "Source memory", Namespace::Decisions);
2372        let memory2 = create_test_memory("edge_to", "Target memory", Namespace::Decisions);
2373        backend.index(&memory1).unwrap();
2374        backend.index(&memory2).unwrap();
2375
2376        let conn = acquire_lock(&backend.conn);
2377
2378        // Insert an edge
2379        let result = conn.execute(
2380            "INSERT INTO memory_edges (from_id, to_id, edge_type, created_at) VALUES (?1, ?2, ?3, ?4)",
2381            params!["edge_from", "edge_to", "summarized_by", 1_234_567_890_i64],
2382        );
2383
2384        assert!(result.is_ok());
2385        assert_eq!(result.unwrap(), 1, "Should insert one edge");
2386
2387        // Query the edge back
2388        let query_result: std::result::Result<(String, String, String, i64), _> = conn.query_row(
2389            "SELECT from_id, to_id, edge_type, created_at FROM memory_edges WHERE from_id = ?1",
2390            params!["edge_from"],
2391            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
2392        );
2393
2394        assert!(query_result.is_ok());
2395        let (from_id, to_id, edge_type, created_at) = query_result.unwrap();
2396        assert_eq!(from_id, "edge_from");
2397        assert_eq!(to_id, "edge_to");
2398        assert_eq!(edge_type, "summarized_by");
2399        assert_eq!(created_at, 1_234_567_890);
2400    }
2401
2402    #[test]
2403    fn test_memory_edges_foreign_key_constraint() {
2404        let backend = SqliteBackend::in_memory().unwrap();
2405        let conn = acquire_lock(&backend.conn);
2406
2407        // Enable foreign key enforcement
2408        let _ = conn.execute("PRAGMA foreign_keys = ON", []);
2409
2410        // Try to insert an edge with non-existent memory IDs
2411        // This should fail due to foreign key constraint
2412        let result = conn.execute(
2413            "INSERT INTO memory_edges (from_id, to_id, edge_type, created_at) VALUES (?1, ?2, ?3, ?4)",
2414            params!["nonexistent_from", "nonexistent_to", "summarized_by", 1_234_567_890_i64],
2415        );
2416
2417        // Foreign keys might not be enforced in all SQLite configurations,
2418        // so we don't strictly require failure here, but document the behavior
2419        let _ = result; // Just verify it doesn't panic
2420    }
2421
2422    #[test]
2423    fn test_memory_edges_cascade_delete() {
2424        let backend = SqliteBackend::in_memory().unwrap();
2425
2426        // Create memories and an edge
2427        let memory1 = create_test_memory("cascade_from", "Source", Namespace::Decisions);
2428        let memory2 = create_test_memory("cascade_to", "Target", Namespace::Decisions);
2429        backend.index(&memory1).unwrap();
2430        backend.index(&memory2).unwrap();
2431
2432        let conn = acquire_lock(&backend.conn);
2433
2434        // Enable foreign key enforcement
2435        let _ = conn.execute("PRAGMA foreign_keys = ON", []);
2436
2437        // Insert edge
2438        conn.execute(
2439            "INSERT INTO memory_edges (from_id, to_id, edge_type, created_at) VALUES (?1, ?2, ?3, ?4)",
2440            params!["cascade_from", "cascade_to", "summarized_by", 1_234_567_890_i64],
2441        )
2442        .unwrap();
2443
2444        // Verify edge exists
2445        let count: i64 = conn
2446            .query_row(
2447                "SELECT COUNT(*) FROM memory_edges WHERE from_id = ?1",
2448                params!["cascade_from"],
2449                |row| row.get(0),
2450            )
2451            .unwrap();
2452        assert_eq!(count, 1);
2453
2454        // Drop the connection guard BEFORE calling backend.remove() to avoid deadlock.
2455        // The remove() method needs to acquire the same mutex lock.
2456        drop(conn);
2457
2458        // Delete the source memory
2459        backend.remove(&MemoryId::new("cascade_from")).unwrap();
2460
2461        // Re-acquire the connection to verify cascade delete worked
2462        let conn = acquire_lock(&backend.conn);
2463
2464        // Edge should be deleted due to CASCADE
2465        let count_after: i64 = conn
2466            .query_row(
2467                "SELECT COUNT(*) FROM memory_edges WHERE from_id = ?1",
2468                params!["cascade_from"],
2469                |row| row.get(0),
2470            )
2471            .unwrap();
2472        assert_eq!(
2473            count_after, 0,
2474            "Edge should be deleted when source memory is deleted"
2475        );
2476    }
2477
2478    #[test]
2479    fn test_store_edge_and_query_edges() {
2480        use crate::models::EdgeType;
2481
2482        let backend = SqliteBackend::in_memory().unwrap();
2483
2484        // Create and index test memories
2485        let memory1 = create_test_memory("source_mem", "Source memory", Namespace::Decisions);
2486        let memory2 = create_test_memory("target_mem", "Target memory", Namespace::Decisions);
2487        backend.index(&memory1).unwrap();
2488        backend.index(&memory2).unwrap();
2489
2490        // Store an edge using the store_edge method
2491        let result = backend.store_edge(&memory1.id, &memory2.id, EdgeType::SummarizedBy);
2492        assert!(result.is_ok(), "store_edge should succeed");
2493
2494        // Query edges using the query_edges method
2495        let edges = backend
2496            .query_edges(&memory1.id, EdgeType::SummarizedBy)
2497            .expect("query_edges should succeed");
2498
2499        assert_eq!(edges.len(), 1, "Should have 1 edge");
2500        assert_eq!(edges[0], memory2.id, "Edge should point to target memory");
2501
2502        // Query non-existent edge type should return empty
2503        let no_edges = backend
2504            .query_edges(&memory1.id, EdgeType::Contradicts)
2505            .expect("query_edges for non-existent edge should succeed");
2506        assert_eq!(
2507            no_edges.len(),
2508            0,
2509            "Should have no edges for different edge type"
2510        );
2511    }
2512
2513    #[test]
2514    fn test_store_edge_upsert() {
2515        use crate::models::EdgeType;
2516
2517        let backend = SqliteBackend::in_memory().unwrap();
2518
2519        // Create and index test memories
2520        let memory1 = create_test_memory("upsert_source", "Source", Namespace::Decisions);
2521        let memory2 = create_test_memory("upsert_target", "Target", Namespace::Decisions);
2522        backend.index(&memory1).unwrap();
2523        backend.index(&memory2).unwrap();
2524
2525        // Store edge first time
2526        backend
2527            .store_edge(&memory1.id, &memory2.id, EdgeType::SummarizedBy)
2528            .unwrap();
2529
2530        // Store same edge again (should upsert, not error)
2531        let result = backend.store_edge(&memory1.id, &memory2.id, EdgeType::SummarizedBy);
2532        assert!(result.is_ok(), "Duplicate edge should upsert without error");
2533
2534        // Verify still only one edge
2535        let edges = backend
2536            .query_edges(&memory1.id, EdgeType::SummarizedBy)
2537            .unwrap();
2538        assert_eq!(edges.len(), 1, "Should still have only 1 edge after upsert");
2539    }
2540
2541    #[test]
2542    fn test_query_edges_multiple() {
2543        use crate::models::EdgeType;
2544
2545        let backend = SqliteBackend::in_memory().unwrap();
2546
2547        // Create and index test memories
2548        let source = create_test_memory("multi_source", "Source", Namespace::Decisions);
2549        let target1 = create_test_memory("multi_target1", "Target 1", Namespace::Decisions);
2550        let target2 = create_test_memory("multi_target2", "Target 2", Namespace::Decisions);
2551        let target3 = create_test_memory("multi_target3", "Target 3", Namespace::Decisions);
2552
2553        backend.index(&source).unwrap();
2554        backend.index(&target1).unwrap();
2555        backend.index(&target2).unwrap();
2556        backend.index(&target3).unwrap();
2557
2558        // Store multiple edges from one source
2559        backend
2560            .store_edge(&source.id, &target1.id, EdgeType::RelatedTo)
2561            .unwrap();
2562        backend
2563            .store_edge(&source.id, &target2.id, EdgeType::RelatedTo)
2564            .unwrap();
2565        backend
2566            .store_edge(&source.id, &target3.id, EdgeType::RelatedTo)
2567            .unwrap();
2568
2569        // Query should return all three
2570        let edges = backend
2571            .query_edges(&source.id, EdgeType::RelatedTo)
2572            .unwrap();
2573        assert_eq!(edges.len(), 3, "Should have 3 edges");
2574
2575        // Verify all targets are present
2576        assert!(edges.contains(&target1.id));
2577        assert!(edges.contains(&target2.id));
2578        assert!(edges.contains(&target3.id));
2579    }
2580}