Skip to content

Commit ffdd3f1

Browse files
yokazemorimoto-cybozu
authored andcommitted
reboot: implement data struct and storage API
Signed-off-by: Daichi Sakaue <daichi-sakaue@cybozu.co.jp> Signed-off-by: morimoto-cybozu <kenji_morimoto@cybozu.co.jp>
1 parent 649e1d1 commit ffdd3f1

File tree

4 files changed

+328
-5
lines changed

4 files changed

+328
-5
lines changed

docs/reboot.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ Data schema
2323

2424
### `RebootQueueEntry`
2525

26-
| Name | Type | Description |
27-
| -------- | -------- | ------------------------------------------ |
28-
| `index` | uint64 | Index number of entry. |
29-
| `nodes` | []string | A list of IP addresses of nodes to reboot. |
30-
| `status` | string | One of `queued`, `rebooting`, `cancelled`. |
26+
| Name | Type | Description |
27+
| -------- | -------- | --------------------------------------------- |
28+
| `index` | string | Index number of entry, formatted as a string. |
29+
| `nodes` | []string | A list of IP addresses of nodes to reboot. |
30+
| `status` | string | One of `queued`, `rebooting`, `cancelled`. |
3131

3232

3333
Detailed behavior

reboot.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package cke
2+
3+
// RebootStatus is status of reboot operation
4+
type RebootStatus string
5+
6+
// Reboot statuses
7+
const (
8+
RebootStatusQueued = RebootStatus("queued")
9+
RebootStatusRebooting = RebootStatus("rebooting")
10+
RebootStatusCancelled = RebootStatus("cancelled")
11+
)
12+
13+
// RebootQueueEntry represents a queue entry of reboot operation
14+
type RebootQueueEntry struct {
15+
Index int64 `json:"index,string"`
16+
Nodes []string `json:"nodes"`
17+
Status RebootStatus `json:"status"`
18+
}
19+
20+
// NewRebootQueueEntry creates new `RebootQueueEntry`.
21+
// `Index` will be supplied in registration.
22+
func NewRebootQueueEntry(nodes []string) *RebootQueueEntry {
23+
return &RebootQueueEntry{
24+
Nodes: nodes,
25+
Status: RebootStatusQueued,
26+
}
27+
}
28+
29+
// Cancel cancels the entry.
30+
func (r *RebootQueueEntry) Cancel() {
31+
r.Status = RebootStatusCancelled
32+
}

