Skip to content

Commit 03ba270

Browse files
committed
Fix passing default grpc call options in Kubernetes client
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
1 parent f739ef4 commit 03ba270

File tree

1 file changed

+30
-59
lines changed

1 file changed

+30
-59
lines changed

client/v3/kubernetes/client.go

Lines changed: 30 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package kubernetes
1616

1717
import (
1818
"context"
19+
"fmt"
1920

2021
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
2122
"go.etcd.io/etcd/api/v3/mvccpb"
@@ -31,7 +32,6 @@ func New(cfg clientv3.Config) (*Client, error) {
3132
}
3233
kc := &Client{
3334
Client: c,
34-
kv: clientv3.RetryKVClient(c),
3535
}
3636
kc.Kubernetes = kc
3737
return kc, nil
@@ -40,15 +40,14 @@ func New(cfg clientv3.Config) (*Client, error) {
4040
type Client struct {
4141
*clientv3.Client
4242
Kubernetes Interface
43-
kv pb.KVClient
4443
}
4544

4645
var _ Interface = (*Client)(nil)
4746

4847
func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
49-
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision))
48+
rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1))
5049
if err != nil {
51-
return resp, clientv3.ContextError(ctx, err)
50+
return resp, err
5251
}
5352
resp.Revision = rangeResp.Header.Revision
5453
if len(rangeResp.Kvs) == 1 {
@@ -58,17 +57,14 @@ func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetR
5857
}
5958

6059
func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) {
61-
rangeStart := prefix + opts.Continue
60+
rangeStart := prefix
61+
if opts.Continue != "" {
62+
rangeStart = opts.Continue
63+
}
6264
rangeEnd := clientv3.GetPrefixRangeEnd(prefix)
63-
64-
rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
65-
Key: []byte(rangeStart),
66-
RangeEnd: []byte(rangeEnd),
67-
Limit: opts.Limit,
68-
Revision: opts.Revision,
69-
})
65+
rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision))
7066
if err != nil {
71-
return resp, clientv3.ContextError(ctx, err)
67+
return resp, err
7268
}
7369
resp.Kvs = rangeResp.Kvs
7470
resp.Count = rangeResp.Count
@@ -77,48 +73,51 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp
7773
}
7874

7975
func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) {
80-
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
81-
Key: []byte(prefix),
82-
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)),
83-
CountOnly: true,
84-
})
76+
resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly())
8577
if err != nil {
86-
return 0, clientv3.ContextError(ctx, err)
78+
return 0, err
8779
}
8880
return resp.Count, nil
8981
}
9082

9183
func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) {
92-
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}
84+
txn := k.KV.Txn(ctx).If(
85+
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
86+
).Then(
87+
clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)),
88+
)
9389

94-
var onFailure *pb.RequestOp
9590
if opts.GetOnFailure {
96-
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
91+
txn = txn.Else(clientv3.OpGet(key))
9792
}
9893

99-
txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
94+
txnResp, err := txn.Commit()
10095
if err != nil {
101-
return resp, clientv3.ContextError(ctx, err)
96+
return resp, err
10297
}
10398
resp.Succeeded = txnResp.Succeeded
10499
resp.Revision = txnResp.Header.Revision
105100
if opts.GetOnFailure && !txnResp.Succeeded {
101+
if len(txnResp.Responses) == 0 {
102+
return resp, fmt.Errorf("invalid OptimisticPut response: %v", txnResp.Responses)
103+
}
106104
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
107105
}
108106
return resp, nil
109107
}
110108

111109
func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) {
112-
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}
113-
114-
var onFailure *pb.RequestOp
110+
txn := k.KV.Txn(ctx).If(
111+
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
112+
).Then(
113+
clientv3.OpDelete(key),
114+
)
115115
if opts.GetOnFailure {
116-
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
116+
txn = txn.Else(clientv3.OpGet(key))
117117
}
118-
119-
txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
118+
txnResp, err := txn.Commit()
120119
if err != nil {
121-
return resp, clientv3.ContextError(ctx, err)
120+
return resp, err
122121
}
123122
resp.Succeeded = txnResp.Succeeded
124123
resp.Revision = txnResp.Header.Revision
@@ -128,34 +127,6 @@ func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevisi
128127
return resp, nil
129128
}
130129

131-
func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) {
132-
txn := &pb.TxnRequest{
133-
Compare: []*pb.Compare{
134-
{
135-
Result: pb.Compare_EQUAL,
136-
Target: pb.Compare_MOD,
137-
Key: []byte(key),
138-
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectedRevision},
139-
},
140-
},
141-
}
142-
if onSuccess != nil {
143-
txn.Success = []*pb.RequestOp{onSuccess}
144-
}
145-
if onFailure != nil {
146-
txn.Failure = []*pb.RequestOp{onFailure}
147-
}
148-
return k.kv.Txn(ctx, txn)
149-
}
150-
151-
func getRequest(key string, revision int64) *pb.RangeRequest {
152-
return &pb.RangeRequest{
153-
Key: []byte(key),
154-
Revision: revision,
155-
Limit: 1,
156-
}
157-
}
158-
159130
func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
160131
getResponse := resp.GetResponseRange()
161132
if len(getResponse.Kvs) == 1 {

0 commit comments

Comments
 (0)