// subcog/storage/persistence/postgresql.rs
//! PostgreSQL-based persistence backend.
//!
//! Provides reliable persistence using PostgreSQL as the storage backend.

5#[cfg(feature = "postgres")]
6mod implementation {
7    use crate::models::{Domain, Memory, MemoryId, MemoryStatus, Namespace};
8    use crate::storage::migrations::{Migration, MigrationRunner};
9    use crate::storage::resilience::{StorageResilienceConfig, retry_connection};
10    use crate::storage::traits::PersistenceBackend;
11    use crate::{Error, Result};
12    use chrono::{TimeZone, Utc};
13    use deadpool_postgres::{Config, Pool, Runtime};
14    use tokio::runtime::Handle;
15    use tokio_postgres::NoTls;
16
    /// Embedded migrations compiled into the binary.
    ///
    /// Each migration's SQL contains a `{table}` placeholder that the
    /// `MigrationRunner` substitutes with the configured table name.
    /// NOTE(review): this is presumably a plain string replacement of the
    /// literal `{table}` token — confirm that the `'{}'` Postgres
    /// empty-array default in migration 1 survives the substitution.
    const MIGRATIONS: &[Migration] = &[
        // v1: base table plus the most common single-column lookup indexes.
        // Timestamps are stored as epoch seconds in BIGINT columns; the
        // embedding is serialized as JSONB rather than a vector type.
        Migration {
            version: 1,
            description: "Initial memories table",
            sql: r"
                CREATE TABLE IF NOT EXISTS {table} (
                    id TEXT PRIMARY KEY,
                    content TEXT NOT NULL,
                    namespace TEXT NOT NULL,
                    domain_org TEXT,
                    domain_project TEXT,
                    domain_repo TEXT,
                    status TEXT NOT NULL DEFAULT 'active',
                    tags TEXT[] NOT NULL DEFAULT '{}',
                    source TEXT,
                    embedding JSONB,
                    created_at BIGINT NOT NULL,
                    updated_at BIGINT NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_{table}_namespace ON {table} (namespace);
                CREATE INDEX IF NOT EXISTS idx_{table}_status ON {table} (status);
                CREATE INDEX IF NOT EXISTS idx_{table}_created_at ON {table} (created_at DESC);
                CREATE INDEX IF NOT EXISTS idx_{table}_updated_at ON {table} (updated_at DESC);
            ",
        },
        // v2: GIN index to accelerate tag-containment queries on the TEXT[] column.
        Migration {
            version: 2,
            description: "Add tags GIN index",
            sql: r"
                CREATE INDEX IF NOT EXISTS idx_{table}_tags ON {table} USING GIN(tags);
            ",
        },
        // v3: composite index covering the three-part domain filter.
        Migration {
            version: 3,
            description: "Add domain composite index",
            sql: r"
                CREATE INDEX IF NOT EXISTS idx_{table}_domain ON {table} (domain_org, domain_project, domain_repo);
            ",
        },
        // v4: soft-delete timestamp; partial index keeps it cheap for the
        // common case where most rows are not tombstoned.
        Migration {
            version: 4,
            description: "Add tombstoned_at column (ADR-0053)",
            sql: r"
                ALTER TABLE {table} ADD COLUMN IF NOT EXISTS tombstoned_at BIGINT;
                CREATE INDEX IF NOT EXISTS idx_{table}_tombstoned ON {table} (tombstoned_at) WHERE tombstoned_at IS NOT NULL;
            ",
        },
        // v5: facet columns (project/branch/file) with matching lookup indexes.
        Migration {
            version: 5,
            description: "Add facet columns (ADR-0048/0049)",
            sql: r"
                ALTER TABLE {table} ADD COLUMN IF NOT EXISTS project_id TEXT;
                ALTER TABLE {table} ADD COLUMN IF NOT EXISTS branch TEXT;
                ALTER TABLE {table} ADD COLUMN IF NOT EXISTS file_path TEXT;
                CREATE INDEX IF NOT EXISTS idx_{table}_project_id ON {table} (project_id);
                CREATE INDEX IF NOT EXISTS idx_{table}_project_branch ON {table} (project_id, branch);
                CREATE INDEX IF NOT EXISTS idx_{table}_file_path ON {table} (file_path);
            ",
        },
    ];
78
    /// PostgreSQL-based persistence backend.
    ///
    /// Owns a deadpool connection pool and the name of the table it
    /// reads/writes; the table name is interpolated into SQL statements,
    /// so it must come from trusted configuration.
    pub struct PostgresBackend {
        /// Connection pool.
        pool: Pool,
        /// Table name for memories.
        table_name: String,
    }
86
87    /// Helper to map pool errors.
88    fn pool_error(e: impl std::fmt::Display) -> Error {
89        Error::OperationFailed {
90            operation: "postgres_persistence_get_client".to_string(),
91            cause: e.to_string(),
92        }
93    }
94
95    /// Helper to map query errors.
96    fn query_error(op: &str, e: impl std::fmt::Display) -> Error {
97        Error::OperationFailed {
98            operation: op.to_string(),
99            cause: e.to_string(),
100        }
101    }
102
103    impl PostgresBackend {
104        /// Creates a new PostgreSQL backend.
105        ///
106        /// # Errors
107        ///
108        /// Returns an error if the connection pool fails to initialize.
109        pub fn new(connection_url: &str, table_name: impl Into<String>) -> Result<Self> {
110            Self::with_pool_size(connection_url, table_name, None)
111        }
112
113        /// Creates a new PostgreSQL backend with configurable pool size.
114        ///
115        /// # Arguments
116        ///
117        /// * `connection_url` - PostgreSQL connection URL
118        /// * `table_name` - Name of the table for storing memories
119        /// * `pool_max_size` - Maximum connections in pool (defaults to 20)
120        ///
121        /// # Errors
122        ///
123        /// Returns an error if the connection pool fails to initialize after retries.
124        ///
125        /// # Connection Retry (CHAOS-HIGH-003)
126        ///
127        /// Connection establishment uses exponential backoff with jitter for transient
128        /// failures. This handles scenarios where the database is starting up or
129        /// temporarily unavailable.
130        pub fn with_pool_size(
131            connection_url: &str,
132            table_name: impl Into<String>,
133            pool_max_size: Option<usize>,
134        ) -> Result<Self> {
135            let table_name = table_name.into();
136            let config = Self::parse_connection_url(connection_url)?;
137            let cfg = Self::build_pool_config(&config, pool_max_size);
138
139            // Use connection retry with exponential backoff (CHAOS-HIGH-003)
140            let resilience_config = StorageResilienceConfig::from_env();
141            let pool = retry_connection(&resilience_config, "postgres", "create_pool", || {
142                cfg.create_pool(Some(Runtime::Tokio1), NoTls)
143                    .map_err(|e| Error::OperationFailed {
144                        operation: "postgres_persistence_create_pool".to_string(),
145                        cause: e.to_string(),
146                    })
147            })?;
148
149            let backend = Self { pool, table_name };
150            backend.run_migrations()?;
151            Ok(backend)
152        }
153
154        /// Parses the connection URL into a tokio-postgres config.
155        fn parse_connection_url(url: &str) -> Result<tokio_postgres::Config> {
156            url.parse::<tokio_postgres::Config>()
157                .map_err(|e| Error::OperationFailed {
158                    operation: "postgres_persistence_parse_url".to_string(),
159                    cause: e.to_string(),
160                })
161        }
162
163        /// Extracts host string from tokio-postgres Host.
164        #[cfg(unix)]
165        fn host_to_string(h: &tokio_postgres::config::Host) -> String {
166            match h {
167                tokio_postgres::config::Host::Tcp(s) => s.clone(),
168                tokio_postgres::config::Host::Unix(p) => p.to_string_lossy().to_string(),
169            }
170        }
171
172        /// Extracts host string from tokio-postgres Host (Windows: Tcp only).
173        #[cfg(not(unix))]
174        fn host_to_string(h: &tokio_postgres::config::Host) -> String {
175            let tokio_postgres::config::Host::Tcp(s) = h;
176            s.clone()
177        }
178
179        /// Default maximum connections in pool.
180        const DEFAULT_POOL_MAX_SIZE: usize = 20;
181
182        /// Builds a deadpool config from tokio-postgres config.
183        ///
184        /// # Pool Configuration (HIGH-010, DB-M2)
185        ///
186        /// Configures connection pool with safety limits:
187        /// - Configurable max connections (defaults to 20, prevents pool exhaustion)
188        /// - 5 second acquire timeout (prevents hanging on pool exhaustion)
189        ///
190        /// Pool size can be configured via `StorageBackendConfig.pool_max_size`.
191        fn build_pool_config(
192            config: &tokio_postgres::Config,
193            pool_max_size: Option<usize>,
194        ) -> Config {
195            let mut cfg = Config::new();
196            cfg.host = config.get_hosts().first().map(Self::host_to_string);
197            cfg.port = config.get_ports().first().copied();
198            cfg.user = config.get_user().map(String::from);
199            cfg.password = config
200                .get_password()
201                .map(|p| String::from_utf8_lossy(p).to_string());
202            cfg.dbname = config.get_dbname().map(String::from);
203
204            // Pool configuration with timeout (HIGH-010, DB-M2)
205            let max_size = pool_max_size.unwrap_or(Self::DEFAULT_POOL_MAX_SIZE);
206            cfg.pool = Some(deadpool_postgres::PoolConfig {
207                max_size,
208                timeouts: deadpool_postgres::Timeouts {
209                    wait: Some(std::time::Duration::from_secs(5)),
210                    create: Some(std::time::Duration::from_secs(5)),
211                    recycle: Some(std::time::Duration::from_secs(5)),
212                },
213                ..Default::default()
214            });
215
216            // Configure manager with fast recycling for connection reuse
217            cfg.manager = Some(deadpool_postgres::ManagerConfig {
218                recycling_method: deadpool_postgres::RecyclingMethod::Fast,
219            });
220
221            cfg
222        }
223
224        /// Creates a backend with default settings.
225        ///
226        /// # Errors
227        ///
228        /// Returns an error if the connection fails.
229        pub fn with_defaults() -> Result<Self> {
230            Self::new("postgresql://localhost/subcog", "memories")
231        }
232
233        /// Runs a blocking operation on the async pool.
234        fn block_on<F, T>(&self, f: F) -> Result<T>
235        where
236            F: std::future::Future<Output = Result<T>>,
237        {
238            if let Ok(handle) = Handle::try_current() {
239                handle.block_on(f)
240            } else {
241                let rt = tokio::runtime::Builder::new_current_thread()
242                    .enable_all()
243                    .build()
244                    .map_err(|e| Error::OperationFailed {
245                        operation: "postgres_persistence_create_runtime".to_string(),
246                        cause: e.to_string(),
247                    })?;
248                rt.block_on(f)
249            }
250        }
251
252        /// Runs migrations.
253        fn run_migrations(&self) -> Result<()> {
254            self.block_on(async {
255                let runner = MigrationRunner::new(self.pool.clone(), &self.table_name);
256                runner.run(MIGRATIONS).await
257            })
258        }
259
260        /// Async implementation of store operation.
261        #[allow(clippy::cast_possible_wrap)]
262        async fn store_async(&self, memory: &Memory) -> Result<()> {
263            let client = self.pool.get().await.map_err(pool_error)?;
264
265            let upsert = format!(
266                r"INSERT INTO {} (id, content, namespace, domain_org, domain_project, domain_repo,
267                    project_id, branch, file_path,
268                    status, tags, source, embedding, created_at, updated_at, tombstoned_at)
269                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
270                ON CONFLICT (id) DO UPDATE SET
271                    content = EXCLUDED.content,
272                    namespace = EXCLUDED.namespace,
273                    domain_org = EXCLUDED.domain_org,
274                    domain_project = EXCLUDED.domain_project,
275                    domain_repo = EXCLUDED.domain_repo,
276                    project_id = EXCLUDED.project_id,
277                    branch = EXCLUDED.branch,
278                    file_path = EXCLUDED.file_path,
279                    status = EXCLUDED.status,
280                    tags = EXCLUDED.tags,
281                    source = EXCLUDED.source,
282                    embedding = EXCLUDED.embedding,
283                    updated_at = EXCLUDED.updated_at,
284                    tombstoned_at = EXCLUDED.tombstoned_at",
285                self.table_name
286            );
287
288            let tags: Vec<&str> = memory.tags.iter().map(String::as_str).collect();
289            let embedding_json: Option<serde_json::Value> =
290                memory.embedding.as_ref().map(|e| serde_json::json!(e));
291
292            let tombstoned_at = memory.tombstoned_at.map(|ts| ts.timestamp());
293
294            client
295                .execute(
296                    &upsert,
297                    &[
298                        &memory.id.as_str(),
299                        &memory.content,
300                        &memory.namespace.as_str(),
301                        &memory.domain.organization,
302                        &memory.domain.project,
303                        &memory.domain.repository,
304                        &memory.project_id,
305                        &memory.branch,
306                        &memory.file_path,
307                        &memory.status.as_str(),
308                        &tags,
309                        &memory.source,
310                        &embedding_json,
311                        &(memory.created_at as i64),
312                        &(memory.updated_at as i64),
313                        &tombstoned_at,
314                    ],
315                )
316                .await
317                .map_err(|e| query_error("postgres_persistence_store", e))?;
318
319            Ok(())
320        }
321
322        /// Async implementation of get operation.
323        #[allow(clippy::cast_sign_loss)]
324        async fn get_async(&self, id: &MemoryId) -> Result<Option<Memory>> {
325            let client = self.pool.get().await.map_err(pool_error)?;
326
327            let query = format!(
328                r"SELECT id, content, namespace, domain_org, domain_project, domain_repo,
329                    project_id, branch, file_path,
330                    status, tags, source, embedding, created_at, updated_at, tombstoned_at
331                FROM {}
332                WHERE id = $1",
333                self.table_name
334            );
335
336            let row = client
337                .query_opt(&query, &[&id.as_str()])
338                .await
339                .map_err(|e| query_error("postgres_persistence_get", e))?;
340
341            Ok(row.map(|r| Self::row_to_memory(&r)))
342        }
343
344        /// Converts a database row to a Memory.
345        #[allow(clippy::cast_sign_loss)]
346        fn row_to_memory(row: &tokio_postgres::Row) -> Memory {
347            let id: String = row.get("id");
348            let content: String = row.get("content");
349            let namespace_str: String = row.get("namespace");
350            let domain_org: Option<String> = row.get("domain_org");
351            let domain_project: Option<String> = row.get("domain_project");
352            let domain_repo: Option<String> = row.get("domain_repo");
353            let project_id: Option<String> = row.get("project_id");
354            let branch: Option<String> = row.get("branch");
355            let file_path: Option<String> = row.get("file_path");
356            let status_str: String = row.get("status");
357            let tags: Vec<String> = row.get("tags");
358            let source: Option<String> = row.get("source");
359            let embedding_json: Option<serde_json::Value> = row.get("embedding");
360            let created_at: i64 = row.get("created_at");
361            let updated_at: i64 = row.get("updated_at");
362            let tombstoned_at: Option<i64> = row.get("tombstoned_at");
363
364            let namespace = Namespace::parse(&namespace_str).unwrap_or_default();
365            let status = match status_str.as_str() {
366                "active" => MemoryStatus::Active,
367                "archived" => MemoryStatus::Archived,
368                "superseded" => MemoryStatus::Superseded,
369                "pending" => MemoryStatus::Pending,
370                "deleted" => MemoryStatus::Deleted,
371                "tombstoned" => MemoryStatus::Tombstoned,
372                "consolidated" => MemoryStatus::Consolidated,
373                _ => MemoryStatus::Active,
374            };
375
376            let embedding: Option<Vec<f32>> =
377                embedding_json.and_then(|v| serde_json::from_value(v).ok());
378
379            let tombstoned_at = tombstoned_at.and_then(|ts| Utc.timestamp_opt(ts, 0).single());
380
381            Memory {
382                id: MemoryId::new(id),
383                content,
384                namespace,
385                domain: Domain {
386                    organization: domain_org,
387                    project: domain_project,
388                    repository: domain_repo,
389                },
390                project_id,
391                branch,
392                file_path,
393                status,
394                tags,
395                #[cfg(feature = "group-scope")]
396                group_id: None,
397                source,
398                embedding,
399                created_at: created_at as u64,
400                updated_at: updated_at as u64,
401                tombstoned_at,
402                expires_at: None,
403                is_summary: false,
404                source_memory_ids: None,
405                consolidation_timestamp: None,
406            }
407        }
408
409        /// Async implementation of delete operation.
410        async fn delete_async(&self, id: &MemoryId) -> Result<bool> {
411            let client = self.pool.get().await.map_err(pool_error)?;
412            let delete = format!("DELETE FROM {} WHERE id = $1", self.table_name);
413            let rows = client
414                .execute(&delete, &[&id.as_str()])
415                .await
416                .map_err(|e| query_error("postgres_persistence_delete", e))?;
417            Ok(rows > 0)
418        }
419
420        /// Async implementation of `list_ids` operation.
421        async fn list_ids_async(&self) -> Result<Vec<MemoryId>> {
422            let client = self.pool.get().await.map_err(pool_error)?;
423
424            let query = format!(
425                "SELECT id FROM {} ORDER BY updated_at DESC",
426                self.table_name
427            );
428
429            let rows = client
430                .query(&query, &[])
431                .await
432                .map_err(|e| query_error("postgres_persistence_list_ids", e))?;
433
434            Ok(rows
435                .iter()
436                .map(|row| {
437                    let id: String = row.get(0);
438                    MemoryId::new(id)
439                })
440                .collect())
441        }
442
443        /// Async implementation of `get_batch` operation using single IN query.
444        ///
445        /// Avoids N+1 queries by fetching all IDs in a single round-trip.
446        #[allow(clippy::cast_sign_loss)]
447        async fn get_batch_async(&self, ids: &[MemoryId]) -> Result<Vec<Memory>> {
448            if ids.is_empty() {
449                return Ok(Vec::new());
450            }
451
452            let client = self.pool.get().await.map_err(pool_error)?;
453
454            // Build parameterized IN clause: $1, $2, $3, ...
455            let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("${i}")).collect();
456            let query = format!(
457                r"SELECT id, content, namespace, domain_org, domain_project, domain_repo,
458                    status, tags, source, embedding, created_at, updated_at, tombstoned_at
459                FROM {} WHERE id IN ({})",
460                self.table_name,
461                placeholders.join(", ")
462            );
463
464            // Build params array
465            let id_strs: Vec<&str> = ids.iter().map(MemoryId::as_str).collect();
466            let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
467                id_strs.iter().map(|s| s as _).collect();
468
469            let rows = client
470                .query(&query, &params)
471                .await
472                .map_err(|e| query_error("postgres_persistence_get_batch", e))?;
473
474            Ok(rows.iter().map(Self::row_to_memory).collect())
475        }
476    }
477
    impl PersistenceBackend for PostgresBackend {
        // Each method bridges the synchronous `PersistenceBackend` trait to the
        // async implementation via `Self::block_on`; see its docs for the
        // runtime-context caveats.
        fn store(&self, memory: &Memory) -> Result<()> {
            self.block_on(self.store_async(memory))
        }

        fn get(&self, id: &MemoryId) -> Result<Option<Memory>> {
            self.block_on(self.get_async(id))
        }

        fn delete(&self, id: &MemoryId) -> Result<bool> {
            self.block_on(self.delete_async(id))
        }

        fn list_ids(&self) -> Result<Vec<MemoryId>> {
            self.block_on(self.list_ids_async())
        }

        /// Optimized batch retrieval using a single IN query (HIGH-PERF-002).
        fn get_batch(&self, ids: &[MemoryId]) -> Result<Vec<Memory>> {
            self.block_on(self.get_batch_async(ids))
        }
    }
