|
| 1 | +use std::{ |
| 2 | + collections::BTreeMap, |
| 3 | + future::Future, |
| 4 | + path::PathBuf, |
| 5 | + sync::Arc, |
| 6 | + task::Poll, |
| 7 | + time::{Duration, Instant}, |
| 8 | +}; |
| 9 | + |
| 10 | +use anyhow::Result; |
| 11 | +use async_trait::async_trait; |
| 12 | +use caryatid_sdk::{MessageBounds, MessageBus, Subscription, SubscriptionBounds}; |
| 13 | +use dashmap::DashMap; |
| 14 | +use futures::{future::BoxFuture, FutureExt}; |
| 15 | +use serde::{Deserialize, Serialize}; |
| 16 | +use tokio::{fs, time}; |
| 17 | + |
| 18 | +#[derive(Default, Clone)] |
| 19 | +struct ReadStreamState { |
| 20 | + read: u64, |
| 21 | + pending_since: Option<Instant>, |
| 22 | +} |
| 23 | + |
| 24 | +#[derive(Default, Clone)] |
| 25 | +struct WriteStreamState { |
| 26 | + written: u64, |
| 27 | + pending_since: Option<Instant>, |
| 28 | +} |
| 29 | + |
| 30 | +#[derive(Default, Clone)] |
| 31 | +struct ModuleState { |
| 32 | + reads: DashMap<String, ReadStreamState>, |
| 33 | + writes: DashMap<String, WriteStreamState>, |
| 34 | +} |
| 35 | + |
| 36 | +#[derive(Serialize)] |
| 37 | +struct SerializedReadStreamState { |
| 38 | + read: u64, |
| 39 | + #[serde(skip_serializing_if = "Option::is_none")] |
| 40 | + unread: Option<u64>, |
| 41 | + #[serde(skip_serializing_if = "Option::is_none")] |
| 42 | + pending_for: Option<String>, |
| 43 | +} |
| 44 | + |
| 45 | +#[derive(Serialize)] |
| 46 | +struct SerializedWriteStreamState { |
| 47 | + written: u64, |
| 48 | + #[serde(skip_serializing_if = "Option::is_none")] |
| 49 | + pending_for: Option<String>, |
| 50 | +} |
| 51 | + |
| 52 | +#[derive(Serialize)] |
| 53 | +struct SerializedModuleState { |
| 54 | + reads: BTreeMap<String, SerializedReadStreamState>, |
| 55 | + writes: BTreeMap<String, SerializedWriteStreamState>, |
| 56 | +} |
| 57 | + |
| 58 | +const fn default_frequency() -> Duration { |
| 59 | + Duration::from_secs(5) |
| 60 | +} |
| 61 | + |
| 62 | +#[derive(Deserialize)] |
| 63 | +pub struct MonitorConfig { |
| 64 | + output: PathBuf, |
| 65 | + #[serde(default = "default_frequency")] |
| 66 | + frequency: Duration, |
| 67 | +} |
| 68 | + |
| 69 | +pub struct Monitor { |
| 70 | + modules: BTreeMap<String, Arc<ModuleState>>, |
| 71 | + stream_writes: Arc<DashMap<String, u64>>, |
| 72 | + output_path: PathBuf, |
| 73 | + write_frequency: Duration, |
| 74 | +} |
| 75 | +impl Monitor { |
| 76 | + pub fn new(config: MonitorConfig) -> Self { |
| 77 | + Self { |
| 78 | + modules: BTreeMap::new(), |
| 79 | + stream_writes: Arc::new(DashMap::new()), |
| 80 | + output_path: config.output, |
| 81 | + write_frequency: config.frequency, |
| 82 | + } |
| 83 | + } |
| 84 | + |
| 85 | + pub fn spy_on_bus<M: MessageBounds>( |
| 86 | + &mut self, |
| 87 | + module_name: &str, |
| 88 | + message_bus: Arc<dyn MessageBus<M>>, |
| 89 | + ) -> Arc<dyn MessageBus<M>> { |
| 90 | + let state = Arc::new(ModuleState::default()); |
| 91 | + self.modules.insert(module_name.to_string(), state.clone()); |
| 92 | + |
| 93 | + Arc::new(MonitorBus { |
| 94 | + inner: message_bus, |
| 95 | + stream_writes: self.stream_writes.clone(), |
| 96 | + state, |
| 97 | + }) |
| 98 | + } |
| 99 | + |
| 100 | + pub async fn monitor(self) { |
| 101 | + loop { |
| 102 | + time::sleep(self.write_frequency).await; |
| 103 | + let now = Instant::now(); |
| 104 | + let state = self |
| 105 | + .modules |
| 106 | + .iter() |
| 107 | + .map(|(name, state)| { |
| 108 | + let reads = state |
| 109 | + .reads |
| 110 | + .iter() |
| 111 | + .map(|kvp| { |
| 112 | + let (topic, data) = kvp.pair(); |
| 113 | + let read = data.read; |
| 114 | + let unread = self |
| 115 | + .stream_writes |
| 116 | + .get(topic) |
| 117 | + .and_then(|w| w.checked_sub(read)) |
| 118 | + .filter(|u| *u > 0); |
| 119 | + let pending_for = data |
| 120 | + .pending_since |
| 121 | + .map(|d| format!("{:?}", now.duration_since(d))); |
| 122 | + let state = SerializedReadStreamState { |
| 123 | + read, |
| 124 | + unread, |
| 125 | + pending_for, |
| 126 | + }; |
| 127 | + (topic.clone(), state) |
| 128 | + }) |
| 129 | + .collect(); |
| 130 | + |
| 131 | + let writes = state |
| 132 | + .writes |
| 133 | + .iter() |
| 134 | + .map(|kvp| { |
| 135 | + let (topic, data) = kvp.pair(); |
| 136 | + let written = data.written; |
| 137 | + let pending_for = data |
| 138 | + .pending_since |
| 139 | + .map(|d| format!("{:?}", now.duration_since(d))); |
| 140 | + let state = SerializedWriteStreamState { |
| 141 | + written, |
| 142 | + pending_for, |
| 143 | + }; |
| 144 | + (topic.clone(), state) |
| 145 | + }) |
| 146 | + .collect(); |
| 147 | + |
| 148 | + (name.clone(), SerializedModuleState { reads, writes }) |
| 149 | + }) |
| 150 | + .collect::<BTreeMap<_, _>>(); |
| 151 | + let serialized = serde_json::to_vec_pretty(&state).expect("could not serialize state"); |
| 152 | + fs::write(&self.output_path, serialized) |
| 153 | + .await |
| 154 | + .expect("could not write file"); |
| 155 | + } |
| 156 | + } |
| 157 | +} |
| 158 | + |
| 159 | +pub struct MonitorBus<M: MessageBounds> { |
| 160 | + inner: Arc<dyn MessageBus<M>>, |
| 161 | + stream_writes: Arc<DashMap<String, u64>>, |
| 162 | + state: Arc<ModuleState>, |
| 163 | +} |
| 164 | + |
| 165 | +#[async_trait] |
| 166 | +impl<M: MessageBounds> MessageBus<M> for MonitorBus<M> { |
| 167 | + async fn publish(&self, topic: &str, message: Arc<M>) -> Result<()> { |
| 168 | + self.state |
| 169 | + .writes |
| 170 | + .entry(topic.to_string()) |
| 171 | + .or_default() |
| 172 | + .pending_since = Some(Instant::now()); |
| 173 | + let res = self.inner.publish(topic, message).await; |
| 174 | + let mut writes = self.state.writes.entry(topic.to_string()).or_default(); |
| 175 | + writes.written += 1; |
| 176 | + writes.pending_since = None; |
| 177 | + *self.stream_writes.entry(topic.to_string()).or_default() += 1; |
| 178 | + res |
| 179 | + } |
| 180 | + |
| 181 | + fn request_timeout(&self) -> std::time::Duration { |
| 182 | + self.inner.request_timeout() |
| 183 | + } |
| 184 | + |
| 185 | + async fn subscribe(&self, topic: &str) -> Result<Box<dyn Subscription<M>>> { |
| 186 | + self.state.reads.entry(topic.to_string()).or_default(); |
| 187 | + Ok(Box::new(MonitorSubscription { |
| 188 | + inner: self.inner.subscribe(topic).await?, |
| 189 | + state: self.state.clone(), |
| 190 | + topic: topic.to_string(), |
| 191 | + })) |
| 192 | + } |
| 193 | + |
| 194 | + async fn shutdown(&self) -> Result<()> { |
| 195 | + self.inner.shutdown().await |
| 196 | + } |
| 197 | +} |
| 198 | + |
| 199 | +struct MonitorSubscription<M: MessageBounds> { |
| 200 | + inner: Box<dyn Subscription<M>>, |
| 201 | + state: Arc<ModuleState>, |
| 202 | + topic: String, |
| 203 | +} |
| 204 | +impl<M: MessageBounds> SubscriptionBounds for MonitorSubscription<M> {} |
| 205 | + |
| 206 | +impl<M: MessageBounds> Subscription<M> for MonitorSubscription<M> { |
| 207 | + fn read(&mut self) -> BoxFuture<'_, Result<(String, Arc<M>)>> { |
| 208 | + Box::pin( |
| 209 | + MonitorReadFuture { |
| 210 | + inner: self.inner.read(), |
| 211 | + state: &self.state, |
| 212 | + topic: &self.topic, |
| 213 | + } |
| 214 | + .fuse(), |
| 215 | + ) |
| 216 | + } |
| 217 | +} |
| 218 | + |
| 219 | +struct MonitorReadFuture<'a, M: MessageBounds> { |
| 220 | + inner: BoxFuture<'a, Result<(String, Arc<M>)>>, |
| 221 | + state: &'a ModuleState, |
| 222 | + topic: &'a str, |
| 223 | +} |
| 224 | +impl<'a, M: MessageBounds> Future for MonitorReadFuture<'a, M> { |
| 225 | + type Output = Result<(String, Arc<M>)>; |
| 226 | + |
| 227 | + fn poll( |
| 228 | + mut self: std::pin::Pin<&mut Self>, |
| 229 | + cx: &mut std::task::Context<'_>, |
| 230 | + ) -> std::task::Poll<Self::Output> { |
| 231 | + let res = self.inner.poll_unpin(cx); |
| 232 | + let mut entry = self.state.reads.entry(self.topic.to_string()).or_default(); |
| 233 | + match &res { |
| 234 | + Poll::Pending => { |
| 235 | + if entry.pending_since.is_none() { |
| 236 | + entry.pending_since = Some(Instant::now()); |
| 237 | + } |
| 238 | + } |
| 239 | + Poll::Ready(r) => { |
| 240 | + entry.pending_since = None; |
| 241 | + if r.is_ok() { |
| 242 | + entry.read += 1; |
| 243 | + } |
| 244 | + } |
| 245 | + } |
| 246 | + res |
| 247 | + } |
| 248 | +} |
| 249 | +impl<'a, M: MessageBounds> Drop for MonitorReadFuture<'a, M> { |
| 250 | + fn drop(&mut self) { |
| 251 | + let mut entry = self.state.reads.entry(self.topic.to_string()).or_default(); |
| 252 | + entry.pending_since = None; |
| 253 | + } |
| 254 | +} |
0 commit comments