Skip to content

Commit 42e12f1

Browse files
committed
Merge pull request kubernetes#12340 from wojtek-t/rewrite_service_etcd
Refactor "service" registry to use standard REST storage (and generic etcd)
2 parents 64f82ab + 79125f4 commit 42e12f1

File tree

12 files changed

+217
-355
lines changed

12 files changed

+217
-355
lines changed

contrib/mesos/pkg/scheduler/podtask/leaky.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ package podtask
2020

2121
import (
2222
"k8s.io/kubernetes/pkg/api"
23-
"k8s.io/kubernetes/pkg/registry/etcd"
23+
"k8s.io/kubernetes/pkg/registry/generic/etcd"
2424
)
2525

2626
// makePodKey constructs etcd paths to pod items enforcing namespace rules.
2727
func MakePodKey(ctx api.Context, id string) (string, error) {
28-
return etcd.MakeEtcdItemKey(ctx, PodPath, id)
28+
return etcd.NamespaceKeyFunc(ctx, PodPath, id)
2929
}

pkg/master/master.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ import (
5353
controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd"
5454
"k8s.io/kubernetes/pkg/registry/endpoint"
5555
endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
56-
"k8s.io/kubernetes/pkg/registry/etcd"
5756
"k8s.io/kubernetes/pkg/registry/event"
5857
expcontrolleretcd "k8s.io/kubernetes/pkg/registry/experimental/controller/etcd"
5958
"k8s.io/kubernetes/pkg/registry/limitrange"
@@ -69,6 +68,7 @@ import (
6968
secretetcd "k8s.io/kubernetes/pkg/registry/secret/etcd"
7069
"k8s.io/kubernetes/pkg/registry/service"
7170
etcdallocator "k8s.io/kubernetes/pkg/registry/service/allocator/etcd"
71+
serviceetcd "k8s.io/kubernetes/pkg/registry/service/etcd"
7272
ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator"
7373
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/serviceaccount/etcd"
7474
"k8s.io/kubernetes/pkg/storage"
@@ -450,9 +450,8 @@ func (m *Master) init(c *Config) {
450450
nodeStorage, nodeStatusStorage := nodeetcd.NewStorage(c.DatabaseStorage, c.KubeletClient)
451451
m.nodeRegistry = minion.NewRegistry(nodeStorage)
452452

453-
// TODO: split me up into distinct storage registries
454-
registry := etcd.NewRegistry(c.DatabaseStorage, m.endpointRegistry)
455-
m.serviceRegistry = registry
453+
serviceStorage := serviceetcd.NewStorage(c.DatabaseStorage)
454+
m.serviceRegistry = service.NewRegistry(serviceStorage)
456455

457456
var serviceClusterIPRegistry service.RangeRegistry
458457
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {

pkg/registry/etcd/doc.go

Lines changed: 0 additions & 19 deletions
This file was deleted.

pkg/registry/etcd/etcd.go

Lines changed: 0 additions & 177 deletions
This file was deleted.

pkg/registry/registrytest/endpoint.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,20 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpo
9393
}
9494

9595
func (e *EndpointRegistry) DeleteEndpoints(ctx api.Context, name string) error {
96-
return fmt.Errorf("unimplemented!")
96+
// TODO: support namespaces in this mock
97+
e.lock.Lock()
98+
defer e.lock.Unlock()
99+
if e.Err != nil {
100+
return e.Err
101+
}
102+
if e.Endpoints != nil {
103+
var newList []api.Endpoints
104+
for _, endpoint := range e.Endpoints.Items {
105+
if endpoint.Name != name {
106+
newList = append(newList, endpoint)
107+
}
108+
}
109+
e.Endpoints.Items = newList
110+
}
111+
return nil
97112
}

pkg/registry/registrytest/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (r *ServiceRegistry) SetError(err error) {
4646
r.Err = err
4747
}
4848

49-
func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error) {
49+
func (r *ServiceRegistry) ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) {
5050
r.mu.Lock()
5151
defer r.mu.Unlock()
5252

pkg/registry/service/etcd/etcd.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
Copyright 2015 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package etcd
18+
19+
import (
20+
"fmt"
21+
22+
"k8s.io/kubernetes/pkg/api"
23+
"k8s.io/kubernetes/pkg/api/rest"
24+
"k8s.io/kubernetes/pkg/fields"
25+
"k8s.io/kubernetes/pkg/labels"
26+
"k8s.io/kubernetes/pkg/registry/generic"
27+
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
28+
"k8s.io/kubernetes/pkg/runtime"
29+
"k8s.io/kubernetes/pkg/storage"
30+
)
31+
32+
type REST struct {
33+
etcdgeneric.Etcd
34+
}
35+
36+
func NewStorage(s storage.Interface) *REST {
37+
prefix := "/services/specs"
38+
store := etcdgeneric.Etcd{
39+
NewFunc: func() runtime.Object { return &api.Service{} },
40+
NewListFunc: func() runtime.Object { return &api.ServiceList{} },
41+
KeyRootFunc: func(ctx api.Context) string {
42+
return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix)
43+
},
44+
KeyFunc: func(ctx api.Context, name string) (string, error) {
45+
return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name)
46+
},
47+
ObjectNameFunc: func(obj runtime.Object) (string, error) {
48+
return obj.(*api.Service).Name, nil
49+
},
50+
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
51+
return MatchServices(label, field)
52+
},
53+
EndpointName: "services",
54+
55+
CreateStrategy: rest.Services,
56+
UpdateStrategy: rest.Services,
57+
58+
Storage: s,
59+
}
60+
return &REST{store}
61+
}
62+
63+
func MatchServices(label labels.Selector, field fields.Selector) generic.Matcher {
64+
return &generic.SelectionPredicate{label, field, ServiceAttributes}
65+
}
66+
67+
func ServiceAttributes(obj runtime.Object) (objLabels labels.Set, objFields fields.Set, err error) {
68+
service, ok := obj.(*api.Service)
69+
if !ok {
70+
return nil, nil, fmt.Errorf("invalid object type %#v", obj)
71+
}
72+
return service.Labels, fields.Set{
73+
"metadata.name": service.Name,
74+
}, nil
75+
}

