Skip to content

Commit 1051efc

Browse files
committed
reboot: add reboot_queue_entries metrics
Signed-off-by: Daichi Sakaue <daichi-sakaue@cybozu.co.jp>
1 parent d05e106 commit 1051efc

File tree

9 files changed

+107
-46
lines changed

9 files changed

+107
-46
lines changed

docs/metrics.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST
88
| leader | True (=1) if this server is the leader of CKE. | Gauge | |
99
| operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` |
1010
| operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | |
11+
| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | |
1112
| sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | |
1213
| sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | |
1314
| sabakan_workers | The number of worker nodes for each role. | Gauge | `role` |

metrics/collector.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func NewCollector(client *v3.Client) prometheus.Collector {
5454
collectors: []prometheus.Collector{operationPhase, operationPhaseTimestampSeconds},
5555
isAvailable: isOperationPhaseAvailable,
5656
},
57+
"reboot": {
58+
collectors: []prometheus.Collector{rebootQueueEntries},
59+
isAvailable: isRebootAvailable,
60+
},
5761
"sabakan_integration": {
5862
collectors: []prometheus.Collector{sabakanIntegrationSuccessful, sabakanIntegrationTimestampSeconds, sabakanWorkers, sabakanUnusedMachines},
5963
isAvailable: isSabakanIntegrationAvailable,

metrics/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ var operationPhaseTimestampSeconds = prometheus.NewGauge(
3333
},
3434
)
3535

36+
var rebootQueueEntries = prometheus.NewGauge(
37+
prometheus.GaugeOpts{
38+
Namespace: namespace,
39+
Name: "reboot_queue_entries",
40+
Help: "The number of reboot queue entries remaining.",
41+
},
42+
)
43+
3644
var sabakanIntegrationSuccessful = prometheus.NewGauge(
3745
prometheus.GaugeOpts{
3846
Namespace: namespace,

metrics/updater.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@ func isOperationPhaseAvailable(_ context.Context, _ storage) (bool, error) {
3939
return isLeader, nil
4040
}
4141

42+
// UpdateReboot updates "reboot_queue_entries".
43+
func UpdateReboot(numEntries int) {
44+
rebootQueueEntries.Set(float64(numEntries))
45+
}
46+
47+
// DecrementReboot decrements "reboot_queue_entries".
48+
func DecrementReboot() {
49+
rebootQueueEntries.Dec()
50+
}
51+
52+
func isRebootAvailable(_ context.Context, _ storage) (bool, error) {
53+
return isLeader, nil
54+
}
55+
4256
// UpdateSabakanIntegration updates Sabakan integration metrics.
4357
func UpdateSabakanIntegration(isSuccessful bool, workersByRole map[string]int, unusedMachines int, ts time.Time) {
4458
sabakanIntegrationTimestampSeconds.Set(float64(ts.Unix()))

metrics/updater_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ type updateOperationPhaseTestCase struct {
4040
expected operationPhaseExpected
4141
}
4242

43+
type updateRebootTestCase struct {
44+
name string
45+
input int
46+
expected float64
47+
}
48+
4349
type sabakanInput struct {
4450
isLeader bool
4551
enabled bool
@@ -64,6 +70,7 @@ type updateSabakanIntegrationTestCase struct {
6470
func TestMetricsUpdater(t *testing.T) {
6571
t.Run("UpdateLeader", testUpdateLeader)
6672
t.Run("UpdateOperationPhase", testUpdateOperationPhase)
73+
t.Run("UpdateReboot", testUpdateReboot)
6774
t.Run("UpdateSabakanIntegration", testUpdateSabakanIntegration)
6875
}
6976

@@ -232,6 +239,62 @@ func testUpdateOperationPhase(t *testing.T) {
232239
}
233240
}
234241

242+
func testUpdateReboot(t *testing.T) {
243+
testCases := []updateRebootTestCase{
244+
{
245+
name: "zero",
246+
input: 0,
247+
expected: 0,
248+
},
249+
{
250+
name: "one",
251+
input: 1,
252+
expected: 1,
253+
},
254+
{
255+
name: "two",
256+
input: 2,
257+
expected: 2,
258+
},
259+
}
260+
for _, tt := range testCases {
261+
t.Run(tt.name, func(t *testing.T) {
262+
ctx := context.Background()
263+
defer ctx.Done()
264+
265+
collector, _ := newTestCollector()
266+
handler := GetHandler(collector)
267+
268+
UpdateReboot(tt.input)
269+
270+
w := httptest.NewRecorder()
271+
req := httptest.NewRequest("GET", "/metrics", nil)
272+
handler.ServeHTTP(w, req)
273+
274+
metricsFamily, err := parseMetrics(w.Result())
275+
if err != nil {
276+
t.Fatal(err)
277+
}
278+
279+
metricsFound := false
280+
for _, mf := range metricsFamily {
281+
if *mf.Name != "cke_reboot_queue_entries" {
282+
continue
283+
}
284+
for _, m := range mf.Metric {
285+
metricsFound = true
286+
if *m.Gauge.Value != tt.expected {
287+
t.Errorf("value for cke_reboot_queue_entries is wrong. expected: %f, actual: %f", tt.expected, *m.Gauge.Value)
288+
}
289+
}
290+
}
291+
if !metricsFound {
292+
t.Errorf("metrics reboot_queue_entries was not found")
293+
}
294+
})
295+
}
296+
}
297+
235298
func testUpdateSabakanIntegration(t *testing.T) {
236299
testCases := []updateSabakanIntegrationTestCase{
237300
{

op/reboot_dequeue.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"strconv"
66

77
"github.com/cybozu-go/cke"
8+
"github.com/cybozu-go/cke/metrics"
89
)
910

1011
type rebootDequeueOp struct {
@@ -41,7 +42,12 @@ type rebootDequeueCommand struct {
4142
}
4243

4344
func (c rebootDequeueCommand) Run(ctx context.Context, inf cke.Infrastructure, leaderKey string) error {
44-
return inf.Storage().DeleteRebootsEntry(ctx, leaderKey, c.index)
45+
err := inf.Storage().DeleteRebootsEntry(ctx, leaderKey, c.index)
46+
if err != nil {
47+
return err
48+
}
49+
metrics.DecrementReboot()
50+
return nil
4551
}
4652

4753
func (c rebootDequeueCommand) Command() cke.Command {

server/control.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -266,15 +266,15 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
266266
return err
267267
}
268268

269-
reboot, err := inf.Storage().GetRebootsFrontEntry(ctx)
270-
switch err {
271-
case nil:
272-
case cke.ErrNotFound:
273-
reboot = nil
274-
default:
269+
re, err := inf.Storage().GetRebootsEntries(ctx)
270+
if err != nil {
275271
return err
276272
}
277-
273+
metrics.UpdateReboot(len(re))
274+
var reboot *cke.RebootQueueEntry
275+
if len(re) > 0 {
276+
reboot = re[0]
277+
}
278278
ops, phase := DecideOps(cluster, status, constraints, rcs, reboot)
279279

280280
st := &cke.ServerStatus{

storage.go

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -794,14 +794,12 @@ func (s Storage) GetRebootsEntry(ctx context.Context, index int64) (*RebootQueue
794794
return r, nil
795795
}
796796

797-
func (s Storage) getRebootsEntries(ctx context.Context, count int64) ([]*RebootQueueEntry, error) {
797+
// GetRebootsEntries loads the entries from the reboot queue.
798+
func (s Storage) GetRebootsEntries(ctx context.Context) ([]*RebootQueueEntry, error) {
798799
opts := []clientv3.OpOption{
799800
clientv3.WithPrefix(),
800801
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
801802
}
802-
if count > 0 {
803-
opts = append(opts, clientv3.WithLimit(count))
804-
}
805803
resp, err := s.Get(ctx, KeyRebootsPrefix, opts...)
806804
if err != nil {
807805
return nil, err
@@ -824,26 +822,6 @@ func (s Storage) getRebootsEntries(ctx context.Context, count int64) ([]*RebootQ
824822
return reboots, nil
825823
}
826824

827-
// GetRebootsEntries loads the entries from the reboot queue.
828-
func (s Storage) GetRebootsEntries(ctx context.Context) ([]*RebootQueueEntry, error) {
829-
return s.getRebootsEntries(ctx, 0)
830-
}
831-
832-
// GetRebootsFrontEntry loads the front entry from the reboot queue.
833-
// If the queue is empty, this returns ErrNotFound.
834-
func (s Storage) GetRebootsFrontEntry(ctx context.Context) (*RebootQueueEntry, error) {
835-
reboots, err := s.getRebootsEntries(ctx, 1)
836-
if err != nil {
837-
return nil, err
838-
}
839-
840-
if len(reboots) == 0 {
841-
return nil, ErrNotFound
842-
}
843-
844-
return reboots[0], nil
845-
}
846-
847825
// DeleteRebootsEntry deletes the entry specified by the index from the reboot queue.
848826
func (s Storage) DeleteRebootsEntry(ctx context.Context, leaderKey string, index int64) error {
849827
resp, err := s.Txn(ctx).

storage_test.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -653,11 +653,6 @@ func testStorageReboot(t *testing.T) {
653653
t.Error("unexptected error:", err)
654654
}
655655

656-
_, err = storage.GetRebootsFrontEntry(ctx)
657-
if err != ErrNotFound {
658-
t.Error("unexptected error:", err)
659-
}
660-
661656
ents, err := storage.GetRebootsEntries(ctx)
662657
if err != nil {
663658
t.Fatal("GetRebootsEntries failed:", err)
@@ -698,14 +693,6 @@ func testStorageReboot(t *testing.T) {
698693
t.Error("GetRebootsEntry returned unexpected result:", cmp.Diff(ent, entry2))
699694
}
700695

701-
ent, err = storage.GetRebootsFrontEntry(ctx)
702-
if err != nil {
703-
t.Fatal("GetRebootsFrontEntry failed:", err)
704-
}
705-
if !cmp.Equal(ent, entry) {
706-
t.Error("GetRebootsFrontEntry returned unexpected result:", cmp.Diff(ent, entry))
707-
}
708-
709696
entries := []*RebootQueueEntry{entry, entry2}
710697
ents, err = storage.GetRebootsEntries(ctx)
711698
if err != nil {
@@ -725,7 +712,7 @@ func testStorageReboot(t *testing.T) {
725712
t.Fatal("GetRebootsEntry failed:", err)
726713
}
727714
if !cmp.Equal(ent, entry) {
728-
t.Error("GetRebootsFrontEntry returned unexpected result:", cmp.Diff(ent, entry))
715+
t.Error("GetRebootsEntry returned unexpected result:", cmp.Diff(ent, entry))
729716
}
730717

731718
err = storage.DeleteRebootsEntry(ctx, leaderKey, 0)

0 commit comments

Comments
 (0)