From a0f741a5ce357d5246c88bb6f91003977c286d45 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 2 Aug 2023 13:32:43 +0200 Subject: [PATCH] compute: adjust reconciliation metrics This commit makes two adjustments to the compute reconciliation metrics: * renames them to bring them in line with the Prometheus naming conventions * adds a `worker_id` label to bring them in line with the other metrics exported by replicas --- .../design/20230531_compute_metrics.md | 10 ++--- src/compute/src/metrics.rs | 42 ++++++++++--------- src/compute/src/server.rs | 1 + test/cluster/mzcompose.py | 8 +++- 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/doc/developer/design/20230531_compute_metrics.md b/doc/developer/design/20230531_compute_metrics.md index b75fb745eda8d..a1d6ffc648022 100644 --- a/doc/developer/design/20230531_compute_metrics.md +++ b/doc/developer/design/20230531_compute_metrics.md @@ -249,13 +249,13 @@ All metrics in this list have a `worker_id` label identifying the Timely worker. * **Labels**: `worker_id` * **Description**: A histogram of dataflow shutdown durations since restart. * **Export Type**: prometheus-exporter, through the `mz_dataflow_shutdown_durations_histogram` introspection source - * [ ] `mz_dataflow_frontiers` + * [x] `mz_dataflow_frontiers` * **Type**: gauge * **Labels**: `worker_id`, `collection_id` * **Description**: The frontiers of dataflows. * **Export Type**: prometheus-exporter, through the `mz_compute_frontiers` introspection source * **Notes**: To reduce the cardinality of this metric, we limit it to non-transient dataflows. - * [ ] `mz_dataflow_import_frontiers` + * [x] `mz_dataflow_import_frontiers` * **Type**: gauge * **Labels**: `worker_id`, `collection_id` * **Description**: The import frontiers of dataflows. @@ -297,16 +297,16 @@ All metrics in this list have a `worker_id` label identifying the Timely worker. * **Notes**: This metric exists already, with an extra label `arrangement_id`. Proposing to remove the `arrangement_id` label, because it blows up the cardinality of this metric. * Reconciliation - * [ ] `mz_compute_reconciliation_reused_dataflows_count_total` + * [x] `mz_compute_reconciliation_reused_dataflows_count_total` * **Type**: counter * **Labels**: `worker_id` * **Description**: The total number of dataflows that were reused during compute reconciliation. * **Export Type**: direct * **Notes**: This metric exists already as `mz_compute_reconciliation_reused_dataflows`. Proposing to rename to follow the Prometheus naming conventions, and adding a worker label. - * [ ] `mz_compute_reconciliation_replaced_dataflows_count_total` + * [x] `mz_compute_reconciliation_replaced_dataflows_count_total` * **Type**: counter - * **Labels**: `worker_id` + * **Labels**: `worker_id`, `reason` * **Description**: The total number of dataflows that were replaced during compute reconciliation. * **Export Type**: direct * **Notes**: This metric exists already as `mz_compute_reconciliation_replaced_dataflows`. diff --git a/src/compute/src/metrics.rs b/src/compute/src/metrics.rs index 0aded18b25111..ca12ad79a6474 100644 --- a/src/compute/src/metrics.rs +++ b/src/compute/src/metrics.rs @@ -9,9 +9,7 @@ use mz_compute_client::metrics::{CommandMetrics, HistoryMetrics}; use mz_ore::metric; -use mz_ore::metrics::{ - raw, DeleteOnDropGauge, GaugeVec, GaugeVecExt, IntCounter, MetricsRegistry, UIntGauge, -}; +use mz_ore::metrics::{raw, DeleteOnDropGauge, GaugeVec, GaugeVecExt, MetricsRegistry, UIntGauge}; use mz_repr::GlobalId; use prometheus::core::AtomicF64; @@ -22,8 +20,8 @@ pub struct ComputeMetrics { pub history_dataflow_count: UIntGauge, // reconciliation - pub reconciliation_reused_dataflows: IntCounter, - pub reconciliation_replaced_dataflows: raw::IntCounterVec, + pub reconciliation_reused_dataflows_count_total: raw::IntCounterVec, + pub reconciliation_replaced_dataflows_count_total: raw::IntCounterVec, // dataflow state pub dataflow_initial_output_duration_seconds: GaugeVec, @@ -41,14 +39,15 @@ impl ComputeMetrics { name: "mz_compute_replica_history_dataflow_count", help: "The number of dataflows in the replica's command history.", )), - reconciliation_reused_dataflows: registry.register(metric!( - name: "mz_compute_reconciliation_reused_dataflows", - help: "The number of dataflows that were reused during compute reconciliation.", + reconciliation_reused_dataflows_count_total: registry.register(metric!( + name: "mz_compute_reconciliation_reused_dataflows_count_total", + help: "The total number of dataflows that were reused during compute reconciliation.", + var_labels: ["worker_id"], )), - reconciliation_replaced_dataflows: registry.register(metric!( - name: "mz_compute_reconciliation_replaced_dataflows", - help: "The number of dataflows that were replaced during compute reconciliation.", - var_labels: ["reason"], + reconciliation_replaced_dataflows_count_total: registry.register(metric!( + name: "mz_compute_reconciliation_replaced_dataflows_count_total", + help: "The total number of dataflows that were replaced during compute reconciliation.", + var_labels: ["worker_id", "reason"], )), dataflow_initial_output_duration_seconds: registry.register(metric!( name: "mz_dataflow_initial_output_duration_seconds", @@ -101,24 +100,29 @@ impl ComputeMetrics { /// recorded as unsuccessful, with a reason based on the first property that does not hold. pub fn record_dataflow_reconciliation( &self, + worker_id: usize, compatible: bool, uncompacted: bool, subscribe_free: bool, ) { + let worker = worker_id.to_string(); + if !compatible { - self.reconciliation_replaced_dataflows - .with_label_values(&["incompatible"]) + self.reconciliation_replaced_dataflows_count_total + .with_label_values(&[&worker, "incompatible"]) .inc(); } else if !uncompacted { - self.reconciliation_replaced_dataflows - .with_label_values(&["compacted"]) + self.reconciliation_replaced_dataflows_count_total + .with_label_values(&[&worker, "compacted"]) .inc(); } else if !subscribe_free { - self.reconciliation_replaced_dataflows - .with_label_values(&["subscribe"]) + self.reconciliation_replaced_dataflows_count_total + .with_label_values(&[&worker, "subscribe"]) .inc(); } else { - self.reconciliation_reused_dataflows.inc(); + self.reconciliation_reused_dataflows_count_total + .with_label_values(&[&worker]) + .inc(); } } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c0bcf2f3560a3..ed32aeb600077 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -559,6 +559,7 @@ impl<'w, A: Allocate> Worker<'w, A> { } compute_state.metrics.record_dataflow_reconciliation( + self.timely_worker.index(), compatible, uncompacted, subscribe_free, diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py index 21abc9ee76dd2..3ef0568f461fa 100644 --- a/test/cluster/mzcompose.py +++ b/test/cluster/mzcompose.py @@ -1607,9 +1607,13 @@ def fetch_reconciliation_metrics() -> Tuple[int, int]: reused = 0 replaced = 0 for metric in metrics.splitlines(): - if metric.startswith("mz_compute_reconciliation_reused_dataflows"): + if metric.startswith( + "mz_compute_reconciliation_reused_dataflows_count_total" + ): reused += int(metric.split()[1]) - elif metric.startswith("mz_compute_reconciliation_replaced_dataflows"): + elif metric.startswith( + "mz_compute_reconciliation_replaced_dataflows_count_total" + ): replaced += int(metric.split()[1]) return reused, replaced