Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions doc/developer/design/20230531_compute_metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ All metrics in this list have a `worker_id` label identifying the Timely worker.
* **Labels**: `worker_id`
* **Description**: A histogram of dataflow shutdown durations since restart.
* **Export Type**: prometheus-exporter, through the `mz_dataflow_shutdown_durations_histogram` introspection source
* [ ] `mz_dataflow_frontiers`
* [x] `mz_dataflow_frontiers`
* **Type**: gauge
* **Labels**: `worker_id`, `collection_id`
* **Description**: The frontiers of dataflows.
* **Export Type**: prometheus-exporter, through the `mz_compute_frontiers` introspection source
* **Notes**: To reduce the cardinality of this metric, we limit it to non-transient dataflows.
* [ ] `mz_dataflow_import_frontiers`
* [x] `mz_dataflow_import_frontiers`
* **Type**: gauge
* **Labels**: `worker_id`, `collection_id`
* **Description**: The import frontiers of dataflows.
Expand Down Expand Up @@ -297,16 +297,16 @@ All metrics in this list have a `worker_id` label identifying the Timely worker.
* **Notes**: This metric exists already, with an extra label `arrangement_id`.
Proposing to remove the `arrangement_id` label, because it blows up the cardinality of this metric.
* Reconciliation
* [ ] `mz_compute_reconciliation_reused_dataflows_count_total`
* [x] `mz_compute_reconciliation_reused_dataflows_count_total`
* **Type**: counter
* **Labels**: `worker_id`
* **Description**: The total number of dataflows that were reused during compute reconciliation.
* **Export Type**: direct
* **Notes**: This metric exists already as `mz_compute_reconciliation_reused_dataflows`.
Proposing to rename to follow the Prometheus naming conventions, and adding a worker label.
* [ ] `mz_compute_reconciliation_replaced_dataflows_count_total`
* [x] `mz_compute_reconciliation_replaced_dataflows_count_total`
* **Type**: counter
* **Labels**: `worker_id`
* **Labels**: `worker_id`, `reason`
* **Description**: The total number of dataflows that were replaced during compute reconciliation.
* **Export Type**: direct
* **Notes**: This metric exists already as `mz_compute_reconciliation_replaced_dataflows`.
Expand Down
42 changes: 23 additions & 19 deletions src/compute/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics};
use mz_ore::metric;
use mz_ore::metrics::{
raw, DeleteOnDropGauge, GaugeVec, GaugeVecExt, IntCounter, MetricsRegistry, UIntGauge,
};
use mz_ore::metrics::{raw, DeleteOnDropGauge, GaugeVec, GaugeVecExt, MetricsRegistry, UIntGauge};
use mz_repr::GlobalId;
use prometheus::core::AtomicF64;

Expand All @@ -22,8 +20,8 @@ pub struct ComputeMetrics {
pub history_dataflow_count: UIntGauge,

// reconciliation
pub reconciliation_reused_dataflows: IntCounter,
pub reconciliation_replaced_dataflows: raw::IntCounterVec,
pub reconciliation_reused_dataflows_count_total: raw::IntCounterVec,
pub reconciliation_replaced_dataflows_count_total: raw::IntCounterVec,

// dataflow state
pub dataflow_initial_output_duration_seconds: GaugeVec,
Expand All @@ -41,14 +39,15 @@ impl ComputeMetrics {
name: "mz_compute_replica_history_dataflow_count",
help: "The number of dataflows in the replica's command history.",
)),
reconciliation_reused_dataflows: registry.register(metric!(
name: "mz_compute_reconciliation_reused_dataflows",
help: "The number of dataflows that were reused during compute reconciliation.",
reconciliation_reused_dataflows_count_total: registry.register(metric!(
name: "mz_compute_reconciliation_reused_dataflows_count_total",
help: "The total number of dataflows that were reused during compute reconciliation.",
var_labels: ["worker_id"],
)),
reconciliation_replaced_dataflows: registry.register(metric!(
name: "mz_compute_reconciliation_replaced_dataflows",
help: "The number of dataflows that were replaced during compute reconciliation.",
var_labels: ["reason"],
reconciliation_replaced_dataflows_count_total: registry.register(metric!(
name: "mz_compute_reconciliation_replaced_dataflows_count_total",
help: "The total number of dataflows that were replaced during compute reconciliation.",
var_labels: ["worker_id", "reason"],
)),
dataflow_initial_output_duration_seconds: registry.register(metric!(
name: "mz_dataflow_initial_output_duration_seconds",
Expand Down Expand Up @@ -101,24 +100,29 @@ impl ComputeMetrics {
/// recorded as unsuccessful, with a reason based on the first property that does not hold.
pub fn record_dataflow_reconciliation(
&self,
worker_id: usize,
compatible: bool,
uncompacted: bool,
subscribe_free: bool,
) {
let worker = worker_id.to_string();

if !compatible {
self.reconciliation_replaced_dataflows
.with_label_values(&["incompatible"])
self.reconciliation_replaced_dataflows_count_total
.with_label_values(&[&worker, "incompatible"])
.inc();
} else if !uncompacted {
self.reconciliation_replaced_dataflows
.with_label_values(&["compacted"])
self.reconciliation_replaced_dataflows_count_total
.with_label_values(&[&worker, "compacted"])
.inc();
} else if !subscribe_free {
self.reconciliation_replaced_dataflows
.with_label_values(&["subscribe"])
self.reconciliation_replaced_dataflows_count_total
.with_label_values(&[&worker, "subscribe"])
.inc();
} else {
self.reconciliation_reused_dataflows.inc();
self.reconciliation_reused_dataflows_count_total
.with_label_values(&[&worker])
.inc();
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
}

compute_state.metrics.record_dataflow_reconciliation(
self.timely_worker.index(),
compatible,
uncompacted,
subscribe_free,
Expand Down
8 changes: 6 additions & 2 deletions test/cluster/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -1607,9 +1607,13 @@ def fetch_reconciliation_metrics() -> Tuple[int, int]:
reused = 0
replaced = 0
for metric in metrics.splitlines():
if metric.startswith("mz_compute_reconciliation_reused_dataflows"):
if metric.startswith(
"mz_compute_reconciliation_reused_dataflows_count_total"
):
reused += int(metric.split()[1])
elif metric.startswith("mz_compute_reconciliation_replaced_dataflows"):
elif metric.startswith(
"mz_compute_reconciliation_replaced_dataflows_count_total"
):
replaced += int(metric.split()[1])

return reused, replaced
Expand Down