subcog/observability/
event_bus.rs1use crate::models::MemoryEvent;
4use std::sync::OnceLock;
5use tokio::sync::broadcast;
6
/// Capacity passed to [`EventBus::new`] when the global event bus is created.
const DEFAULT_EVENT_BUS_CAPACITY: usize = 1024;
8
9#[derive(Clone)]
11pub struct EventBus {
12 sender: broadcast::Sender<MemoryEvent>,
13}
14
15pub struct FilteredReceiver<F> {
17 receiver: broadcast::Receiver<MemoryEvent>,
18 predicate: F,
19}
20
/// Converts a `usize` into an `f64` for gauge reporting.
///
/// The value goes through `u32` so the final conversion is the lossless
/// `f64::from(u32)`; anything that does not fit in a `u32` is reported as
/// `u32::MAX`.
fn usize_to_f64(value: usize) -> f64 {
    u32::try_from(value).map_or(f64::from(u32::MAX), f64::from)
}
25
26impl EventBus {
27 #[must_use]
29 pub fn new(capacity: usize) -> Self {
30 let (sender, _receiver) = broadcast::channel(capacity);
31 Self { sender }
32 }
33
34 pub fn publish(&self, event: MemoryEvent) {
36 metrics::counter!("event_bus_publish_total").increment(1);
37 let receivers = self.sender.receiver_count();
38 metrics::gauge!("event_bus_receivers").set(usize_to_f64(receivers));
39 match self.sender.send(event) {
40 Ok(_) => {
41 metrics::gauge!("event_bus_queue_depth").set(usize_to_f64(self.sender.len()));
42 },
43 Err(_) => {
44 metrics::counter!("event_bus_publish_failed_total").increment(1);
45 },
46 }
47 }
48
49 #[must_use]
51 pub fn subscribe(&self) -> broadcast::Receiver<MemoryEvent> {
52 metrics::counter!("event_bus_subscriptions_total").increment(1);
53 metrics::gauge!("event_bus_receivers").set(usize_to_f64(self.sender.receiver_count()));
54 self.sender.subscribe()
55 }
56
57 #[must_use]
59 pub fn subscribe_filtered<F>(&self, predicate: F) -> FilteredReceiver<F>
60 where
61 F: Fn(&MemoryEvent) -> bool,
62 {
63 metrics::counter!("event_bus_subscriptions_total").increment(1);
64 metrics::gauge!("event_bus_receivers").set(usize_to_f64(self.sender.receiver_count()));
65 FilteredReceiver {
66 receiver: self.sender.subscribe(),
67 predicate,
68 }
69 }
70
71 #[must_use]
73 pub fn subscribe_event_type(
74 &self,
75 event_type: &'static str,
76 ) -> FilteredReceiver<impl Fn(&MemoryEvent) -> bool> {
77 self.subscribe_filtered(move |event| event.event_type() == event_type)
78 }
79}
80
81impl<F> FilteredReceiver<F>
82where
83 F: Fn(&MemoryEvent) -> bool,
84{
85 pub async fn recv(&mut self) -> Result<MemoryEvent, broadcast::error::RecvError> {
87 loop {
88 match self.receiver.recv().await {
89 Ok(event) if (self.predicate)(&event) => return Ok(event),
90 Ok(_) => {},
91 Err(broadcast::error::RecvError::Lagged(skipped)) => {
92 metrics::counter!("event_bus_lagged_total").increment(skipped);
93 },
94 Err(err) => return Err(err),
95 }
96 }
97 }
98}
99
100static GLOBAL_EVENT_BUS: OnceLock<EventBus> = OnceLock::new();
101
102#[must_use]
104pub fn global_event_bus() -> &'static EventBus {
105 GLOBAL_EVENT_BUS.get_or_init(|| EventBus::new(DEFAULT_EVENT_BUS_CAPACITY))
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Domain, EventMeta, MemoryEvent, MemoryId, Namespace};

    /// Builds a `Domain` whose `organization`, `project`, and `repository`
    /// are all `None`.
    fn empty_domain() -> Domain {
        Domain {
            organization: None,
            project: None,
            repository: None,
        }
    }

    /// A receiver filtered on `"captured"` must skip the `Retrieved` event
    /// published first and yield the `Captured` event published second.
    #[tokio::test]
    async fn test_subscribe_filtered_skips_non_matching() {
        let bus = EventBus::new(16);
        let mut captured_only = bus.subscribe_event_type("captured");

        let retrieved = MemoryEvent::Retrieved {
            meta: EventMeta::with_timestamp("test", None, 1),
            memory_id: MemoryId::new("id1"),
            query: "query".into(),
            score: 0.5,
        };
        bus.publish(retrieved);

        let captured = MemoryEvent::Captured {
            meta: EventMeta::with_timestamp("test", None, 2),
            memory_id: MemoryId::new("id2"),
            namespace: Namespace::Decisions,
            domain: empty_domain(),
            content_length: 10,
        };
        bus.publish(captured);

        let received = captured_only.recv().await.expect("receive event");
        assert_eq!(received.event_type(), "captured");
    }

    /// An unfiltered receiver must see an event published after it subscribed.
    #[tokio::test]
    async fn test_subscribe_receives_published_event() {
        let bus = EventBus::new(16);
        let mut subscriber = bus.subscribe();

        let captured = MemoryEvent::Captured {
            meta: EventMeta::with_timestamp("test", None, 42),
            memory_id: MemoryId::new("id3"),
            namespace: Namespace::Decisions,
            domain: empty_domain(),
            content_length: 5,
        };
        bus.publish(captured);

        let received = subscriber.recv().await.expect("receive event");
        assert_eq!(received.event_type(), "captured");
    }
}