storage.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const (
3030
KeyClusterRevision = "cluster-revision"
3131
KeyConstraints = "constraints"
3232
KeyLeader = "leader/"
33+
KeyRebootsPrefix = "reboots/data/"
34+
KeyRebootsWriteIndex = "reboots/write-index"
3335
KeyRecords = "records/"
3436
KeyRecordID = "records"
3537
KeyResourcePrefix = "resource/"
@@ -685,6 +687,179 @@ func (s Storage) GetSabakanURL(ctx context.Context) (string, error) {
685687
return s.getStringValue(ctx, KeySabakanURL)
686688
}
687689

690+
func rebootsEntryKey(index int64) string {
691+
return fmt.Sprintf("%s%016x", KeyRebootsPrefix, index)
692+
}
693+
694+
// RegisterRebootsEntry enqueues a reboot queue entry to the reboot queue.
695+
// "Index" of the entry is retrieved and updated in this method. The given value is ignored.
696+
func (s Storage) RegisterRebootsEntry(ctx context.Context, r *RebootQueueEntry) error {
697+
RETRY:
698+
var writeIndex, writeIndexRev int64
699+
resp, err := s.Get(ctx, KeyRebootsWriteIndex)
700+
if err != nil {
701+
return err
702+
}
703+
if resp.Count != 0 {
704+
value, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64)
705+
if err != nil {
706+
return err
707+
}
708+
writeIndex = value
709+
writeIndexRev = resp.Kvs[0].ModRevision
710+
}
711+
712+
r.Index = writeIndex
713+
data, err := json.Marshal(r)
714+
if err != nil {
715+
return err
716+
}
717+
718+
nextWriteIndex := strconv.FormatInt(writeIndex+1, 10)
719+
txnResp, err := s.Txn(ctx).
720+
If(
721+
clientv3.Compare(clientv3.ModRevision(KeyRebootsWriteIndex), "=", writeIndexRev),
722+
).
723+
Then(
724+
clientv3.OpPut(rebootsEntryKey(writeIndex), string(data)),
725+
clientv3.OpPut(KeyRebootsWriteIndex, nextWriteIndex),
726+
).
727+
Commit()
728+
if err != nil {
729+
return err
730+
}
731+
if !txnResp.Succeeded {
732+
goto RETRY
733+
}
734+
735+
return nil
736+
}
737+
738+
// UpdateRebootsEntry updates existing reboot queue entry.
739+
// It always overwrites the contents with a CAS loop.
740+
// If the entry is not found in the reboot queue, this returns ErrNotFound.
741+
func (s Storage) UpdateRebootsEntry(ctx context.Context, r *RebootQueueEntry) error {
742+
key := rebootsEntryKey(r.Index)
743+
data, err := json.Marshal(r)
744+
if err != nil {
745+
return err
746+
}
747+
748+
RETRY:
749+
resp, err := s.Get(ctx, key)
750+
if err != nil {
751+
return err
752+
}
753+
if resp.Count == 0 {
754+
return ErrNotFound
755+
}
756+
757+
rev := resp.Kvs[0].ModRevision
758+
txnResp, err := s.Txn(ctx).
759+
If(
760+
clientv3.Compare(clientv3.ModRevision(key), "=", rev),
761+
).
762+
Then(
763+
clientv3.OpPut(key, string(data)),
764+
).
765+
Commit()
766+
if err != nil {
767+
return err
768+
}
769+
if !txnResp.Succeeded {
770+
goto RETRY
771+
}
772+
773+
return nil
774+
}
775+
776+
// GetRebootsEntry loads the entry specified by the index from the reboot queue.
777+
// If the pointed entry is not found, this returns ErrNotFound.
778+
func (s Storage) GetRebootsEntry(ctx context.Context, index int64) (*RebootQueueEntry, error) {
779+
resp, err := s.Get(ctx, rebootsEntryKey(index))
780+
if err != nil {
781+
return nil, err
782+
}
783+
784+
if len(resp.Kvs) == 0 {
785+
return nil, ErrNotFound
786+
}
787+
788+
r := new(RebootQueueEntry)
789+
err = json.Unmarshal(resp.Kvs[0].Value, r)
790+
if err != nil {
791+
return nil, err
792+
}
793+
794+
return r, nil
795+
}
796+
797+
func (s Storage) getRebootsEntries(ctx context.Context, count int64) ([]*RebootQueueEntry, error) {
798+
opts := []clientv3.OpOption{
799+
clientv3.WithPrefix(),
800+
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
801+
}
802+
if count > 0 {
803+
opts = append(opts, clientv3.WithLimit(count))
804+
}
805+
resp, err := s.Get(ctx, KeyRebootsPrefix, opts...)
806+
if err != nil {
807+
return nil, err
808+
}
809+
810+
if len(resp.Kvs) == 0 {
811+
return nil, nil
812+
}
813+
814+
reboots := make([]*RebootQueueEntry, len(resp.Kvs))
815+
for i, kv := range resp.Kvs {
816+
r := new(RebootQueueEntry)
817+
err = json.Unmarshal(kv.Value, r)
818+
if err != nil {
819+
return nil, err
820+
}
821+
reboots[i] = r
822+
}
823+
824+
return reboots, nil
825+
}
826+
827+
// GetRebootsEntries loads the entries from the reboot queue.
828+
func (s Storage) GetRebootsEntries(ctx context.Context) ([]*RebootQueueEntry, error) {
829+
return s.getRebootsEntries(ctx, 0)
830+
}
831+
832+
// GetRebootsFrontEntry loads the front entry from the reboot queue.
833+
// If the queue is empty, this returns ErrNotFound.
834+
func (s Storage) GetRebootsFrontEntry(ctx context.Context) (*RebootQueueEntry, error) {
835+
reboots, err := s.getRebootsEntries(ctx, 1)
836+
if err != nil {
837+
return nil, err
838+
}
839+
840+
if len(reboots) == 0 {
841+
return nil, ErrNotFound
842+
}
843+
844+
return reboots[0], nil
845+
}
846+
847+
// DeleteRebootsEntry deletes the entry specified by the index from the reboot queue.
848+
func (s Storage) DeleteRebootsEntry(ctx context.Context, leaderKey string, index int64) error {
849+
resp, err := s.Txn(ctx).
850+
If(clientv3util.KeyExists(leaderKey)).
851+
Then(clientv3.OpDelete(rebootsEntryKey(index))).
852+
Commit()
853+
if err != nil {
854+
return err
855+
}
856+
if !resp.Succeeded {
857+
return ErrNoLeader
858+
}
859+
860+
return nil
861+
}
862+
688863
// SetStatus stores the server status.
689864
func (s Storage) SetStatus(ctx context.Context, lease clientv3.LeaseID, st *ServerStatus) error {
690865
data, err := json.Marshal(st)

storage_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,121 @@ func testStorageSabakan(t *testing.T) {
627627
}
628628
}
629629

