diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index a9a854a75b6..2b44ef2fe43 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -359,6 +359,8 @@ async fn chain_follower( }); } } + + tasks_set.shrink_to_fit(); } } }); @@ -772,6 +774,8 @@ impl SyncStateMachine { self.tipsets .insert(merged_tipset.key().clone(), merged_tipset); } + + self.tipsets.shrink_to_fit(); } // Mark blocks in tipset as bad. diff --git a/src/message_pool/msgpool/pending_store.rs b/src/message_pool/msgpool/pending_store.rs index 0c49f18357a..b04e2f393c9 100644 --- a/src/message_pool/msgpool/pending_store.rs +++ b/src/message_pool/msgpool/pending_store.rs @@ -3,7 +3,7 @@ //! Pending message storage and event broadcast. -use ahash::{HashMap, HashMapExt}; +use hashbrown::HashMap; use parking_lot::RwLock as SyncRwLock; use tokio::sync::broadcast; @@ -43,13 +43,13 @@ impl PendingStore { /// Construct an empty store with the given per-actor limits. pub(in crate::message_pool) fn new(limits: MsgSetLimits) -> Self { let (events, _) = broadcast::channel(MPOOL_UPDATE_CHANNEL_CAPACITY); - Self { - inner: Arc::new(Inner { - pending: SyncRwLock::new(HashMap::new()), - events, - limits, - }), - } + let inner = Arc::new(Inner { + pending: SyncRwLock::new(HashMap::new()), + events, + limits, + }); + crate::metrics::register_collector(Box::new(InnerMetricsCollector(inner.shallow_clone()))); + Self { inner } } /// Insert a signed message under its already-resolved sender address. @@ -109,6 +109,11 @@ impl PendingStore { removed } + /// Shrinks the capacity of the internal map as much as possible. + pub(in crate::message_pool) fn shrink_to_fit(&self) { + self.inner.pending.write().shrink_to_fit(); + } + /// Deep-clone of the pending map — one read-lock acquisition. pub(in crate::message_pool) fn snapshot(&self) -> HashMap { self.inner.pending.read().clone() @@ -126,6 +131,68 @@ impl PendingStore { } } +#[derive(derive_more::Debug, derive_more::Deref)] +struct InnerMetricsCollector(#[debug(skip)] Arc); + +mod metrics_collection { + use super::*; + use prometheus_client::{ + collector::Collector, + encoding::{DescriptorEncoder, EncodeMetric}, + metrics::gauge::Gauge, + registry::Unit, + }; + + impl Collector for InnerMetricsCollector { + fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> { + { + let size_in_bytes = { + let g: Gauge = Default::default(); + g.set(self.pending.read().allocation_size() as i64); + g + }; + let size_metric_encoder = encoder.encode_descriptor( + "mpool_pending_size", + "Allocation size of message pool pending messages in bytes", + Some(&Unit::Bytes), + size_in_bytes.metric_type(), + )?; + size_in_bytes.encode(size_metric_encoder)?; + } + { + let len = { + let g: Gauge = Default::default(); + g.set(self.pending.read().len() as i64); + g + }; + let size_metric_encoder = encoder.encode_descriptor( + "mpool_pending_len", + "Length of the message pool pending messages", + None, + len.metric_type(), + )?; + len.encode(size_metric_encoder)?; + } + { + let cap = { + let g: Gauge = Default::default(); + g.set(self.pending.read().capacity() as i64); + g + }; + let size_metric_encoder = encoder.encode_descriptor( + "mpool_pending_cap", + "Capacity of the message pool pending messages", + None, + cap.metric_type(), + )?; + cap.encode(size_metric_encoder)?; + } + + Ok(()) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/message_pool/msgpool/reorg.rs b/src/message_pool/msgpool/reorg.rs index 474cf300067..128779a5304 100644 --- a/src/message_pool/msgpool/reorg.rs +++ b/src/message_pool/msgpool/reorg.rs @@ -105,6 +105,7 @@ where } } } + self.pending.shrink_to_fit(); Ok(()) }