500}
501
502#[cfg(feature = "postgres")]
503pub use implementation::PostgresBackend;
504
505#[cfg(not(feature = "postgres"))]
506mod stub {
507    use crate::models::{Memory, MemoryId};
508    use crate::storage::traits::PersistenceBackend;
509    use crate::{Error, Result};
510
511    /// Stub PostgreSQL backend when feature is not enabled.
512    pub struct PostgresBackend {
513        connection_url: String,
514        table_name: String,
515    }
516
517    impl PostgresBackend {
518        /// Creates a new PostgreSQL backend (stub).
519        #[must_use]
520        pub fn new(connection_url: impl Into<String>, table_name: impl Into<String>) -> Self {
521            Self {
522                connection_url: connection_url.into(),
523                table_name: table_name.into(),
524            }
525        }
526
527        /// Creates a new PostgreSQL backend with configurable pool size (stub).
528        ///
529        /// The pool size is ignored in the stub - requires `postgres` feature.
530        #[must_use]
531        pub fn with_pool_size(
532            connection_url: impl Into<String>,
533            table_name: impl Into<String>,
534            _pool_max_size: Option<usize>,
535        ) -> Self {
536            Self::new(connection_url, table_name)
537        }
538
539        /// Creates a backend with default settings (stub).
540        #[must_use]
541        pub fn with_defaults() -> Self {
542            Self::new("postgresql://localhost/subcog", "memories")
543        }
544    }
545
546    impl PersistenceBackend for PostgresBackend {
547        fn store(&self, _memory: &Memory) -> Result<()> {
548            Err(Error::NotImplemented(format!(
549                "PostgresBackend::store to {} on {}",
550                self.table_name, self.connection_url
551            )))
552        }
553
554        fn get(&self, _id: &MemoryId) -> Result<Option<Memory>> {
555            Err(Error::NotImplemented(format!(
556                "PostgresBackend::get from {} on {}",
557                self.table_name, self.connection_url
558            )))
559        }
560
561        fn delete(&self, _id: &MemoryId) -> Result<bool> {
562            Err(Error::NotImplemented(format!(
563                "PostgresBackend::delete from {} on {}",
564                self.table_name, self.connection_url
565            )))
566        }
567
568        fn list_ids(&self) -> Result<Vec<MemoryId>> {
569            Err(Error::NotImplemented(format!(
570                "PostgresBackend::list_ids from {} on {}",
571                self.table_name, self.connection_url
572            )))
573        }
574    }
575}
576
577#[cfg(not(feature = "postgres"))]
578pub use stub::PostgresBackend;
579
#[cfg(all(test, feature = "postgres"))]
mod tests {
    //! Integration tests against a live PostgreSQL instance.
    //!
    //! All tests are gated on the `SUBCOG_TEST_POSTGRES_URL` environment
    //! variable and self-skip (with a message) when it is unset. Each test
    //! uses a unique table name so runs are isolated from each other.
    //! NOTE(review): created tables are not dropped afterwards — the test
    //! database accumulates `test_memories_*` tables; confirm cleanup is
    //! handled externally.
    use super::*;
    use crate::models::{Domain, Memory, MemoryId, MemoryStatus, Namespace};
    use crate::storage::traits::PersistenceBackend;
    use std::env;