630+
func testStorageReboot(t *testing.T) {
631+
t.Parallel()
632+
633+
client := newEtcdClient(t)
634+
defer client.Close()
635+
storage := Storage{client}
636+
ctx := context.Background()
637+
638+
s, err := concurrency.NewSession(client)
639+
if err != nil {
640+
t.Fatal(err)
641+
}
642+
defer s.Close()
643+
e := concurrency.NewElection(s, KeyLeader)
644+
err = e.Campaign(ctx, "test")
645+
if err != nil {
646+
t.Fatal(err)
647+
}
648+
649+
leaderKey := e.Key()
650+
651+
_, err = storage.GetRebootsEntry(ctx, 0)
652+
if err != ErrNotFound {
653+
t.Error("unexptected error:", err)
654+
}
655+
656+
_, err = storage.GetRebootsFrontEntry(ctx)
657+
if err != ErrNotFound {
658+
t.Error("unexptected error:", err)
659+
}
660+
661+
ents, err := storage.GetRebootsEntries(ctx)
662+
if err != nil {
663+
t.Fatal("GetRebootsEntries failed:", err)
664+
}
665+
if len(ents) != 0 {
666+
t.Error("Unknown entries:", ents)
667+
}
668+
669+
err = storage.DeleteRebootsEntry(ctx, leaderKey, 0)
670+
if err != nil {
671+
t.Fatal("DeleteRebootsEntry failed:", err)
672+
}
673+
674+
nodes := []string{
675+
"1.2.3.4",
676+
"5.6.7.8",
677+
}
678+
entry := NewRebootQueueEntry(nodes)
679+
err = storage.RegisterRebootsEntry(ctx, entry)
680+
if err != nil {
681+
t.Fatal("RegisterRebootsEntry failed:", err)
682+
}
683+
684+
nodes2 := []string{
685+
"12.34.56.78",
686+
}
687+
entry2 := NewRebootQueueEntry(nodes2)
688+
err = storage.RegisterRebootsEntry(ctx, entry2)
689+
if err != nil {
690+
t.Fatal("RegisterRebootsEntry failed:", err)
691+
}
692+
693+
ent, err := storage.GetRebootsEntry(ctx, 1)
694+
if err != nil {
695+
t.Fatal("GetRebootsEntry failed:", err)
696+
}
697+
if !cmp.Equal(ent, entry2) {
698+
t.Error("GetRebootsEntry returned unexpected result:", cmp.Diff(ent, entry2))
699+
}
700+
701+
ent, err = storage.GetRebootsFrontEntry(ctx)
702+
if err != nil {
703+
t.Fatal("GetRebootsFrontEntry failed:", err)
704+
}
705+
if !cmp.Equal(ent, entry) {
706+
t.Error("GetRebootsFrontEntry returned unexpected result:", cmp.Diff(ent, entry))
707+
}
708+
709+
entries := []*RebootQueueEntry{entry, entry2}
710+
ents, err = storage.GetRebootsEntries(ctx)
711+
if err != nil {
712+
t.Fatal("GetRebootsEntries failed:", err)
713+
}
714+
if !cmp.Equal(ents, entries) {
715+
t.Error("GetRebootsEntries returned unexpected result:", cmp.Diff(ents, entries))
716+
}
717+
718+
entry.Status = RebootStatusRebooting
719+
err = storage.UpdateRebootsEntry(ctx, entry)
720+
if err != nil {
721+
t.Fatal("UpdateRebootsEntry failed:", err)
722+
}
723+
ent, err = storage.GetRebootsEntry(ctx, 0)
724+
if err != nil {
725+
t.Fatal("GetRebootsEntry failed:", err)
726+
}
727+
if !cmp.Equal(ent, entry) {
728+
t.Error("GetRebootsFrontEntry returned unexpected result:", cmp.Diff(ent, entry))
729+
}
730+
731+
err = storage.DeleteRebootsEntry(ctx, leaderKey, 0)
732+
if err != nil {
733+
t.Fatal("DeleteRebootsEntry failed:", err)
734+
}
735+
_, err = storage.GetRebootsEntry(ctx, 0)
736+
if err != ErrNotFound {
737+
t.Error("unexptected error:", err)
738+
}
739+
err = storage.UpdateRebootsEntry(ctx, entry)
740+
if err == nil {
741+
t.Error("UpdateRebootsEntry succeeded for deleted entry")
742+
}
743+
}
744+
630745
func testStatus(t *testing.T) {
631746
t.Parallel()
632747

@@ -677,5 +792,6 @@ func TestStorage(t *testing.T) {
677792
t.Run("Maint", testStorageMaint)
678793
t.Run("Resource", testStorageResource)
679794
t.Run("Sabakan", testStorageSabakan)
795+
t.Run("Reboot", testStorageReboot)
680796
t.Run("Status", testStatus)
681797
}

0 commit comments

Comments
 (0)