Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ async fn chain_follower(
});
}
}

tasks_set.shrink_to_fit();
}
}
});
Expand Down Expand Up @@ -772,6 +774,8 @@ impl SyncStateMachine {
self.tipsets
.insert(merged_tipset.key().clone(), merged_tipset);
}

self.tipsets.shrink_to_fit();
}

// Mark blocks in tipset as bad.
Expand Down
83 changes: 75 additions & 8 deletions src/message_pool/msgpool/pending_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! Pending message storage and event broadcast.

use ahash::{HashMap, HashMapExt};
use hashbrown::HashMap;
use parking_lot::RwLock as SyncRwLock;
use tokio::sync::broadcast;

Expand Down Expand Up @@ -43,13 +43,13 @@ impl PendingStore {
/// Construct an empty store with the given per-actor limits.
pub(in crate::message_pool) fn new(limits: MsgSetLimits) -> Self {
let (events, _) = broadcast::channel(MPOOL_UPDATE_CHANNEL_CAPACITY);
Self {
inner: Arc::new(Inner {
pending: SyncRwLock::new(HashMap::new()),
events,
limits,
}),
}
let inner = Arc::new(Inner {
pending: SyncRwLock::new(HashMap::new()),
events,
limits,
});
crate::metrics::register_collector(Box::new(InnerMetricsCollector(inner.shallow_clone())));
Self { inner }
}
Comment thread
hanabi1224 marked this conversation as resolved.

/// Insert a signed message under its already-resolved sender address.
Expand Down Expand Up @@ -109,6 +109,11 @@ impl PendingStore {
removed
}

/// Shrinks the capacity of the internal map as much as possible.
pub(in crate::message_pool) fn shrink_to_fit(&self) {
self.inner.pending.write().shrink_to_fit();
}

/// Deep-clone of the pending map — one read-lock acquisition.
pub(in crate::message_pool) fn snapshot(&self) -> HashMap<Address, MsgSet> {
self.inner.pending.read().clone()
Expand All @@ -126,6 +131,68 @@ impl PendingStore {
}
}

#[derive(derive_more::Debug, derive_more::Deref)]
struct InnerMetricsCollector(#[debug(skip)] Arc<Inner>);

mod metrics_collection {
use super::*;
use prometheus_client::{
collector::Collector,
encoding::{DescriptorEncoder, EncodeMetric},
metrics::gauge::Gauge,
registry::Unit,
};

impl Collector for InnerMetricsCollector {
fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
{
let size_in_bytes = {
let g: Gauge = Default::default();
g.set(self.pending.read().allocation_size() as i64);
g
};
let size_metric_encoder = encoder.encode_descriptor(
"mpool_pending_size",
"Allocation size of message pool pending messages in bytes",
Some(&Unit::Bytes),
size_in_bytes.metric_type(),
)?;
size_in_bytes.encode(size_metric_encoder)?;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
{
let len = {
let g: Gauge = Default::default();
g.set(self.pending.read().len() as i64);
g
};
let size_metric_encoder = encoder.encode_descriptor(
"mpool_pending_len",
"Length of the message pool pending messages",
None,
len.metric_type(),
)?;
len.encode(size_metric_encoder)?;
}
{
let cap = {
let g: Gauge = Default::default();
g.set(self.pending.read().capacity() as i64);
g
};
let size_metric_encoder = encoder.encode_descriptor(
"mpool_pending_cap",
"Capacity of the message pool pending messages",
None,
cap.metric_type(),
)?;
cap.encode(size_metric_encoder)?;
}

Ok(())
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/message_pool/msgpool/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ where
}
}
}
self.pending.shrink_to_fit();
Ok(())
}

Expand Down
Loading