    /// Gets test database URL from environment or skips test.
    fn get_test_db_url() -> Option<String> {
        env::var("SUBCOG_TEST_POSTGRES_URL").ok()
    }

    /// Creates a test memory with given ID.
    fn create_test_memory(id: &str) -> Memory {
        Memory {
            id: MemoryId::new(id),
            content: format!("Test content for {id}"),
            namespace: Namespace::Decisions,
            domain: Domain {
                organization: Some("test-org".to_string()),
                project: Some("test-project".to_string()),
                repository: Some("test-repo".to_string()),
            },
            project_id: Some("github.com/test-org/test-repo".to_string()),
            branch: Some("main".to_string()),
            file_path: Some("src/lib.rs".to_string()),
            status: MemoryStatus::Active,
            created_at: 1_700_000_000,
            updated_at: 1_700_000_000,
            tombstoned_at: None,
            embedding: None,
            tags: vec!["test".to_string(), "integration".to_string()],
            #[cfg(feature = "group-scope")]
            group_id: None,
            source: Some("test.rs".to_string()),
            is_summary: false,
            source_memory_ids: None,
            consolidation_timestamp: None,
            expires_at: None,
        }
    }

    /// Creates a unique table name for test isolation.
    ///
    /// Uses a nanosecond timestamp as the uniqueness source.
    fn unique_table_name() -> String {
        use std::time::{SystemTime, UNIX_EPOCH};
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        format!("test_memories_{ts}")
    }

