Skip to main content

subcog/storage/
resilience.rs

1//! Storage resilience wrapper with circuit breaking.
2//!
3//! Provides circuit breaker protection for storage backends to prevent cascade failures
4//! when backends become unhealthy.
5//!
6//! # Circuit Breaker States
7//!
8//! ```text
9//! +--------+     failures >= threshold     +------+
10//! | Closed | --------------------------->  | Open |
11//! +--------+                               +------+
12//!     ^                                        |
13//!     |  success                               | timeout elapsed
14//!     |                                        v
15//!     +--------------------------------  +-----------+
16//!                                        | Half-Open |
17//!                                        +-----------+
18//! ```
19//!
20//! # Usage
21//!
22//! ```rust,ignore
23//! use subcog::storage::resilience::{StorageResilienceConfig, ResilientPersistenceBackend};
24//! use subcog::storage::index::SqliteBackend;
25//!
26//! let backend = SqliteBackend::new(db_path)?;
27//! let config = StorageResilienceConfig::default();
28//! let resilient = ResilientPersistenceBackend::new(backend, config, "sqlite");
29//!
30//! // Operations are now protected by circuit breaker
31//! resilient.store(&memory)?;
32//! ```
33
34use crate::{Error, Result};
35use std::sync::Mutex;
36use std::time::{Duration, Instant};
37
/// Resilience configuration for storage backends.
///
/// Covers both retry behavior (exponential backoff with jitter) and circuit
/// breaker behavior. Values can be set via the `with_*` builder methods or
/// loaded from environment variables (see `with_env_overrides`).
#[derive(Debug, Clone)]
pub struct StorageResilienceConfig {
    /// Maximum number of retries for retryable failures (CHAOS-HIGH-003).
    /// Total attempts = `max_retries + 1` (the initial call plus retries).
    pub max_retries: u32,
    /// Base backoff between retries in milliseconds (exponential with jitter).
    /// A value of 0 disables the sleep between attempts in the backend wrappers.
    pub retry_backoff_ms: u64,
    /// Consecutive failures before opening the circuit.
    /// Clamped to at least 1 where consumed (`CircuitBreaker::new`, env overrides).
    pub breaker_failure_threshold: u32,
    /// How long to keep the circuit open before half-open.
    pub breaker_reset_timeout_ms: u64,
    /// Maximum trial calls while half-open.
    /// Clamped to at least 1 where consumed (`CircuitBreaker::new`, env overrides).
    pub breaker_half_open_max_calls: u32,
}
52
53impl Default for StorageResilienceConfig {
54    fn default() -> Self {
55        Self {
56            max_retries: 3,
57            retry_backoff_ms: 100,
58            breaker_failure_threshold: 5,
59            breaker_reset_timeout_ms: 30_000,
60            breaker_half_open_max_calls: 1,
61        }
62    }
63}
64
65impl StorageResilienceConfig {
66    /// Loads resilience configuration from environment variables.
67    #[must_use]
68    pub fn from_env() -> Self {
69        Self::default().with_env_overrides()
70    }
71
72    /// Applies environment variable overrides.
73    #[must_use]
74    pub fn with_env_overrides(mut self) -> Self {
75        // Retry configuration (CHAOS-HIGH-003)
76        if let Ok(v) = std::env::var("SUBCOG_STORAGE_MAX_RETRIES")
77            && let Ok(parsed) = v.parse::<u32>()
78        {
79            self.max_retries = parsed;
80        }
81        if let Ok(v) = std::env::var("SUBCOG_STORAGE_RETRY_BACKOFF_MS")
82            && let Ok(parsed) = v.parse::<u64>()
83        {
84            self.retry_backoff_ms = parsed;
85        }
86        // Circuit breaker configuration
87        if let Ok(v) = std::env::var("SUBCOG_STORAGE_BREAKER_FAILURE_THRESHOLD")
88            && let Ok(parsed) = v.parse::<u32>()
89        {
90            self.breaker_failure_threshold = parsed.max(1);
91        }
92        if let Ok(v) = std::env::var("SUBCOG_STORAGE_BREAKER_RESET_MS")
93            && let Ok(parsed) = v.parse::<u64>()
94        {
95            self.breaker_reset_timeout_ms = parsed;
96        }
97        if let Ok(v) = std::env::var("SUBCOG_STORAGE_BREAKER_HALF_OPEN_MAX_CALLS")
98            && let Ok(parsed) = v.parse::<u32>()
99        {
100            self.breaker_half_open_max_calls = parsed.max(1);
101        }
102        self
103    }
104
105    /// Sets the maximum number of retries.
106    #[must_use]
107    pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
108        self.max_retries = max_retries;
109        self
110    }
111
112    /// Sets the base retry backoff in milliseconds.
113    #[must_use]
114    pub const fn with_retry_backoff_ms(mut self, backoff_ms: u64) -> Self {
115        self.retry_backoff_ms = backoff_ms;
116        self
117    }
118
119    /// Sets the failure threshold.
120    #[must_use]
121    pub const fn with_failure_threshold(mut self, threshold: u32) -> Self {
122        self.breaker_failure_threshold = threshold;
123        self
124    }
125
126    /// Sets the reset timeout in milliseconds.
127    #[must_use]
128    pub const fn with_reset_timeout_ms(mut self, timeout_ms: u64) -> Self {
129        self.breaker_reset_timeout_ms = timeout_ms;
130        self
131    }
132
133    /// Sets the half-open max calls.
134    #[must_use]
135    pub const fn with_half_open_max_calls(mut self, max_calls: u32) -> Self {
136        self.breaker_half_open_max_calls = max_calls;
137        self
138    }
139}
140
/// Circuit breaker state machine.
///
/// Transitions (driven by `CircuitBreaker::allow`/`on_success`/`on_failure`):
/// Closed -> Open when the failure threshold is reached; Open -> HalfOpen
/// once the reset timeout elapses; HalfOpen -> Closed on success or back to
/// Open on failure.
#[derive(Debug)]
enum BreakerState {
    /// Normal operation; counts consecutive failures observed so far.
    Closed { failures: u32 },
    /// Requests rejected; remembers when the circuit opened so the reset
    /// timeout can be measured.
    Open { opened_at: Instant },
    /// Recovery trial; counts trial calls admitted so far.
    HalfOpen { attempts: u32 },
}
148
/// Circuit breaker for storage backends.
///
/// Not internally synchronized; callers that share it across threads wrap it
/// in a `Mutex` (see the `Resilient*Backend` wrappers below).
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Current position in the state machine.
    state: BreakerState,
    /// Consecutive failures that trip the breaker (>= 1).
    failure_threshold: u32,
    /// How long the circuit stays open before half-open is attempted.
    reset_timeout: Duration,
    /// Trial calls admitted while half-open (>= 1).
    half_open_max_calls: u32,
    /// Static label used in log messages.
    backend_name: &'static str,
}
158
159impl CircuitBreaker {
160    /// Creates a new circuit breaker with the given configuration.
161    #[must_use]
162    pub fn new(config: &StorageResilienceConfig, backend_name: &'static str) -> Self {
163        Self {
164            state: BreakerState::Closed { failures: 0 },
165            failure_threshold: config.breaker_failure_threshold.max(1),
166            reset_timeout: Duration::from_millis(config.breaker_reset_timeout_ms),
167            half_open_max_calls: config.breaker_half_open_max_calls.max(1),
168            backend_name,
169        }
170    }
171
172    /// Checks if a request is allowed through the circuit breaker.
173    ///
174    /// Returns `true` if the request should proceed, `false` if rejected.
175    pub fn allow(&mut self) -> bool {
176        match self.state {
177            BreakerState::Closed { .. } => true,
178            BreakerState::Open { opened_at } => {
179                if opened_at.elapsed() >= self.reset_timeout {
180                    tracing::info!(
181                        backend = self.backend_name,
182                        "Circuit breaker transitioning to half-open"
183                    );
184                    self.state = BreakerState::HalfOpen { attempts: 0 };
185                    true
186                } else {
187                    false
188                }
189            },
190            BreakerState::HalfOpen { ref mut attempts } => {
191                if *attempts >= self.half_open_max_calls {
192                    false
193                } else {
194                    *attempts += 1;
195                    true
196                }
197            },
198        }
199    }
200
201    /// Records a successful operation, potentially closing the circuit.
202    pub fn on_success(&mut self) {
203        if !matches!(self.state, BreakerState::Closed { failures: 0 }) {
204            tracing::info!(
205                backend = self.backend_name,
206                "Circuit breaker closing after success"
207            );
208        }
209        self.state = BreakerState::Closed { failures: 0 };
210    }
211
212    /// Records a failed operation, potentially opening the circuit.
213    ///
214    /// Returns `true` if the circuit just opened (tripped).
215    pub fn on_failure(&mut self) -> bool {
216        match self.state {
217            BreakerState::Closed { ref mut failures } => {
218                *failures += 1;
219                if *failures >= self.failure_threshold {
220                    tracing::warn!(
221                        backend = self.backend_name,
222                        failures = *failures,
223                        threshold = self.failure_threshold,
224                        "Circuit breaker opened after consecutive failures"
225                    );
226                    self.state = BreakerState::Open {
227                        opened_at: Instant::now(),
228                    };
229                    return true;
230                }
231            },
232            BreakerState::HalfOpen { .. } => {
233                tracing::warn!(
234                    backend = self.backend_name,
235                    "Circuit breaker re-opened after half-open failure"
236                );
237                self.state = BreakerState::Open {
238                    opened_at: Instant::now(),
239                };
240                return true;
241            },
242            BreakerState::Open { .. } => {},
243        }
244        false
245    }
246
247    /// Returns the current state as a numeric value for metrics.
248    ///
249    /// - 0: Closed
250    /// - 1: Open
251    /// - 2: Half-Open
252    #[must_use]
253    pub const fn state_value(&self) -> u8 {
254        match self.state {
255            BreakerState::Closed { .. } => 0,
256            BreakerState::Open { .. } => 1,
257            BreakerState::HalfOpen { .. } => 2,
258        }
259    }
260
261    /// Returns the backend name.
262    #[must_use]
263    pub const fn backend_name(&self) -> &'static str {
264        self.backend_name
265    }
266}
267
268// ============================================================================
269// Retry Helper Functions (CHAOS-HIGH-003)
270// ============================================================================
271
272/// Calculates retry delay with exponential backoff and jitter.
273///
274/// Formula: `base_delay * 2^(attempt-1) + jitter`
275/// - Exponential growth prevents overwhelming a recovering service
276/// - Jitter (0-50% of delay) prevents thundering herd problem
277/// - Maximum delay capped at 10 seconds
278fn calculate_retry_delay(base_delay_ms: u64, attempt: u32) -> u64 {
279    // Exponential: base * 2^(attempt-1), so attempt 1 = base, attempt 2 = 2x, attempt 3 = 4x
280    let exponent = attempt.saturating_sub(1);
281    let exponential_delay = base_delay_ms.saturating_mul(1u64 << exponent.min(10));
282
283    // Cap at 10 seconds max delay
284    let capped_delay = exponential_delay.min(10_000);
285
286    // Add jitter: use system time nanoseconds as pseudo-random source
287    // Jitter range: 0-50% of capped delay to prevent thundering herd
288    let jitter = calculate_jitter(capped_delay);
289    let total_delay = capped_delay.saturating_add(jitter);
290
291    tracing::debug!(
292        "Storage retry backoff: attempt={}, base={}ms, exponential={}ms, jitter={}ms, total={}ms",
293        attempt,
294        base_delay_ms,
295        capped_delay,
296        jitter,
297        total_delay
298    );
299
300    total_delay
301}
302
/// Calculates jitter for retry backoff using system time as pseudo-random source.
///
/// Returns a value in `[0, delay_ms / 2)`; zero when `delay_ms < 2`.
fn calculate_jitter(delay_ms: u64) -> u64 {
    let jitter_ceiling = delay_ms / 2;
    if jitter_ceiling == 0 {
        return 0;
    }

    // Subsecond nanos act as a cheap entropy source; a clock set before the
    // Unix epoch degrades gracefully to zero jitter instead of panicking.
    let entropy = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.subsec_nanos());

    u64::from(entropy) % jitter_ceiling
}
317
318/// Checks if a storage error is retryable (transient failures that may succeed on retry).
319///
320/// Retryable errors include:
321/// - Timeouts
322/// - Connection errors (network issues, DNS failures)
323/// - Lock/busy errors (database locked, pool exhausted)
324/// - Temporary I/O errors
325#[must_use]
326pub fn is_retryable_storage_error(err: &Error) -> bool {
327    match err {
328        Error::OperationFailed { cause, .. } => {
329            let lower = cause.to_lowercase();
330            // Timeout errors
331            lower.contains("timeout")
332                || lower.contains("timed out")
333                || lower.contains("deadline")
334                || lower.contains("elapsed")
335                // Connection errors
336                || lower.contains("connect")
337                || lower.contains("connection")
338                || lower.contains("network")
339                || lower.contains("dns")
340                || lower.contains("resolve")
341                // Database lock/busy errors
342                || lower.contains("locked")
343                || lower.contains("busy")
344                || lower.contains("pool")
345                || lower.contains("exhausted")
346                // Temporary I/O errors
347                || lower.contains("temporary")
348                || lower.contains("try again")
349                || lower.contains("interrupted")
350        },
351        _ => false,
352    }
353}
354
355// ============================================================================
356// Connection Retry Helper (CHAOS-HIGH-003)
357// ============================================================================
358
359/// Executes a connection operation with retry and exponential backoff.
360///
361/// This function is designed for initial connection establishment where transient
362/// failures (network issues, database starting up, etc.) should be retried.
363///
364/// # Arguments
365///
366/// * `config` - Resilience configuration with retry settings
367/// * `backend_name` - Name of the backend for logging
368/// * `operation` - Description of the operation for error messages
369/// * `connect_fn` - The connection function to retry
370///
371/// # Example
372///
373/// ```rust,ignore
374/// use subcog::storage::resilience::{retry_connection, StorageResilienceConfig};
375///
376/// let pool = retry_connection(
377///     &StorageResilienceConfig::default(),
378///     "postgres",
379///     "create_pool",
380///     || create_postgres_pool(connection_url),
381/// )?;
382/// ```
383///
384/// # Errors
385///
386/// Returns an error if all retry attempts are exhausted without success.
387pub fn retry_connection<T, F>(
388    config: &StorageResilienceConfig,
389    backend_name: &str,
390    operation: &str,
391    mut connect_fn: F,
392) -> Result<T>
393where
394    F: FnMut() -> Result<T>,
395{
396    let max_attempts = config.max_retries + 1;
397    let mut attempts = 0;
398    let mut last_error = None;
399
400    while attempts < max_attempts {
401        attempts += 1;
402
403        match connect_fn() {
404            Ok(result) => {
405                if attempts > 1 {
406                    tracing::info!(
407                        backend = backend_name,
408                        operation = operation,
409                        attempts = attempts,
410                        "Connection succeeded after retries"
411                    );
412                }
413                return Ok(result);
414            },
415            Err(err) => {
416                let retryable = is_retryable_connection_error(&err) && attempts < max_attempts;
417
418                if !retryable {
419                    tracing::warn!(
420                        backend = backend_name,
421                        operation = operation,
422                        attempts = attempts,
423                        error = %err,
424                        "Connection failed with non-retryable error"
425                    );
426                    return Err(err);
427                }
428
429                let delay_ms = calculate_retry_delay(config.retry_backoff_ms, attempts);
430                tracing::debug!(
431                    backend = backend_name,
432                    operation = operation,
433                    attempt = attempts,
434                    max_attempts = max_attempts,
435                    delay_ms = delay_ms,
436                    error = %err,
437                    "Connection failed, retrying with backoff"
438                );
439
440                metrics::counter!(
441                    "storage_connection_retries_total",
442                    "backend" => backend_name.to_string(),
443                    "operation" => operation.to_string()
444                )
445                .increment(1);
446
447                std::thread::sleep(Duration::from_millis(delay_ms));
448                last_error = Some(err);
449            },
450        }
451    }
452
453    let err = last_error.unwrap_or_else(|| Error::OperationFailed {
454        operation: format!("{backend_name}_{operation}"),
455        cause: "exhausted connection retries".to_string(),
456    });
457
458    tracing::error!(
459        backend = backend_name,
460        operation = operation,
461        max_attempts = max_attempts,
462        error = %err,
463        "Connection failed after exhausting all retries"
464    );
465
466    Err(err)
467}
468
469/// Checks if a connection error is retryable.
470///
471/// Connection-specific retryable errors include:
472/// - Connection refused (server starting up)
473/// - Network unreachable
474/// - DNS resolution failures
475/// - Timeouts
476/// - Pool creation failures
477fn is_retryable_connection_error(err: &Error) -> bool {
478    match err {
479        Error::OperationFailed { cause, .. } => {
480            let lower = cause.to_lowercase();
481            // Connection establishment errors
482            lower.contains("connection refused")
483                || lower.contains("connection reset")
484                || lower.contains("connection timed out")
485                || lower.contains("network")
486                || lower.contains("unreachable")
487                || lower.contains("dns")
488                || lower.contains("resolve")
489                || lower.contains("timeout")
490                || lower.contains("timed out")
491                || lower.contains("pool")
492                || lower.contains("exhausted")
493                // Database not ready errors
494                || lower.contains("not ready")
495                || lower.contains("starting")
496                || lower.contains("unavailable")
497                || lower.contains("service")
498                // Generic transient errors
499                || lower.contains("temporary")
500                || lower.contains("try again")
501                || lower.contains("econnrefused")
502                || lower.contains("etimedout")
503        },
504        _ => false,
505    }
506}
507
508// ============================================================================
509// Resilient Persistence Backend
510// ============================================================================
511
512use super::traits::PersistenceBackend;
513use crate::models::{Memory, MemoryId};
514
/// Persistence backend wrapper with circuit breaker and retry protection.
pub struct ResilientPersistenceBackend<P: PersistenceBackend> {
    // The wrapped backend that performs the real storage operations.
    inner: P,
    // Retry/backoff settings consulted on every operation.
    config: StorageResilienceConfig,
    // Breaker state behind a Mutex so `&self` trait methods can mutate it.
    breaker: Mutex<CircuitBreaker>,
    // Static label used in log messages and metric labels.
    backend_name: &'static str,
}
522
523impl<P: PersistenceBackend> ResilientPersistenceBackend<P> {
524    /// Creates a new resilient persistence backend wrapper.
525    #[must_use]
526    pub fn new(inner: P, config: StorageResilienceConfig, backend_name: &'static str) -> Self {
527        Self {
528            inner,
529            breaker: Mutex::new(CircuitBreaker::new(&config, backend_name)),
530            config,
531            backend_name,
532        }
533    }
534
535    fn execute<T, F>(&self, operation: &'static str, mut call: F) -> Result<T>
536    where
537        F: FnMut() -> Result<T>,
538    {
539        let max_attempts = self.config.max_retries + 1;
540        let mut attempts = 0;
541        let mut last_error = None;
542
543        while attempts < max_attempts {
544            attempts += 1;
545
546            if let Some(err) = self.check_circuit_breaker(operation) {
547                return Err(err);
548            }
549
550            match call() {
551                Ok(value) => return Ok(self.handle_success(operation, value)),
552                Err(err) => {
553                    last_error = self.handle_error(operation, err, attempts, max_attempts)?;
554                },
555            }
556        }
557
558        Err(last_error.unwrap_or_else(|| Error::OperationFailed {
559            operation: format!("storage_{operation}"),
560            cause: "exhausted retries".to_string(),
561        }))
562    }
563
564    /// Checks circuit breaker and returns error if open.
565    fn check_circuit_breaker(&self, operation: &'static str) -> Option<Error> {
566        let mut breaker = self
567            .breaker
568            .lock()
569            .unwrap_or_else(std::sync::PoisonError::into_inner);
570
571        if !breaker.allow() {
572            let state = breaker.state_value();
573            drop(breaker);
574            Self::record_metrics(self.backend_name, operation, "circuit_open", state);
575            return Some(Error::OperationFailed {
576                operation: format!("storage_{operation}"),
577                cause: format!("circuit breaker open for backend '{}'", self.backend_name),
578            });
579        }
580        None
581    }
582
583    /// Handles successful operation result.
584    fn handle_success<T>(&self, operation: &'static str, value: T) -> T {
585        let mut breaker = self
586            .breaker
587            .lock()
588            .unwrap_or_else(std::sync::PoisonError::into_inner);
589        breaker.on_success();
590        let state = breaker.state_value();
591        drop(breaker);
592        Self::record_metrics(self.backend_name, operation, "success", state);
593        value
594    }
595
596    /// Handles operation error. Returns Ok(Some(err)) to continue retrying, Err to stop.
597    fn handle_error(
598        &self,
599        operation: &'static str,
600        err: Error,
601        attempts: u32,
602        max_attempts: u32,
603    ) -> Result<Option<Error>> {
604        let retryable = is_retryable_storage_error(&err) && attempts < max_attempts;
605
606        let mut breaker = self
607            .breaker
608            .lock()
609            .unwrap_or_else(std::sync::PoisonError::into_inner);
610        let tripped = breaker.on_failure();
611        let state = breaker.state_value();
612        drop(breaker);
613
614        Self::record_metrics(self.backend_name, operation, "error", state);
615
616        if tripped {
617            Self::record_circuit_trip(self.backend_name, operation);
618        }
619
620        if !retryable {
621            return Err(err);
622        }
623
624        Self::log_retry_attempt(self.backend_name, operation, attempts, max_attempts, &err);
625        self.apply_retry_backoff(attempts);
626        Ok(Some(err))
627    }
628
629    /// Applies retry backoff delay if configured.
630    fn apply_retry_backoff(&self, attempts: u32) {
631        if self.config.retry_backoff_ms > 0 {
632            let delay = calculate_retry_delay(self.config.retry_backoff_ms, attempts);
633            std::thread::sleep(Duration::from_millis(delay));
634        }
635    }
636
637    fn record_metrics(
638        backend: &'static str,
639        operation: &'static str,
640        status: &'static str,
641        state: u8,
642    ) {
643        metrics::counter!(
644            "storage_requests_total",
645            "backend" => backend,
646            "operation" => operation,
647            "status" => status
648        )
649        .increment(1);
650        metrics::gauge!(
651            "storage_circuit_breaker_state",
652            "backend" => backend
653        )
654        .set(f64::from(state));
655    }
656
657    /// Records metrics and logs when circuit breaker trips.
658    fn record_circuit_trip(backend: &'static str, operation: &'static str) {
659        metrics::counter!(
660            "storage_circuit_breaker_trips_total",
661            "backend" => backend,
662            "operation" => operation
663        )
664        .increment(1);
665        tracing::warn!(
666            backend = backend,
667            operation = operation,
668            "Storage circuit breaker opened"
669        );
670    }
671
672    /// Records metrics and logs for retry attempts.
673    fn log_retry_attempt(
674        backend: &'static str,
675        operation: &'static str,
676        attempt: u32,
677        max_attempts: u32,
678        err: &Error,
679    ) {
680        metrics::counter!(
681            "storage_retries_total",
682            "backend" => backend,
683            "operation" => operation
684        )
685        .increment(1);
686        tracing::debug!(
687            backend = backend,
688            operation = operation,
689            attempt = attempt,
690            max_attempts = max_attempts,
691            error = %err,
692            "Retrying storage operation"
693        );
694    }
695}
696
// Trait implementation: every method delegates to the inner backend through
// `execute`, which adds circuit-breaker checks, retry with backoff, and metrics.
impl<P: PersistenceBackend> PersistenceBackend for ResilientPersistenceBackend<P> {
    fn store(&self, memory: &Memory) -> Result<()> {
        self.execute("store", || self.inner.store(memory))
    }

    fn get(&self, id: &MemoryId) -> Result<Option<Memory>> {
        self.execute("get", || self.inner.get(id))
    }

    fn delete(&self, id: &MemoryId) -> Result<bool> {
        self.execute("delete", || self.inner.delete(id))
    }

    fn list_ids(&self) -> Result<Vec<MemoryId>> {
        self.execute("list_ids", || self.inner.list_ids())
    }

    fn exists(&self, id: &MemoryId) -> Result<bool> {
        self.execute("exists", || self.inner.exists(id))
    }

    fn count(&self) -> Result<usize> {
        self.execute("count", || self.inner.count())
    }
}
722
723// ============================================================================
724// Resilient Index Backend
725// ============================================================================
726
727use super::traits::IndexBackend;
728use crate::models::SearchFilter;
729
/// Index backend wrapper with circuit breaker and retry protection.
pub struct ResilientIndexBackend<I: IndexBackend> {
    // The wrapped backend that performs the real index operations.
    inner: I,
    // Retry/backoff settings consulted on every operation.
    config: StorageResilienceConfig,
    // Breaker state behind a Mutex so `&self` trait methods can mutate it.
    breaker: Mutex<CircuitBreaker>,
    // Static label used in log messages and metric labels.
    backend_name: &'static str,
}
737
738impl<I: IndexBackend> ResilientIndexBackend<I> {
739    /// Creates a new resilient index backend wrapper.
740    #[must_use]
741    pub fn new(inner: I, config: StorageResilienceConfig, backend_name: &'static str) -> Self {
742        Self {
743            inner,
744            breaker: Mutex::new(CircuitBreaker::new(&config, backend_name)),
745            config,
746            backend_name,
747        }
748    }
749
750    fn execute<T, F>(&self, operation: &'static str, mut call: F) -> Result<T>
751    where
752        F: FnMut() -> Result<T>,
753    {
754        let max_attempts = self.config.max_retries + 1;
755        let mut attempts = 0;
756        let mut last_error = None;
757
758        while attempts < max_attempts {
759            attempts += 1;
760
761            if let Some(err) = self.check_circuit_breaker(operation) {
762                return Err(err);
763            }
764
765            match call() {
766                Ok(value) => return Ok(self.handle_success(operation, value)),
767                Err(err) => {
768                    last_error = self.handle_error(operation, err, attempts, max_attempts)?;
769                },
770            }
771        }
772
773        Err(last_error.unwrap_or_else(|| Error::OperationFailed {
774            operation: format!("index_{operation}"),
775            cause: "exhausted retries".to_string(),
776        }))
777    }
778
779    /// Checks circuit breaker and returns error if open.
780    fn check_circuit_breaker(&self, operation: &'static str) -> Option<Error> {
781        let mut breaker = self
782            .breaker
783            .lock()
784            .unwrap_or_else(std::sync::PoisonError::into_inner);
785
786        if !breaker.allow() {
787            let state = breaker.state_value();
788            drop(breaker);
789            Self::record_metrics(self.backend_name, operation, "circuit_open", state);
790            return Some(Error::OperationFailed {
791                operation: format!("index_{operation}"),
792                cause: format!("circuit breaker open for backend '{}'", self.backend_name),
793            });
794        }
795        None
796    }
797
798    /// Handles successful operation result.
799    fn handle_success<T>(&self, operation: &'static str, value: T) -> T {
800        let mut breaker = self
801            .breaker
802            .lock()
803            .unwrap_or_else(std::sync::PoisonError::into_inner);
804        breaker.on_success();
805        let state = breaker.state_value();
806        drop(breaker);
807        Self::record_metrics(self.backend_name, operation, "success", state);
808        value
809    }
810
811    /// Handles operation error. Returns Ok(Some(err)) to continue retrying, Err to stop.
812    fn handle_error(
813        &self,
814        operation: &'static str,
815        err: Error,
816        attempts: u32,
817        max_attempts: u32,
818    ) -> Result<Option<Error>> {
819        let retryable = is_retryable_storage_error(&err) && attempts < max_attempts;
820
821        let mut breaker = self
822            .breaker
823            .lock()
824            .unwrap_or_else(std::sync::PoisonError::into_inner);
825        let tripped = breaker.on_failure();
826        let state = breaker.state_value();
827        drop(breaker);
828
829        Self::record_metrics(self.backend_name, operation, "error", state);
830
831        if tripped {
832            Self::record_circuit_trip(self.backend_name, operation);
833        }
834
835        if !retryable {
836            return Err(err);
837        }
838
839        Self::log_retry(self.backend_name, operation);
840        self.apply_retry_backoff(attempts);
841        Ok(Some(err))
842    }
843
844    /// Applies retry backoff delay if configured.
845    fn apply_retry_backoff(&self, attempts: u32) {
846        if self.config.retry_backoff_ms > 0 {
847            let delay = calculate_retry_delay(self.config.retry_backoff_ms, attempts);
848            std::thread::sleep(Duration::from_millis(delay));
849        }
850    }
851
852    fn record_metrics(
853        backend: &'static str,
854        operation: &'static str,
855        status: &'static str,
856        state: u8,
857    ) {
858        metrics::counter!(
859            "storage_requests_total",
860            "backend" => backend,
861            "operation" => operation,
862            "status" => status
863        )
864        .increment(1);
865        metrics::gauge!(
866            "storage_circuit_breaker_state",
867            "backend" => backend
868        )
869        .set(f64::from(state));
870    }
871
872    /// Records metrics and logs when circuit breaker trips.
873    fn record_circuit_trip(backend: &'static str, operation: &'static str) {
874        metrics::counter!(
875            "storage_circuit_breaker_trips_total",
876            "backend" => backend,
877            "operation" => operation
878        )
879        .increment(1);
880        tracing::warn!(
881            backend = backend,
882            operation = operation,
883            "Index circuit breaker opened"
884        );
885    }
886
887    /// Records metrics for retry attempts.
888    fn log_retry(backend: &'static str, operation: &'static str) {
889        metrics::counter!(
890            "storage_retries_total",
891            "backend" => backend,
892            "operation" => operation
893        )
894        .increment(1);
895    }
896}
897
// Trait implementation: every method delegates to the inner backend through
// `execute`, which adds circuit-breaker checks, retry with backoff, and metrics.
impl<I: IndexBackend> IndexBackend for ResilientIndexBackend<I> {
    fn index(&self, memory: &Memory) -> Result<()> {
        self.execute("index", || self.inner.index(memory))
    }

    fn remove(&self, id: &MemoryId) -> Result<bool> {
        self.execute("remove", || self.inner.remove(id))
    }

    fn search(
        &self,
        query: &str,
        filter: &SearchFilter,
        limit: usize,
    ) -> Result<Vec<(MemoryId, f32)>> {
        self.execute("search", || self.inner.search(query, filter, limit))
    }

    fn reindex(&self, memories: &[Memory]) -> Result<()> {
        self.execute("reindex", || self.inner.reindex(memories))
    }

    fn clear(&self) -> Result<()> {
        self.execute("clear", || self.inner.clear())
    }

    fn list_all(&self, filter: &SearchFilter, limit: usize) -> Result<Vec<(MemoryId, f32)>> {
        self.execute("list_all", || self.inner.list_all(filter, limit))
    }

    fn get_memory(&self, id: &MemoryId) -> Result<Option<Memory>> {
        self.execute("get_memory", || self.inner.get_memory(id))
    }

    fn get_memories_batch(&self, ids: &[MemoryId]) -> Result<Vec<Option<Memory>>> {
        self.execute("get_memories_batch", || self.inner.get_memories_batch(ids))
    }
}
936
937// ============================================================================
938// Resilient Vector Backend
939// ============================================================================
940
941use super::traits::{VectorBackend, VectorFilter};
942
/// Vector backend wrapper with circuit breaker and retry protection.
pub struct ResilientVectorBackend<V: VectorBackend> {
    /// The wrapped vector backend that performs the real work.
    inner: V,
    /// Retry and circuit-breaker tuning parameters.
    config: StorageResilienceConfig,
    /// Breaker state, guarded by a mutex so it can be updated from `&self`.
    breaker: Mutex<CircuitBreaker>,
    /// Static label attached to metrics and log output for this backend.
    backend_name: &'static str,
}
950
951impl<V: VectorBackend> ResilientVectorBackend<V> {
952    /// Creates a new resilient vector backend wrapper.
953    #[must_use]
954    pub fn new(inner: V, config: StorageResilienceConfig, backend_name: &'static str) -> Self {
955        Self {
956            inner,
957            breaker: Mutex::new(CircuitBreaker::new(&config, backend_name)),
958            config,
959            backend_name,
960        }
961    }
962
963    /// Checks circuit breaker and returns error if open.
964    fn check_circuit_breaker(&self, operation: &'static str) -> Option<Error> {
965        let mut breaker = self
966            .breaker
967            .lock()
968            .unwrap_or_else(std::sync::PoisonError::into_inner);
969
970        if !breaker.allow() {
971            let state = breaker.state_value();
972            drop(breaker);
973            Self::record_metrics(self.backend_name, operation, "circuit_open", state);
974            return Some(Error::OperationFailed {
975                operation: format!("vector_{operation}"),
976                cause: format!("circuit breaker open for backend '{}'", self.backend_name),
977            });
978        }
979        drop(breaker);
980        None
981    }
982
983    /// Handles successful operation result.
984    fn handle_success<T>(&self, operation: &'static str, value: T) -> T {
985        let mut breaker = self
986            .breaker
987            .lock()
988            .unwrap_or_else(std::sync::PoisonError::into_inner);
989        breaker.on_success();
990        let state = breaker.state_value();
991        drop(breaker);
992        Self::record_metrics(self.backend_name, operation, "success", state);
993        value
994    }
995
996    /// Handles operation error. Returns Ok(Some(err)) to continue retrying, Err to stop.
997    fn handle_error(
998        &self,
999        operation: &'static str,
1000        err: Error,
1001        attempts: u32,
1002        max_attempts: u32,
1003    ) -> Result<Option<Error>> {
1004        let retryable = is_retryable_storage_error(&err) && attempts < max_attempts;
1005
1006        let mut breaker = self
1007            .breaker
1008            .lock()
1009            .unwrap_or_else(std::sync::PoisonError::into_inner);
1010        let tripped = breaker.on_failure();
1011        let state = breaker.state_value();
1012        drop(breaker);
1013
1014        Self::record_metrics(self.backend_name, operation, "error", state);
1015
1016        if tripped {
1017            Self::record_circuit_trip(self.backend_name, operation);
1018        }
1019
1020        if !retryable {
1021            return Err(err);
1022        }
1023
1024        self.apply_retry_backoff(operation, attempts, &err);
1025        Ok(Some(err))
1026    }
1027
1028    /// Applies retry backoff delay if configured.
1029    fn apply_retry_backoff(&self, operation: &'static str, attempts: u32, err: &Error) {
1030        let delay_ms = calculate_retry_delay(self.config.retry_backoff_ms, attempts);
1031        Self::log_retry_attempt(self.backend_name, operation, attempts, delay_ms, err);
1032        std::thread::sleep(std::time::Duration::from_millis(delay_ms));
1033    }
1034
1035    fn execute<T, F>(&self, operation: &'static str, mut call: F) -> Result<T>
1036    where
1037        F: FnMut() -> Result<T>,
1038    {
1039        let max_attempts = self.config.max_retries + 1;
1040        let mut attempts = 0;
1041        let mut last_error = None;
1042
1043        while attempts < max_attempts {
1044            attempts += 1;
1045
1046            if let Some(err) = self.check_circuit_breaker(operation) {
1047                return Err(err);
1048            }
1049
1050            match call() {
1051                Ok(value) => return Ok(self.handle_success(operation, value)),
1052                Err(err) => {
1053                    last_error = self.handle_error(operation, err, attempts, max_attempts)?;
1054                },
1055            }
1056        }
1057
1058        Err(last_error.unwrap_or_else(|| Error::OperationFailed {
1059            operation: format!("vector_{operation}"),
1060            cause: "exhausted retries".to_string(),
1061        }))
1062    }
1063
1064    fn record_metrics(
1065        backend: &'static str,
1066        operation: &'static str,
1067        status: &'static str,
1068        state: u8,
1069    ) {
1070        metrics::counter!(
1071            "storage_requests_total",
1072            "backend" => backend,
1073            "operation" => operation,
1074            "status" => status
1075        )
1076        .increment(1);
1077        metrics::gauge!(
1078            "storage_circuit_breaker_state",
1079            "backend" => backend
1080        )
1081        .set(f64::from(state));
1082    }
1083
1084    /// Records metrics and logs when circuit breaker trips.
1085    fn record_circuit_trip(backend: &'static str, operation: &'static str) {
1086        metrics::counter!(
1087            "storage_circuit_breaker_trips_total",
1088            "backend" => backend,
1089            "operation" => operation
1090        )
1091        .increment(1);
1092        tracing::warn!(
1093            backend = backend,
1094            operation = operation,
1095            "Vector circuit breaker opened"
1096        );
1097    }
1098
1099    /// Records metrics and logs for retry attempts.
1100    fn log_retry_attempt(
1101        backend: &'static str,
1102        operation: &'static str,
1103        attempt: u32,
1104        delay_ms: u64,
1105        err: &Error,
1106    ) {
1107        metrics::counter!(
1108            "storage_retries_total",
1109            "backend" => backend,
1110            "operation" => operation
1111        )
1112        .increment(1);
1113        tracing::debug!(
1114            backend = backend,
1115            operation = operation,
1116            attempt = attempt,
1117            delay_ms = delay_ms,
1118            error = %err,
1119            "Retrying vector operation"
1120        );
1121    }
1122}
1123
/// `VectorBackend` implementation that routes every fallible operation
/// through `execute` (circuit breaker + retries + metrics) before delegating
/// to the inner backend. Each operation label tags metrics and log output.
impl<V: VectorBackend> VectorBackend for ResilientVectorBackend<V> {
    fn dimensions(&self) -> usize {
        // dimensions() is a pure getter, no circuit breaker needed
        self.inner.dimensions()
    }

    fn upsert(&self, id: &MemoryId, embedding: &[f32]) -> Result<()> {
        self.execute("upsert", || self.inner.upsert(id, embedding))
    }

    fn remove(&self, id: &MemoryId) -> Result<bool> {
        self.execute("remove", || self.inner.remove(id))
    }

    fn search(
        &self,
        query_embedding: &[f32],
        filter: &VectorFilter,
        limit: usize,
    ) -> Result<Vec<(MemoryId, f32)>> {
        self.execute("search", || {
            self.inner.search(query_embedding, filter, limit)
        })
    }

    fn count(&self) -> Result<usize> {
        self.execute("count", || self.inner.count())
    }

    fn clear(&self) -> Result<()> {
        self.execute("clear", || self.inner.clear())
    }
}
1157
#[cfg(test)]
mod tests {
    use super::*;

    // =========================================================================
    // Circuit Breaker Tests
    //
    // State encoding used throughout (from `CircuitBreaker::state_value`):
    // 0 = Closed, 1 = Open, 2 = Half-Open.
    // =========================================================================

    #[test]
    fn test_circuit_breaker_starts_closed() {
        let config = StorageResilienceConfig::default();
        let breaker = CircuitBreaker::new(&config, "test");
        assert_eq!(breaker.state_value(), 0); // Closed = 0
    }

    #[test]
    fn test_circuit_breaker_allows_calls_when_closed() {
        let config = StorageResilienceConfig::default();
        let mut breaker = CircuitBreaker::new(&config, "test");
        // A closed breaker never rejects, regardless of how many calls pass.
        assert!(breaker.allow());
        assert!(breaker.allow());
        assert!(breaker.allow());
    }

    #[test]
    fn test_circuit_breaker_opens_after_threshold_failures() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 3,
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        // First two failures don't trip the breaker
        breaker.on_failure();
        assert_eq!(breaker.state_value(), 0); // Still closed
        breaker.on_failure();
        assert_eq!(breaker.state_value(), 0); // Still closed

        // Third failure trips the breaker (consecutive failures == threshold)
        let tripped = breaker.on_failure();
        assert!(tripped);
        assert_eq!(breaker.state_value(), 1); // Open = 1
    }

    #[test]
    fn test_circuit_breaker_rejects_when_open() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 10_000, // Long timeout so it stays open
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        // Trip the breaker
        breaker.on_failure();
        assert_eq!(breaker.state_value(), 1); // Open

        // Should reject calls while the reset timeout has not elapsed
        assert!(!breaker.allow());
        assert!(!breaker.allow());
    }

    #[test]
    fn test_circuit_breaker_transitions_to_half_open_after_timeout() {
        let config = StorageResilienceConfig {
            max_retries: 3,
            retry_backoff_ms: 100,
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0, // Immediate reset
            breaker_half_open_max_calls: 1,
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        // Trip the breaker
        breaker.on_failure();
        assert_eq!(breaker.state_value(), 1); // Open

        // Should allow call after timeout (immediate since reset_timeout_ms=0)
        std::thread::sleep(Duration::from_millis(1));
        assert!(breaker.allow());
        assert_eq!(breaker.state_value(), 2); // Half-open = 2
    }

    #[test]
    fn test_circuit_breaker_half_open_limits_calls() {
        let config = StorageResilienceConfig {
            max_retries: 3,
            retry_backoff_ms: 100,
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0,
            breaker_half_open_max_calls: 2,
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        // Trip and transition to half-open
        breaker.on_failure();
        std::thread::sleep(Duration::from_millis(1));

        // First call transitions to half-open and is allowed
        assert!(breaker.allow());
        assert_eq!(breaker.state_value(), 2); // Half-open

        // Two more calls allowed (attempts = 1, 2)
        assert!(breaker.allow()); // attempts = 1
        assert!(breaker.allow()); // attempts = 2

        // Next call rejected (attempts >= max_calls)
        assert!(!breaker.allow());
    }

    #[test]
    fn test_circuit_breaker_closes_on_success() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0,
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        // Trip and transition to half-open
        breaker.on_failure();
        std::thread::sleep(Duration::from_millis(1));
        breaker.allow();

        // Success closes the breaker (half-open -> closed)
        breaker.on_success();
        assert_eq!(breaker.state_value(), 0); // Closed
    }

    #[test]
    fn test_circuit_breaker_reopens_on_failure_when_half_open() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0,
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        // Trip and transition to half-open
        breaker.on_failure();
        std::thread::sleep(Duration::from_millis(1));
        breaker.allow();
        assert_eq!(breaker.state_value(), 2); // Half-open

        // A single failure while half-open reopens the breaker
        let tripped = breaker.on_failure();
        assert!(tripped);
        assert_eq!(breaker.state_value(), 1); // Open
    }

    // =========================================================================
    // Configuration Tests
    // =========================================================================

    #[test]
    fn test_config_default_values() {
        let config = StorageResilienceConfig::default();
        assert_eq!(config.breaker_failure_threshold, 5);
        assert_eq!(config.breaker_reset_timeout_ms, 30_000);
        assert_eq!(config.breaker_half_open_max_calls, 1);
    }

    #[test]
    fn test_config_builder_pattern() {
        let config = StorageResilienceConfig::default()
            .with_failure_threshold(10)
            .with_reset_timeout_ms(60_000)
            .with_half_open_max_calls(3);

        assert_eq!(config.breaker_failure_threshold, 10);
        assert_eq!(config.breaker_reset_timeout_ms, 60_000);
        assert_eq!(config.breaker_half_open_max_calls, 3);
    }

    #[test]
    fn test_config_minimum_values() {
        // Threshold of 0 should become 1 in CircuitBreaker::new
        let config = StorageResilienceConfig {
            max_retries: 0,
            retry_backoff_ms: 0,
            breaker_failure_threshold: 0,
            breaker_reset_timeout_ms: 0,
            breaker_half_open_max_calls: 0,
        };
        let breaker = CircuitBreaker::new(&config, "test");

        // Internal values should be clamped to minimum 1
        assert_eq!(breaker.failure_threshold, 1);
        assert_eq!(breaker.half_open_max_calls, 1);
    }
}