//! Resilience layer for storage backends: retry with exponential backoff and
//! jitter, circuit breakers, and wrappers that add both to the persistence,
//! index and vector backends.

use crate::{Error, Result};
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Configuration for storage retry and circuit-breaker behaviour.
#[derive(Debug, Clone)]
pub struct StorageResilienceConfig {
    /// Maximum number of retries after the initial attempt.
    pub max_retries: u32,
    /// Base backoff delay in milliseconds; grows exponentially per attempt, with jitter.
    pub retry_backoff_ms: u64,
    /// Consecutive failures before the circuit breaker opens.
    pub breaker_failure_threshold: u32,
    /// How long the breaker stays open before allowing a half-open probe.
    pub breaker_reset_timeout_ms: u64,
    /// Maximum number of probe calls allowed while half-open.
    pub breaker_half_open_max_calls: u32,
}

impl Default for StorageResilienceConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_backoff_ms: 100,
            breaker_failure_threshold: 5,
            breaker_reset_timeout_ms: 30_000,
            breaker_half_open_max_calls: 1,
        }
    }
}

impl StorageResilienceConfig {
    /// Builds the default configuration with environment overrides applied.
    #[must_use]
    pub fn from_env() -> Self {
        Self::default().with_env_overrides()
    }

    /// Applies `SUBCOG_STORAGE_*` environment overrides; values that fail to parse are ignored.
    #[must_use]
    pub fn with_env_overrides(mut self) -> Self {
        if let Ok(v) = std::env::var("SUBCOG_STORAGE_MAX_RETRIES")
            && let Ok(parsed) = v.parse::<u32>()
        {
            self.max_retries = parsed;
        }
        if let Ok(v) = std::env::var("SUBCOG_STORAGE_RETRY_BACKOFF_MS")
            && let Ok(parsed) = v.parse::<u64>()
        {
            self.retry_backoff_ms = parsed;
        }
        if let Ok(v) = std::env::var("SUBCOG_STORAGE_BREAKER_FAILURE_THRESHOLD")
            && let Ok(parsed) = v.parse::<u32>()
        {
            self.breaker_failure_threshold = parsed.max(1);
        }
        if let Ok(v) = std::env::var("SUBCOG_STORAGE_BREAKER_RESET_MS")
            && let Ok(parsed) = v.parse::<u64>()
        {
            self.breaker_reset_timeout_ms = parsed;
        }
        if let Ok(v) = std::env::var("SUBCOG_STORAGE_BREAKER_HALF_OPEN_MAX_CALLS")
            && let Ok(parsed) = v.parse::<u32>()
        {
            self.breaker_half_open_max_calls = parsed.max(1);
        }
        self
    }

    #[must_use]
    pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    #[must_use]
    pub const fn with_retry_backoff_ms(mut self, backoff_ms: u64) -> Self {
        self.retry_backoff_ms = backoff_ms;
        self
    }

    #[must_use]
    pub const fn with_failure_threshold(mut self, threshold: u32) -> Self {
        self.breaker_failure_threshold = threshold;
        self
    }

    #[must_use]
    pub const fn with_reset_timeout_ms(mut self, timeout_ms: u64) -> Self {
        self.breaker_reset_timeout_ms = timeout_ms;
        self
    }

    #[must_use]
    pub const fn with_half_open_max_calls(mut self, max_calls: u32) -> Self {
        self.breaker_half_open_max_calls = max_calls;
        self
    }
}

#[derive(Debug)]
enum BreakerState {
    Closed { failures: u32 },
    Open { opened_at: Instant },
    HalfOpen { attempts: u32 },
}

/// Circuit breaker guarding a single storage backend.
#[derive(Debug)]
pub struct CircuitBreaker {
    state: BreakerState,
    failure_threshold: u32,
    reset_timeout: Duration,
    half_open_max_calls: u32,
    backend_name: &'static str,
}

impl CircuitBreaker {
    #[must_use]
    pub fn new(config: &StorageResilienceConfig, backend_name: &'static str) -> Self {
        Self {
            state: BreakerState::Closed { failures: 0 },
            failure_threshold: config.breaker_failure_threshold.max(1),
            reset_timeout: Duration::from_millis(config.breaker_reset_timeout_ms),
            half_open_max_calls: config.breaker_half_open_max_calls.max(1),
            backend_name,
        }
    }

    /// Returns whether a call may proceed, transitioning from open to half-open
    /// once the reset timeout has elapsed.
    pub fn allow(&mut self) -> bool {
        match self.state {
            BreakerState::Closed { .. } => true,
            BreakerState::Open { opened_at } => {
                if opened_at.elapsed() >= self.reset_timeout {
                    tracing::info!(
                        backend = self.backend_name,
                        "Circuit breaker transitioning to half-open"
                    );
                    self.state = BreakerState::HalfOpen { attempts: 0 };
                    true
                } else {
                    false
                }
            },
            BreakerState::HalfOpen { ref mut attempts } => {
                if *attempts >= self.half_open_max_calls {
                    false
                } else {
                    *attempts += 1;
                    true
                }
            },
        }
    }

    /// Records a successful call and closes the breaker.
    pub fn on_success(&mut self) {
        if !matches!(self.state, BreakerState::Closed { failures: 0 }) {
            tracing::info!(
                backend = self.backend_name,
                "Circuit breaker closing after success"
            );
        }
        self.state = BreakerState::Closed { failures: 0 };
    }

    /// Records a failed call; returns `true` if this failure opened the breaker.
    pub fn on_failure(&mut self) -> bool {
        match self.state {
            BreakerState::Closed { ref mut failures } => {
                *failures += 1;
                if *failures >= self.failure_threshold {
                    tracing::warn!(
                        backend = self.backend_name,
                        failures = *failures,
                        threshold = self.failure_threshold,
                        "Circuit breaker opened after consecutive failures"
                    );
                    self.state = BreakerState::Open {
                        opened_at: Instant::now(),
                    };
                    return true;
                }
            },
            BreakerState::HalfOpen { .. } => {
                tracing::warn!(
                    backend = self.backend_name,
                    "Circuit breaker re-opened after half-open failure"
                );
                self.state = BreakerState::Open {
                    opened_at: Instant::now(),
                };
                return true;
            },
            BreakerState::Open { .. } => {},
        }
        false
    }

    /// Numeric state for the metrics gauge: 0 = closed, 1 = open, 2 = half-open.
    #[must_use]
    pub const fn state_value(&self) -> u8 {
        match self.state {
            BreakerState::Closed { .. } => 0,
            BreakerState::Open { .. } => 1,
            BreakerState::HalfOpen { .. } => 2,
        }
    }

    #[must_use]
    pub const fn backend_name(&self) -> &'static str {
        self.backend_name
    }
}
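
// Illustrative caller protocol for `CircuitBreaker` (a minimal sketch, kept as a test so
// it stays compilable): check `allow` before the call, then report the outcome via
// `on_success` / `on_failure`. The resilient wrappers below follow this same pattern.
#[cfg(test)]
#[test]
fn example_circuit_breaker_caller_protocol() {
    let config = StorageResilienceConfig::default().with_failure_threshold(2);
    let mut breaker = CircuitBreaker::new(&config, "example");

    // One failed call: below the threshold of 2, so the breaker stays closed.
    if breaker.allow() {
        breaker.on_failure();
    }
    // A subsequent success resets the consecutive-failure count.
    if breaker.allow() {
        breaker.on_success();
    }
    assert_eq!(breaker.state_value(), 0); // closed
}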

/// Computes the delay for retry `attempt` (1-based): exponential backoff from
/// `base_delay_ms`, capped at 10 seconds, plus jitter.
fn calculate_retry_delay(base_delay_ms: u64, attempt: u32) -> u64 {
    let exponent = attempt.saturating_sub(1);
    let exponential_delay = base_delay_ms.saturating_mul(1u64 << exponent.min(10));

    let capped_delay = exponential_delay.min(10_000);

    let jitter = calculate_jitter(capped_delay);
    let total_delay = capped_delay.saturating_add(jitter);

    tracing::debug!(
        "Storage retry backoff: attempt={}, base={}ms, exponential={}ms, jitter={}ms, total={}ms",
        attempt,
        base_delay_ms,
        capped_delay,
        jitter,
        total_delay
    );

    total_delay
}

/// Derives cheap jitter in `0..delay_ms / 2` from the current time's sub-second nanoseconds.
fn calculate_jitter(delay_ms: u64) -> u64 {
    let jitter_max = delay_ms / 2;
    if jitter_max == 0 {
        return 0;
    }

    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);

    u64::from(nanos) % jitter_max
}
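
// Worked example of the backoff curve (a sketch of the math above): with a 100 ms base,
// attempts 1..=4 back off for roughly 100, 200, 400 and 800 ms, each plus strictly less
// than 50% jitter, and the exponential part is capped at 10 seconds.
#[cfg(test)]
#[test]
fn example_retry_delay_growth_and_jitter_bounds() {
    for (attempt, expected_base) in [(1, 100), (2, 200), (3, 400), (4, 800)] {
        let delay = calculate_retry_delay(100, attempt);
        assert!(delay >= expected_base);
        // Jitter adds strictly less than half of the capped exponential delay.
        assert!(delay < expected_base + expected_base / 2);
    }
}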

/// Returns `true` if the error looks transient (timeouts, connectivity problems,
/// lock contention, pool exhaustion) and the operation is worth retrying.
#[must_use]
pub fn is_retryable_storage_error(err: &Error) -> bool {
    match err {
        Error::OperationFailed { cause, .. } => {
            let lower = cause.to_lowercase();
            lower.contains("timeout")
                || lower.contains("timed out")
                || lower.contains("deadline")
                || lower.contains("elapsed")
                || lower.contains("connect")
                || lower.contains("connection")
                || lower.contains("network")
                || lower.contains("dns")
                || lower.contains("resolve")
                || lower.contains("locked")
                || lower.contains("busy")
                || lower.contains("pool")
                || lower.contains("exhausted")
                || lower.contains("temporary")
                || lower.contains("try again")
                || lower.contains("interrupted")
        },
        _ => false,
    }
}
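
// Quick illustration of the classification above (the error causes are hypothetical):
// transient-looking causes are retried, anything else is surfaced immediately.
#[cfg(test)]
#[test]
fn example_retryable_storage_error_classification() {
    let transient = Error::OperationFailed {
        operation: "store".to_string(),
        cause: "connection timed out".to_string(),
    };
    let permanent = Error::OperationFailed {
        operation: "store".to_string(),
        cause: "unique constraint violation".to_string(),
    };
    assert!(is_retryable_storage_error(&transient));
    assert!(!is_retryable_storage_error(&permanent));
}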

/// Retries a connection closure with exponential backoff and jitter. Unlike the
/// wrappers below, no circuit breaker is involved.
///
/// # Errors
///
/// Returns immediately for non-retryable causes, or the last error once all
/// attempts are exhausted.
pub fn retry_connection<T, F>(
    config: &StorageResilienceConfig,
    backend_name: &str,
    operation: &str,
    mut connect_fn: F,
) -> Result<T>
where
    F: FnMut() -> Result<T>,
{
    let max_attempts = config.max_retries + 1;
    let mut attempts = 0;
    let mut last_error = None;

    while attempts < max_attempts {
        attempts += 1;

        match connect_fn() {
            Ok(result) => {
                if attempts > 1 {
                    tracing::info!(
                        backend = backend_name,
                        operation = operation,
                        attempts = attempts,
                        "Connection succeeded after retries"
                    );
                }
                return Ok(result);
            },
            Err(err) => {
                let retryable = is_retryable_connection_error(&err) && attempts < max_attempts;

                if !retryable {
                    tracing::warn!(
                        backend = backend_name,
                        operation = operation,
                        attempts = attempts,
                        error = %err,
                        "Connection failed with non-retryable error"
                    );
                    return Err(err);
                }

                let delay_ms = calculate_retry_delay(config.retry_backoff_ms, attempts);
                tracing::debug!(
                    backend = backend_name,
                    operation = operation,
                    attempt = attempts,
                    max_attempts = max_attempts,
                    delay_ms = delay_ms,
                    error = %err,
                    "Connection failed, retrying with backoff"
                );

                metrics::counter!(
                    "storage_connection_retries_total",
                    "backend" => backend_name.to_string(),
                    "operation" => operation.to_string()
                )
                .increment(1);

                std::thread::sleep(Duration::from_millis(delay_ms));
                last_error = Some(err);
            },
        }
    }

    let err = last_error.unwrap_or_else(|| Error::OperationFailed {
        operation: format!("{backend_name}_{operation}"),
        cause: "exhausted connection retries".to_string(),
    });

    tracing::error!(
        backend = backend_name,
        operation = operation,
        max_attempts = max_attempts,
        error = %err,
        "Connection failed after exhausting all retries"
    );

    Err(err)
}
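
// A minimal usage sketch of `retry_connection` (the backend/operation names are
// illustrative): the closure fails once with a retryable cause, then succeeds on the
// second attempt within the default retry budget.
#[cfg(test)]
#[test]
fn example_retry_connection_recovers_from_transient_refusal() {
    let config = StorageResilienceConfig::default().with_retry_backoff_ms(1);
    let mut remaining_failures = 1;

    let result = retry_connection(&config, "example", "connect", || {
        if remaining_failures > 0 {
            remaining_failures -= 1;
            Err(Error::OperationFailed {
                operation: "connect".to_string(),
                cause: "connection refused".to_string(),
            })
        } else {
            Ok(42_u32)
        }
    });

    assert_eq!(result.ok(), Some(42));
}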

/// Connection-phase retryability check; deliberately broader than
/// `is_retryable_storage_error` so that a backend that is still starting up or
/// briefly unavailable is retried while connecting.
fn is_retryable_connection_error(err: &Error) -> bool {
    match err {
        Error::OperationFailed { cause, .. } => {
            let lower = cause.to_lowercase();
            lower.contains("connection refused")
                || lower.contains("connection reset")
                || lower.contains("connection timed out")
                || lower.contains("network")
                || lower.contains("unreachable")
                || lower.contains("dns")
                || lower.contains("resolve")
                || lower.contains("timeout")
                || lower.contains("timed out")
                || lower.contains("pool")
                || lower.contains("exhausted")
                || lower.contains("not ready")
                || lower.contains("starting")
                || lower.contains("unavailable")
                || lower.contains("service")
                || lower.contains("temporary")
                || lower.contains("try again")
                || lower.contains("econnrefused")
                || lower.contains("etimedout")
        },
        _ => false,
    }
}
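
// The connection-phase classifier is deliberately broader than the per-operation one:
// a backend that is still starting up is worth retrying while connecting, but the same
// cause is not treated as retryable mid-operation. (Example cause text is hypothetical.)
#[cfg(test)]
#[test]
fn example_connection_errors_classified_more_leniently() {
    let err = Error::OperationFailed {
        operation: "connect".to_string(),
        cause: "the database system is starting up".to_string(),
    };
    assert!(is_retryable_connection_error(&err));
    assert!(!is_retryable_storage_error(&err));
}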

use super::traits::PersistenceBackend;
use crate::models::{Memory, MemoryId};

/// Wraps a [`PersistenceBackend`] with retry and circuit-breaker protection.
pub struct ResilientPersistenceBackend<P: PersistenceBackend> {
    inner: P,
    config: StorageResilienceConfig,
    breaker: Mutex<CircuitBreaker>,
    backend_name: &'static str,
}

impl<P: PersistenceBackend> ResilientPersistenceBackend<P> {
    #[must_use]
    pub fn new(inner: P, config: StorageResilienceConfig, backend_name: &'static str) -> Self {
        Self {
            inner,
            breaker: Mutex::new(CircuitBreaker::new(&config, backend_name)),
            config,
            backend_name,
        }
    }

    fn execute<T, F>(&self, operation: &'static str, mut call: F) -> Result<T>
    where
        F: FnMut() -> Result<T>,
    {
        let max_attempts = self.config.max_retries + 1;
        let mut attempts = 0;
        let mut last_error = None;

        while attempts < max_attempts {
            attempts += 1;

            if let Some(err) = self.check_circuit_breaker(operation) {
                return Err(err);
            }

            match call() {
                Ok(value) => return Ok(self.handle_success(operation, value)),
                Err(err) => {
                    last_error = self.handle_error(operation, err, attempts, max_attempts)?;
                },
            }
        }

        Err(last_error.unwrap_or_else(|| Error::OperationFailed {
            operation: format!("storage_{operation}"),
            cause: "exhausted retries".to_string(),
        }))
    }

    fn check_circuit_breaker(&self, operation: &'static str) -> Option<Error> {
        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if !breaker.allow() {
            let state = breaker.state_value();
            drop(breaker);
            Self::record_metrics(self.backend_name, operation, "circuit_open", state);
            return Some(Error::OperationFailed {
                operation: format!("storage_{operation}"),
                cause: format!("circuit breaker open for backend '{}'", self.backend_name),
            });
        }
        None
    }

    fn handle_success<T>(&self, operation: &'static str, value: T) -> T {
        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        breaker.on_success();
        let state = breaker.state_value();
        drop(breaker);
        Self::record_metrics(self.backend_name, operation, "success", state);
        value
    }

    /// Records the failure, returning `Ok(Some(err))` when the caller should retry
    /// and `Err(err)` when the error is terminal.
    fn handle_error(
        &self,
        operation: &'static str,
        err: Error,
        attempts: u32,
        max_attempts: u32,
    ) -> Result<Option<Error>> {
        let retryable = is_retryable_storage_error(&err) && attempts < max_attempts;

        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let tripped = breaker.on_failure();
        let state = breaker.state_value();
        drop(breaker);

        Self::record_metrics(self.backend_name, operation, "error", state);

        if tripped {
            Self::record_circuit_trip(self.backend_name, operation);
        }

        if !retryable {
            return Err(err);
        }

        Self::log_retry_attempt(self.backend_name, operation, attempts, max_attempts, &err);
        self.apply_retry_backoff(attempts);
        Ok(Some(err))
    }

    fn apply_retry_backoff(&self, attempts: u32) {
        if self.config.retry_backoff_ms > 0 {
            let delay = calculate_retry_delay(self.config.retry_backoff_ms, attempts);
            std::thread::sleep(Duration::from_millis(delay));
        }
    }

    fn record_metrics(
        backend: &'static str,
        operation: &'static str,
        status: &'static str,
        state: u8,
    ) {
        metrics::counter!(
            "storage_requests_total",
            "backend" => backend,
            "operation" => operation,
            "status" => status
        )
        .increment(1);
        metrics::gauge!(
            "storage_circuit_breaker_state",
            "backend" => backend
        )
        .set(f64::from(state));
    }

    fn record_circuit_trip(backend: &'static str, operation: &'static str) {
        metrics::counter!(
            "storage_circuit_breaker_trips_total",
            "backend" => backend,
            "operation" => operation
        )
        .increment(1);
        tracing::warn!(
            backend = backend,
            operation = operation,
            "Storage circuit breaker opened"
        );
    }

    fn log_retry_attempt(
        backend: &'static str,
        operation: &'static str,
        attempt: u32,
        max_attempts: u32,
        err: &Error,
    ) {
        metrics::counter!(
            "storage_retries_total",
            "backend" => backend,
            "operation" => operation
        )
        .increment(1);
        tracing::debug!(
            backend = backend,
            operation = operation,
            attempt = attempt,
            max_attempts = max_attempts,
            error = %err,
            "Retrying storage operation"
        );
    }
}

impl<P: PersistenceBackend> PersistenceBackend for ResilientPersistenceBackend<P> {
    fn store(&self, memory: &Memory) -> Result<()> {
        self.execute("store", || self.inner.store(memory))
    }

    fn get(&self, id: &MemoryId) -> Result<Option<Memory>> {
        self.execute("get", || self.inner.get(id))
    }

    fn delete(&self, id: &MemoryId) -> Result<bool> {
        self.execute("delete", || self.inner.delete(id))
    }

    fn list_ids(&self) -> Result<Vec<MemoryId>> {
        self.execute("list_ids", || self.inner.list_ids())
    }

    fn exists(&self, id: &MemoryId) -> Result<bool> {
        self.execute("exists", || self.inner.exists(id))
    }

    fn count(&self) -> Result<usize> {
        self.execute("count", || self.inner.count())
    }
}
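
// A condensed, illustrative sketch of the control flow shared by the resilient wrappers
// (breaker gate, call, classify, retry), written against `CircuitBreaker` directly so it
// needs no concrete backend; metrics, logging and backoff sleeps are omitted here.
#[cfg(test)]
#[test]
fn example_retry_and_breaker_flow() {
    fn run_with_resilience<T>(
        config: &StorageResilienceConfig,
        breaker: &mut CircuitBreaker,
        mut call: impl FnMut() -> Result<T>,
    ) -> Result<T> {
        let max_attempts = config.max_retries + 1;
        for attempt in 1..=max_attempts {
            // Gate every attempt on the breaker, as `check_circuit_breaker` does above.
            if !breaker.allow() {
                return Err(Error::OperationFailed {
                    operation: "example".to_string(),
                    cause: "circuit breaker open".to_string(),
                });
            }
            match call() {
                Ok(value) => {
                    breaker.on_success();
                    return Ok(value);
                },
                Err(err) => {
                    breaker.on_failure();
                    // Terminal errors and the final attempt surface immediately.
                    if !is_retryable_storage_error(&err) || attempt == max_attempts {
                        return Err(err);
                    }
                },
            }
        }
        unreachable!("loop always returns on the final attempt");
    }

    let config = StorageResilienceConfig::default();
    let mut breaker = CircuitBreaker::new(&config, "example");
    let mut failures_left = 2;

    let result = run_with_resilience(&config, &mut breaker, || {
        if failures_left > 0 {
            failures_left -= 1;
            Err(Error::OperationFailed {
                operation: "get".to_string(),
                cause: "database is locked".to_string(),
            })
        } else {
            Ok("value")
        }
    });

    assert_eq!(result.ok(), Some("value"));
    assert_eq!(breaker.state_value(), 0); // success closed the breaker again
}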

use super::traits::IndexBackend;
use crate::models::SearchFilter;

pub struct ResilientIndexBackend<I: IndexBackend> {
    inner: I,
    config: StorageResilienceConfig,
    breaker: Mutex<CircuitBreaker>,
    backend_name: &'static str,
}

impl<I: IndexBackend> ResilientIndexBackend<I> {
    #[must_use]
    pub fn new(inner: I, config: StorageResilienceConfig, backend_name: &'static str) -> Self {
        Self {
            inner,
            breaker: Mutex::new(CircuitBreaker::new(&config, backend_name)),
            config,
            backend_name,
        }
    }

    fn execute<T, F>(&self, operation: &'static str, mut call: F) -> Result<T>
    where
        F: FnMut() -> Result<T>,
    {
        let max_attempts = self.config.max_retries + 1;
        let mut attempts = 0;
        let mut last_error = None;

        while attempts < max_attempts {
            attempts += 1;

            if let Some(err) = self.check_circuit_breaker(operation) {
                return Err(err);
            }

            match call() {
                Ok(value) => return Ok(self.handle_success(operation, value)),
                Err(err) => {
                    last_error = self.handle_error(operation, err, attempts, max_attempts)?;
                },
            }
        }

        Err(last_error.unwrap_or_else(|| Error::OperationFailed {
            operation: format!("index_{operation}"),
            cause: "exhausted retries".to_string(),
        }))
    }

    fn check_circuit_breaker(&self, operation: &'static str) -> Option<Error> {
        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if !breaker.allow() {
            let state = breaker.state_value();
            drop(breaker);
            Self::record_metrics(self.backend_name, operation, "circuit_open", state);
            return Some(Error::OperationFailed {
                operation: format!("index_{operation}"),
                cause: format!("circuit breaker open for backend '{}'", self.backend_name),
            });
        }
        None
    }

    fn handle_success<T>(&self, operation: &'static str, value: T) -> T {
        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        breaker.on_success();
        let state = breaker.state_value();
        drop(breaker);
        Self::record_metrics(self.backend_name, operation, "success", state);
        value
    }

    fn handle_error(
        &self,
        operation: &'static str,
        err: Error,
        attempts: u32,
        max_attempts: u32,
    ) -> Result<Option<Error>> {
        let retryable = is_retryable_storage_error(&err) && attempts < max_attempts;

        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let tripped = breaker.on_failure();
        let state = breaker.state_value();
        drop(breaker);

        Self::record_metrics(self.backend_name, operation, "error", state);

        if tripped {
            Self::record_circuit_trip(self.backend_name, operation);
        }

        if !retryable {
            return Err(err);
        }

        Self::log_retry(self.backend_name, operation);
        self.apply_retry_backoff(attempts);
        Ok(Some(err))
    }

    fn apply_retry_backoff(&self, attempts: u32) {
        if self.config.retry_backoff_ms > 0 {
            let delay = calculate_retry_delay(self.config.retry_backoff_ms, attempts);
            std::thread::sleep(Duration::from_millis(delay));
        }
    }

    fn record_metrics(
        backend: &'static str,
        operation: &'static str,
        status: &'static str,
        state: u8,
    ) {
        metrics::counter!(
            "storage_requests_total",
            "backend" => backend,
            "operation" => operation,
            "status" => status
        )
        .increment(1);
        metrics::gauge!(
            "storage_circuit_breaker_state",
            "backend" => backend
        )
        .set(f64::from(state));
    }

    fn record_circuit_trip(backend: &'static str, operation: &'static str) {
        metrics::counter!(
            "storage_circuit_breaker_trips_total",
            "backend" => backend,
            "operation" => operation
        )
        .increment(1);
        tracing::warn!(
            backend = backend,
            operation = operation,
            "Index circuit breaker opened"
        );
    }

    fn log_retry(backend: &'static str, operation: &'static str) {
        metrics::counter!(
            "storage_retries_total",
            "backend" => backend,
            "operation" => operation
        )
        .increment(1);
    }
}

impl<I: IndexBackend> IndexBackend for ResilientIndexBackend<I> {
    fn index(&self, memory: &Memory) -> Result<()> {
        self.execute("index", || self.inner.index(memory))
    }

    fn remove(&self, id: &MemoryId) -> Result<bool> {
        self.execute("remove", || self.inner.remove(id))
    }

    fn search(
        &self,
        query: &str,
        filter: &SearchFilter,
        limit: usize,
    ) -> Result<Vec<(MemoryId, f32)>> {
        self.execute("search", || self.inner.search(query, filter, limit))
    }

    fn reindex(&self, memories: &[Memory]) -> Result<()> {
        self.execute("reindex", || self.inner.reindex(memories))
    }

    fn clear(&self) -> Result<()> {
        self.execute("clear", || self.inner.clear())
    }

    fn list_all(&self, filter: &SearchFilter, limit: usize) -> Result<Vec<(MemoryId, f32)>> {
        self.execute("list_all", || self.inner.list_all(filter, limit))
    }

    fn get_memory(&self, id: &MemoryId) -> Result<Option<Memory>> {
        self.execute("get_memory", || self.inner.get_memory(id))
    }

    fn get_memories_batch(&self, ids: &[MemoryId]) -> Result<Vec<Option<Memory>>> {
        self.execute("get_memories_batch", || self.inner.get_memories_batch(ids))
    }
}

use super::traits::{VectorBackend, VectorFilter};

pub struct ResilientVectorBackend<V: VectorBackend> {
    inner: V,
    config: StorageResilienceConfig,
    breaker: Mutex<CircuitBreaker>,
    backend_name: &'static str,
}

impl<V: VectorBackend> ResilientVectorBackend<V> {
    #[must_use]
    pub fn new(inner: V, config: StorageResilienceConfig, backend_name: &'static str) -> Self {
        Self {
            inner,
            breaker: Mutex::new(CircuitBreaker::new(&config, backend_name)),
            config,
            backend_name,
        }
    }

    fn check_circuit_breaker(&self, operation: &'static str) -> Option<Error> {
        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if !breaker.allow() {
            let state = breaker.state_value();
            drop(breaker);
            Self::record_metrics(self.backend_name, operation, "circuit_open", state);
            return Some(Error::OperationFailed {
                operation: format!("vector_{operation}"),
                cause: format!("circuit breaker open for backend '{}'", self.backend_name),
            });
        }
        drop(breaker);
        None
    }

    fn handle_success<T>(&self, operation: &'static str, value: T) -> T {
        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        breaker.on_success();
        let state = breaker.state_value();
        drop(breaker);
        Self::record_metrics(self.backend_name, operation, "success", state);
        value
    }

    fn handle_error(
        &self,
        operation: &'static str,
        err: Error,
        attempts: u32,
        max_attempts: u32,
    ) -> Result<Option<Error>> {
        let retryable = is_retryable_storage_error(&err) && attempts < max_attempts;

        let mut breaker = self
            .breaker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let tripped = breaker.on_failure();
        let state = breaker.state_value();
        drop(breaker);

        Self::record_metrics(self.backend_name, operation, "error", state);

        if tripped {
            Self::record_circuit_trip(self.backend_name, operation);
        }

        if !retryable {
            return Err(err);
        }

        self.apply_retry_backoff(operation, attempts, &err);
        Ok(Some(err))
    }

    fn apply_retry_backoff(&self, operation: &'static str, attempts: u32, err: &Error) {
        let delay_ms = calculate_retry_delay(self.config.retry_backoff_ms, attempts);
        Self::log_retry_attempt(self.backend_name, operation, attempts, delay_ms, err);
        std::thread::sleep(std::time::Duration::from_millis(delay_ms));
    }

    fn execute<T, F>(&self, operation: &'static str, mut call: F) -> Result<T>
    where
        F: FnMut() -> Result<T>,
    {
        let max_attempts = self.config.max_retries + 1;
        let mut attempts = 0;
        let mut last_error = None;

        while attempts < max_attempts {
            attempts += 1;

            if let Some(err) = self.check_circuit_breaker(operation) {
                return Err(err);
            }

            match call() {
                Ok(value) => return Ok(self.handle_success(operation, value)),
                Err(err) => {
                    last_error = self.handle_error(operation, err, attempts, max_attempts)?;
                },
            }
        }

        Err(last_error.unwrap_or_else(|| Error::OperationFailed {
            operation: format!("vector_{operation}"),
            cause: "exhausted retries".to_string(),
        }))
    }

    fn record_metrics(
        backend: &'static str,
        operation: &'static str,
        status: &'static str,
        state: u8,
    ) {
        metrics::counter!(
            "storage_requests_total",
            "backend" => backend,
            "operation" => operation,
            "status" => status
        )
        .increment(1);
        metrics::gauge!(
            "storage_circuit_breaker_state",
            "backend" => backend
        )
        .set(f64::from(state));
    }

    fn record_circuit_trip(backend: &'static str, operation: &'static str) {
        metrics::counter!(
            "storage_circuit_breaker_trips_total",
            "backend" => backend,
            "operation" => operation
        )
        .increment(1);
        tracing::warn!(
            backend = backend,
            operation = operation,
            "Vector circuit breaker opened"
        );
    }

    fn log_retry_attempt(
        backend: &'static str,
        operation: &'static str,
        attempt: u32,
        delay_ms: u64,
        err: &Error,
    ) {
        metrics::counter!(
            "storage_retries_total",
            "backend" => backend,
            "operation" => operation
        )
        .increment(1);
        tracing::debug!(
            backend = backend,
            operation = operation,
            attempt = attempt,
            delay_ms = delay_ms,
            error = %err,
            "Retrying vector operation"
        );
    }
}

impl<V: VectorBackend> VectorBackend for ResilientVectorBackend<V> {
    fn dimensions(&self) -> usize {
        self.inner.dimensions()
    }

    fn upsert(&self, id: &MemoryId, embedding: &[f32]) -> Result<()> {
        self.execute("upsert", || self.inner.upsert(id, embedding))
    }

    fn remove(&self, id: &MemoryId) -> Result<bool> {
        self.execute("remove", || self.inner.remove(id))
    }

    fn search(
        &self,
        query_embedding: &[f32],
        filter: &VectorFilter,
        limit: usize,
    ) -> Result<Vec<(MemoryId, f32)>> {
        self.execute("search", || {
            self.inner.search(query_embedding, filter, limit)
        })
    }

    fn count(&self) -> Result<usize> {
        self.execute("count", || self.inner.count())
    }

    fn clear(&self) -> Result<()> {
        self.execute("clear", || self.inner.clear())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_circuit_breaker_starts_closed() {
        let config = StorageResilienceConfig::default();
        let breaker = CircuitBreaker::new(&config, "test");
        assert_eq!(breaker.state_value(), 0); // closed
    }

    #[test]
    fn test_circuit_breaker_allows_calls_when_closed() {
        let config = StorageResilienceConfig::default();
        let mut breaker = CircuitBreaker::new(&config, "test");
        assert!(breaker.allow());
        assert!(breaker.allow());
        assert!(breaker.allow());
    }

    #[test]
    fn test_circuit_breaker_opens_after_threshold_failures() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 3,
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        breaker.on_failure();
        assert_eq!(breaker.state_value(), 0); // still closed
        breaker.on_failure();
        assert_eq!(breaker.state_value(), 0); // still closed
        let tripped = breaker.on_failure();
        assert!(tripped);
        assert_eq!(breaker.state_value(), 1); // open
    }

    #[test]
    fn test_circuit_breaker_rejects_when_open() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 10_000, // long enough not to elapse during the test
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        breaker.on_failure();
        assert_eq!(breaker.state_value(), 1); // open

        assert!(!breaker.allow());
        assert!(!breaker.allow());
    }

    #[test]
    fn test_circuit_breaker_transitions_to_half_open_after_timeout() {
        let config = StorageResilienceConfig {
            max_retries: 3,
            retry_backoff_ms: 100,
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0, // transition as soon as possible
            breaker_half_open_max_calls: 1,
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        breaker.on_failure();
        assert_eq!(breaker.state_value(), 1); // open

        std::thread::sleep(Duration::from_millis(1));
        assert!(breaker.allow());
        assert_eq!(breaker.state_value(), 2); // half-open
    }

    #[test]
    fn test_circuit_breaker_half_open_limits_calls() {
        let config = StorageResilienceConfig {
            max_retries: 3,
            retry_backoff_ms: 100,
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0,
            breaker_half_open_max_calls: 2,
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        breaker.on_failure();
        std::thread::sleep(Duration::from_millis(1));

        assert!(breaker.allow()); // transitions to half-open
        assert_eq!(breaker.state_value(), 2); // half-open
        assert!(breaker.allow());
        assert!(breaker.allow());
        assert!(!breaker.allow()); // probe limit reached
    }

    #[test]
    fn test_circuit_breaker_closes_on_success() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0,
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        breaker.on_failure();
        std::thread::sleep(Duration::from_millis(1));
        breaker.allow();

        breaker.on_success();
        assert_eq!(breaker.state_value(), 0); // closed
    }

    #[test]
    fn test_circuit_breaker_reopens_on_failure_when_half_open() {
        let config = StorageResilienceConfig {
            breaker_failure_threshold: 1,
            breaker_reset_timeout_ms: 0,
            ..Default::default()
        };
        let mut breaker = CircuitBreaker::new(&config, "test");

        breaker.on_failure();
        std::thread::sleep(Duration::from_millis(1));
        breaker.allow();
        assert_eq!(breaker.state_value(), 2); // half-open

        let tripped = breaker.on_failure();
        assert!(tripped);
        assert_eq!(breaker.state_value(), 1); // open again
    }

    #[test]
    fn test_config_default_values() {
        let config = StorageResilienceConfig::default();
        assert_eq!(config.breaker_failure_threshold, 5);
        assert_eq!(config.breaker_reset_timeout_ms, 30_000);
        assert_eq!(config.breaker_half_open_max_calls, 1);
    }

    #[test]
    fn test_config_builder_pattern() {
        let config = StorageResilienceConfig::default()
            .with_failure_threshold(10)
            .with_reset_timeout_ms(60_000)
            .with_half_open_max_calls(3);

        assert_eq!(config.breaker_failure_threshold, 10);
        assert_eq!(config.breaker_reset_timeout_ms, 60_000);
        assert_eq!(config.breaker_half_open_max_calls, 3);
    }

    #[test]
    fn test_config_minimum_values() {
        let config = StorageResilienceConfig {
            max_retries: 0,
            retry_backoff_ms: 0,
            breaker_failure_threshold: 0,
            breaker_reset_timeout_ms: 0,
            breaker_half_open_max_calls: 0,
        };
        let breaker = CircuitBreaker::new(&config, "test");

        assert_eq!(breaker.failure_threshold, 1); // clamped to at least 1
        assert_eq!(breaker.half_open_max_calls, 1); // clamped to at least 1
    }
}