    #[test]
    fn test_store_and_retrieve_memory() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        let memory = create_test_memory("test-store-retrieve");
        backend.store(&memory).expect("Failed to store memory");

        let retrieved = backend
            .get(&MemoryId::new("test-store-retrieve"))
            .expect("Failed to get memory");

        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id.as_str(), "test-store-retrieve");
        assert_eq!(retrieved.namespace, Namespace::Decisions);
        assert_eq!(retrieved.status, MemoryStatus::Active);
        assert!(retrieved.content.contains("test-store-retrieve"));
    }

    #[test]
    fn test_get_nonexistent_memory() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        let result = backend
            .get(&MemoryId::new("nonexistent-id"))
            .expect("Failed to query");

        assert!(result.is_none());
    }

    #[test]
    fn test_update_existing_memory() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        let mut memory = create_test_memory("test-update");
        backend.store(&memory).expect("Failed to store initial");

        // Update the memory
        memory.content = "Updated content".to_string();
        memory.status = MemoryStatus::Archived;
        memory.updated_at = 1_700_001_000;
        backend.store(&memory).expect("Failed to store update");

        let retrieved = backend
            .get(&MemoryId::new("test-update"))
            .expect("Failed to get")
            .expect("Memory not found");

        assert_eq!(retrieved.content, "Updated content");
        assert_eq!(retrieved.status, MemoryStatus::Archived);
        assert_eq!(retrieved.updated_at, 1_700_001_000);
    }

    #[test]
    fn test_delete_memory() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        let memory = create_test_memory("test-delete");
        backend.store(&memory).expect("Failed to store");

        let deleted = backend
            .delete(&MemoryId::new("test-delete"))
            .expect("Failed to delete");
        assert!(deleted);

        let retrieved = backend
            .get(&MemoryId::new("test-delete"))
            .expect("Failed to get");
        assert!(retrieved.is_none());
    }

    #[test]
    fn test_delete_nonexistent_memory() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        let deleted = backend
            .delete(&MemoryId::new("never-existed"))
            .expect("Failed to delete");
        assert!(!deleted);
    }

    #[test]
    fn test_list_ids() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        // Initially empty
        let ids = backend.list_ids().expect("Failed to list");
        assert!(ids.is_empty());

        // Add some memories
        for i in 1..=3 {
            let memory = create_test_memory(&format!("list-test-{i}"));
            backend.store(&memory).expect("Failed to store");
        }

        let ids = backend.list_ids().expect("Failed to list");
        assert_eq!(ids.len(), 3);
    }

    #[test]
    fn test_memory_with_embedding() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        let mut memory = create_test_memory("test-embedding");
        memory.embedding = Some(vec![0.1, 0.2, 0.3, 0.4, 0.5]);

        backend.store(&memory).expect("Failed to store");

        let retrieved = backend
            .get(&MemoryId::new("test-embedding"))
            .expect("Failed to get")
            .expect("Memory not found");

        assert!(retrieved.embedding.is_some());
        let emb = retrieved.embedding.unwrap();
        assert_eq!(emb.len(), 5);
        // f32 survives the JSONB round-trip for these values.
        assert!((emb[0] - 0.1).abs() < f32::EPSILON);
    }

    #[test]
    fn test_all_namespaces() {
        let Some(url) = get_test_db_url() else {
            eprintln!("Skipping: SUBCOG_TEST_POSTGRES_URL not set");
            return;
        };

        let table = unique_table_name();
        let backend = PostgresBackend::new(&url, &table).expect("Failed to create backend");

        let namespaces = [
            Namespace::Decisions,
            Namespace::Patterns,
            Namespace::Learnings,
            Namespace::Context,
            Namespace::TechDebt,
            Namespace::Apis,
            Namespace::Config,
            Namespace::Security,
            Namespace::Performance,
            Namespace::Testing,
        ];

        // Round-trip every namespace variant through the database.
        for (i, ns) in namespaces.iter().enumerate() {
            let mut memory = create_test_memory(&format!("ns-test-{i}"));
            memory.namespace = *ns;
            backend.store(&memory).expect("Failed to store");

            let retrieved = backend
                .get(&MemoryId::new(format!("ns-test-{i}")))
                .expect("Failed to get")
                .expect("Memory not found");

            assert_eq!(retrieved.namespace, *ns);
        }
    }
}
830
#[cfg(all(test, not(feature = "postgres")))]
mod stub_tests {
    use super::*;
    use crate::models::{Domain, Memory, MemoryId, MemoryStatus, Namespace};
    use crate::storage::traits::PersistenceBackend;

