Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ impl BroadcastIngesterCapacityScoreTask {
.map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?;
let usage = guard.mrecordlog.resource_usage();
let disk_used = ByteSize::b(usage.disk_used_bytes as u64);
let capacity_score = guard.wal_capacity_time_series.record_and_score(disk_used);
let memory_used = ByteSize::b(usage.memory_used_bytes as u64);
let capacity_score = guard
.wal_capacity_tracker
.record_and_score(disk_used, memory_used);
let (open_shard_counts, _) = guard.get_shard_snapshot();

Ok(Some((capacity_score, open_shard_counts)))
Expand Down Expand Up @@ -218,8 +221,8 @@ mod tests {
state_guard.shards.insert(shard.queue_id(), shard);
let (open_shard_counts, _) = state_guard.get_shard_snapshot();
let capacity_score = state_guard
.wal_capacity_time_series
.record_and_score(ByteSize::b(500));
.wal_capacity_tracker
.record_and_score(ByteSize::b(500), ByteSize::b(0));
drop(state_guard);

assert_eq!(capacity_score, 6);
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#[allow(dead_code)]
mod ingester_capacity_score;
mod capacity_score;
mod local_shards;

use std::time::Duration;
Expand All @@ -26,7 +26,7 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes
Duration::from_secs(5)
};

pub use ingester_capacity_score::{
pub use capacity_score::{
BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate,
setup_ingester_capacity_update_listener,
};
Expand Down
13 changes: 10 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ impl Ingester {
idle_shard_timeout: Duration,
) -> IngestV2Result<Self> {
let self_node_id: NodeId = cluster.self_node_id().into();
let state = IngesterState::load(wal_dir_path, disk_capacity, rate_limiter_settings);
let state = IngesterState::load(
wal_dir_path,
disk_capacity,
memory_capacity,
rate_limiter_settings,
);

let weak_state = state.weak();
BroadcastLocalShardsTask::spawn(cluster.clone(), weak_state.clone());
Expand Down Expand Up @@ -784,10 +789,12 @@ impl Ingester {
}
let wal_usage = state_guard.mrecordlog.resource_usage();
let disk_used = wal_usage.disk_used_bytes as u64;
let memory_used = wal_usage.memory_used_bytes as u64;
let (open_shard_counts, closed_shards) = state_guard.get_shard_snapshot();
let capacity_score = state_guard
.wal_capacity_time_series
.score(ByteSize::b(disk_used)) as u32;
.wal_capacity_tracker
.score(ByteSize::b(disk_used), ByteSize::b(memory_used))
as u32;
drop(state_guard);

if disk_used >= self.disk_capacity.as_u64() * 90 / 100 {
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,20 @@ pub(super) struct IngestV2Metrics {
pub wal_disk_used_bytes: IntGauge,
pub wal_memory_used_bytes: IntGauge,
pub ingest_results: IngestResultMetrics,
pub ingest_attempts: IntCounterVec<1>,
}

impl Default for IngestV2Metrics {
fn default() -> Self {
Self {
ingest_results: IngestResultMetrics::default(),
ingest_attempts: new_counter_vec::<1>(
"ingest_attempts",
"Number of routing attempts by AZ locality",
"ingest",
&[],
["az_routing"],
),
reset_shards_operations_total: new_counter_vec(
"reset_shards_operations_total",
"Total number of reset shards operations performed.",
Expand Down
4 changes: 1 addition & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ mod metrics;
mod models;
mod mrecord;
mod mrecordlog_utils;
mod node_routing_table;
mod publish_tracker;
mod rate_meter;
mod replication;
mod router;
#[allow(dead_code)]
mod routing_table;
mod state;
mod wal_capacity_timeseries;
mod wal_capacity_tracker;
mod workbench;

use std::collections::HashMap;
Expand Down
Loading