subcog/storage/index/
org_router.rs1use crate::config::OrgBackendConfig;
20use crate::storage::traits::IndexBackend;
21use crate::{Error, Result};
22use std::fmt;
23use std::path::PathBuf;
24use std::sync::Arc;
25
26use super::SqliteBackend;
27
28#[cfg(feature = "postgres")]
29use super::PostgresIndexBackend;
30
31#[derive(Clone)]
37pub struct OrgIndexRouter {
38 backend: Arc<dyn IndexBackend + Send + Sync>,
40 backend_type: OrgBackendType,
42 backend_location: String,
44}
45
46impl fmt::Debug for OrgIndexRouter {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 f.debug_struct("OrgIndexRouter")
49 .field("backend_type", &self.backend_type)
50 .field("backend_location", &self.backend_location)
51 .finish_non_exhaustive()
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum OrgBackendType {
58 SqliteShared,
60 Postgresql,
62}
63
64impl OrgBackendType {
65 #[must_use]
67 pub const fn as_str(&self) -> &'static str {
68 match self {
69 Self::SqliteShared => "sqlite",
70 Self::Postgresql => "postgresql",
71 }
72 }
73}
74
75impl OrgIndexRouter {
76 pub fn new(config: &OrgBackendConfig) -> Result<Self> {
89 match config {
90 OrgBackendConfig::SqliteShared { path } => Self::new_sqlite(path),
91 OrgBackendConfig::Postgresql {
92 connection_url,
93 max_connections,
94 timeout_secs,
95 } => Self::new_postgresql(connection_url, *max_connections, *timeout_secs),
96 OrgBackendConfig::None => Err(Error::InvalidInput(
97 "Cannot create org index router with no backend configured".to_string(),
98 )),
99 }
100 }
101
102 fn new_sqlite(path: &PathBuf) -> Result<Self> {
104 if let Some(parent) = path.parent() {
106 std::fs::create_dir_all(parent).map_err(|e| Error::OperationFailed {
107 operation: "create_org_index_dir".to_string(),
108 cause: e.to_string(),
109 })?;
110 }
111
112 let backend = SqliteBackend::new(path)?;
113
114 Ok(Self {
115 backend: Arc::new(backend),
116 backend_type: OrgBackendType::SqliteShared,
117 backend_location: path.display().to_string(),
118 })
119 }
120
121 #[cfg(feature = "postgres")]
123 fn new_postgresql(
124 connection_url: &str,
125 max_connections: u32,
126 timeout_secs: u64,
127 ) -> Result<Self> {
128 let backend = PostgresIndexBackend::new(connection_url, "org_memories_index")?;
129
130 tracing::debug!(
132 max_connections = max_connections,
133 timeout_secs = timeout_secs,
134 "Initialized PostgreSQL org index backend"
135 );
136
137 Ok(Self {
138 backend: Arc::new(backend),
139 backend_type: OrgBackendType::Postgresql,
140 backend_location: sanitize_connection_url(connection_url),
141 })
142 }
143
144 #[cfg(not(feature = "postgres"))]
146 fn new_postgresql(
147 _connection_url: &str,
148 _max_connections: u32,
149 _timeout_secs: u64,
150 ) -> Result<Self> {
151 Err(Error::NotImplemented(
152 "PostgreSQL org backend requires the 'postgres' feature flag".to_string(),
153 ))
154 }
155
156 #[must_use]
158 pub fn backend(&self) -> Arc<dyn IndexBackend + Send + Sync> {
159 Arc::clone(&self.backend)
160 }
161
162 #[must_use]
164 pub const fn backend_type(&self) -> OrgBackendType {
165 self.backend_type
166 }
167
168 #[must_use]
170 pub fn backend_location(&self) -> &str {
171 &self.backend_location
172 }
173
174 #[must_use]
176 pub fn status(&self) -> OrgIndexStatus {
177 OrgIndexStatus {
178 backend_type: self.backend_type,
179 location: self.backend_location.clone(),
180 connected: true, }
182 }
183}
184
185#[derive(Debug, Clone)]
187pub struct OrgIndexStatus {
188 pub backend_type: OrgBackendType,
190 pub location: String,
192 pub connected: bool,
194}
195
196#[cfg(feature = "postgres")]
200fn sanitize_connection_url(url: &str) -> String {
201 reqwest::Url::parse(url).map_or_else(
203 |_| {
204 if url.starts_with("postgresql://") {
206 "postgresql://***".to_string()
207 } else if url.starts_with("postgres://") {
208 "postgres://***".to_string()
209 } else {
210 "***".to_string()
211 }
212 },
213 |mut parsed| {
214 if parsed.password().is_some() {
215 let _ = parsed.set_password(Some("***"));
216 }
217 parsed.to_string()
218 },
219 )
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225 use tempfile::TempDir;
226
227 #[test]
228 fn test_org_backend_type_as_str() {
229 assert_eq!(OrgBackendType::SqliteShared.as_str(), "sqlite");
230 assert_eq!(OrgBackendType::Postgresql.as_str(), "postgresql");
231 }
232
233 #[test]
234 fn test_new_sqlite_creates_directory() {
235 let dir = TempDir::new().unwrap();
236 let path = dir.path().join("org").join("subdir").join("index.db");
237
238 let config = OrgBackendConfig::SqliteShared { path };
239 let router = OrgIndexRouter::new(&config).unwrap();
240
241 assert_eq!(router.backend_type(), OrgBackendType::SqliteShared);
242 assert!(!router.backend_location().is_empty());
244 }
245
246 #[test]
247 fn test_new_none_returns_error() {
248 let config = OrgBackendConfig::None;
249 let result = OrgIndexRouter::new(&config);
250 assert!(result.is_err());
251 }
252
253 #[test]
254 fn test_status() {
255 let dir = TempDir::new().unwrap();
256 let path = dir.path().join("index.db");
257
258 let config = OrgBackendConfig::SqliteShared { path };
259 let router = OrgIndexRouter::new(&config).unwrap();
260
261 let status = router.status();
262 assert_eq!(status.backend_type, OrgBackendType::SqliteShared);
263 assert!(status.connected);
264 }
265
266 #[cfg(feature = "postgres")]
267 #[test]
268 fn test_sanitize_connection_url() {
269 let url = "postgresql://user:secret@localhost:5432/subcog";
270 let sanitized = sanitize_connection_url(url);
271 assert!(sanitized.contains("***"));
272 assert!(!sanitized.contains("secret"));
273 }
274
275 #[cfg(not(feature = "postgres"))]
276 #[test]
277 fn test_postgres_requires_feature() {
278 let config = OrgBackendConfig::Postgresql {
279 connection_url: "postgresql://localhost/test".to_string(),
280 max_connections: 10,
281 timeout_secs: 30,
282 };
283 let result = OrgIndexRouter::new(&config);
284 assert!(result.is_err());
285 assert!(
286 result
287 .unwrap_err()
288 .to_string()
289 .contains("postgres' feature flag")
290 );
291 }
292}