    /// Builds a minimal active memory for exercising the stub backend.
    fn create_test_memory() -> Memory {
        Memory {
            id: MemoryId::new("test-id"),
            content: "Test content".to_string(),
            namespace: Namespace::Decisions,
            domain: Domain::new(),
            project_id: None,
            branch: None,
            file_path: None,
            status: MemoryStatus::Active,
            created_at: 1_700_000_000,
            updated_at: 1_700_000_000,
            tombstoned_at: None,
            expires_at: None,
            embedding: None,
            tags: vec![],
            #[cfg(feature = "group-scope")]
            group_id: None,
            source: None,
            is_summary: false,
            source_memory_ids: None,
            consolidation_timestamp: None,
        }
    }

    /// Asserts that a stub operation failed with `Error::NotImplemented`.
    fn assert_not_implemented<T>(result: Result<T, crate::Error>) {
        assert!(matches!(result, Err(crate::Error::NotImplemented(_))));
    }

    #[test]
    fn test_stub_store_returns_not_implemented() {
        let backend = PostgresBackend::with_defaults();
        assert_not_implemented(backend.store(&create_test_memory()));
    }

    #[test]
    fn test_stub_get_returns_not_implemented() {
        let backend = PostgresBackend::with_defaults();
        assert_not_implemented(backend.get(&MemoryId::new("test")));
    }

    #[test]
    fn test_stub_delete_returns_not_implemented() {
        let backend = PostgresBackend::with_defaults();
        assert_not_implemented(backend.delete(&MemoryId::new("test")));
    }

    #[test]
    fn test_stub_list_ids_returns_not_implemented() {
        let backend = PostgresBackend::with_defaults();
        assert_not_implemented(backend.list_ids());
    }

    #[test]
    fn test_stub_new_creates_instance() {
        // The stub constructor is infallible: it returns the stub directly
        // rather than a `Result`.
        let _backend = PostgresBackend::new("postgresql://custom", "custom_table");
    }

    #[test]
    fn test_stub_with_defaults_creates_instance() {
        // `with_defaults()` is likewise infallible on the stub.
        let _backend = PostgresBackend::with_defaults();
    }
}