Skip to main content

subcog/observability/
event_bus.rs

1//! Tokio broadcast event bus for cross-component notifications.
2
3use crate::models::MemoryEvent;
4use std::sync::OnceLock;
5use tokio::sync::broadcast;
6
7const DEFAULT_EVENT_BUS_CAPACITY: usize = 1024;
8
9/// Central event bus for broadcasting memory events.
10#[derive(Clone)]
11pub struct EventBus {
12    sender: broadcast::Sender<MemoryEvent>,
13}
14
15/// Filtered receiver that yields events matching a predicate.
16pub struct FilteredReceiver<F> {
17    receiver: broadcast::Receiver<MemoryEvent>,
18    predicate: F,
19}
20
21fn usize_to_f64(value: usize) -> f64 {
22    let capped = u32::try_from(value).unwrap_or(u32::MAX);
23    f64::from(capped)
24}
25
26impl EventBus {
27    /// Creates a new event bus with the given buffer capacity.
28    #[must_use]
29    pub fn new(capacity: usize) -> Self {
30        let (sender, _receiver) = broadcast::channel(capacity);
31        Self { sender }
32    }
33
34    /// Publishes an event to all subscribers (best effort).
35    pub fn publish(&self, event: MemoryEvent) {
36        metrics::counter!("event_bus_publish_total").increment(1);
37        let receivers = self.sender.receiver_count();
38        metrics::gauge!("event_bus_receivers").set(usize_to_f64(receivers));
39        match self.sender.send(event) {
40            Ok(_) => {
41                metrics::gauge!("event_bus_queue_depth").set(usize_to_f64(self.sender.len()));
42            },
43            Err(_) => {
44                metrics::counter!("event_bus_publish_failed_total").increment(1);
45            },
46        }
47    }
48
49    /// Subscribes to the event bus.
50    #[must_use]
51    pub fn subscribe(&self) -> broadcast::Receiver<MemoryEvent> {
52        metrics::counter!("event_bus_subscriptions_total").increment(1);
53        metrics::gauge!("event_bus_receivers").set(usize_to_f64(self.sender.receiver_count()));
54        self.sender.subscribe()
55    }
56
57    /// Subscribes with a predicate to filter events by type or attributes.
58    #[must_use]
59    pub fn subscribe_filtered<F>(&self, predicate: F) -> FilteredReceiver<F>
60    where
61        F: Fn(&MemoryEvent) -> bool,
62    {
63        metrics::counter!("event_bus_subscriptions_total").increment(1);
64        metrics::gauge!("event_bus_receivers").set(usize_to_f64(self.sender.receiver_count()));
65        FilteredReceiver {
66            receiver: self.sender.subscribe(),
67            predicate,
68        }
69    }
70
71    /// Subscribes to events matching the provided event type.
72    #[must_use]
73    pub fn subscribe_event_type(
74        &self,
75        event_type: &'static str,
76    ) -> FilteredReceiver<impl Fn(&MemoryEvent) -> bool> {
77        self.subscribe_filtered(move |event| event.event_type() == event_type)
78    }
79}
80
81impl<F> FilteredReceiver<F>
82where
83    F: Fn(&MemoryEvent) -> bool,
84{
85    /// Receives the next event that matches the predicate.
86    pub async fn recv(&mut self) -> Result<MemoryEvent, broadcast::error::RecvError> {
87        loop {
88            match self.receiver.recv().await {
89                Ok(event) if (self.predicate)(&event) => return Ok(event),
90                Ok(_) => {},
91                Err(broadcast::error::RecvError::Lagged(skipped)) => {
92                    metrics::counter!("event_bus_lagged_total").increment(skipped);
93                },
94                Err(err) => return Err(err),
95            }
96        }
97    }
98}
99
100static GLOBAL_EVENT_BUS: OnceLock<EventBus> = OnceLock::new();
101
102/// Returns the global event bus, initializing it on first use.
103#[must_use]
104pub fn global_event_bus() -> &'static EventBus {
105    GLOBAL_EVENT_BUS.get_or_init(|| EventBus::new(DEFAULT_EVENT_BUS_CAPACITY))
106}
107
108#[cfg(test)]
109mod tests {
110    use super::*;
111    use crate::models::{Domain, EventMeta, MemoryEvent, MemoryId, Namespace};
112
113    #[tokio::test]
114    async fn test_subscribe_filtered_skips_non_matching() {
115        let bus = EventBus::new(16);
116        let mut filtered = bus.subscribe_event_type("captured");
117
118        bus.publish(MemoryEvent::Retrieved {
119            meta: EventMeta::with_timestamp("test", None, 1),
120            memory_id: MemoryId::new("id1"),
121            query: "query".into(),
122            score: 0.5,
123        });
124        bus.publish(MemoryEvent::Captured {
125            meta: EventMeta::with_timestamp("test", None, 2),
126            memory_id: MemoryId::new("id2"),
127            namespace: Namespace::Decisions,
128            domain: Domain {
129                organization: None,
130                project: None,
131                repository: None,
132            },
133            content_length: 10,
134        });
135
136        let event = filtered.recv().await.expect("receive event");
137        assert_eq!(event.event_type(), "captured");
138    }
139
140    #[tokio::test]
141    async fn test_subscribe_receives_published_event() {
142        let bus = EventBus::new(16);
143        let mut receiver = bus.subscribe();
144
145        bus.publish(MemoryEvent::Captured {
146            meta: EventMeta::with_timestamp("test", None, 42),
147            memory_id: MemoryId::new("id3"),
148            namespace: Namespace::Decisions,
149            domain: Domain {
150                organization: None,
151                project: None,
152                repository: None,
153            },
154            content_length: 5,
155        });
156
157        let event = receiver.recv().await.expect("receive event");
158        assert_eq!(event.event_type(), "captured");
159    }
160}