pkg/registry/service/ipallocator/controller/repair.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"time"
2323

2424
"k8s.io/kubernetes/pkg/api"
25+
"k8s.io/kubernetes/pkg/fields"
26+
"k8s.io/kubernetes/pkg/labels"
2527
"k8s.io/kubernetes/pkg/registry/service"
2628
"k8s.io/kubernetes/pkg/registry/service/ipallocator"
2729
"k8s.io/kubernetes/pkg/util"
@@ -94,7 +96,7 @@ func (c *Repair) RunOnce() error {
9496
}
9597

9698
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
97-
list, err := c.registry.ListServices(ctx)
99+
list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything())
98100
if err != nil {
99101
return fmt.Errorf("unable to refresh the service IP block: %v", err)
100102
}

pkg/registry/service/portallocator/controller/repair.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"time"
2222

2323
"k8s.io/kubernetes/pkg/api"
24+
"k8s.io/kubernetes/pkg/fields"
25+
"k8s.io/kubernetes/pkg/labels"
2426
"k8s.io/kubernetes/pkg/registry/service"
2527
"k8s.io/kubernetes/pkg/registry/service/portallocator"
2628
"k8s.io/kubernetes/pkg/util"
@@ -79,7 +81,7 @@ func (c *Repair) RunOnce() error {
7981
}
8082

8183
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
82-
list, err := c.registry.ListServices(ctx)
84+
list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything())
8385
if err != nil {
8486
return fmt.Errorf("unable to refresh the port block: %v", err)
8587
}

0 commit comments

Comments
 (0)