Skip to content

Commit c5e221d

Browse files
committed
Merge pull request kubernetes#12440 from BenTheElder/proxy_config_handler_refactor
Refactor `pkg/proxy/config`'s ServiceConfigHandler and EndpointsConfigHandler.
2 parents 72db123 + 6bbf2aa commit c5e221d

File tree

7 files changed

+62
-62
lines changed

7 files changed

+62
-62
lines changed

pkg/proxy/config/config.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,17 @@ type EndpointsUpdate struct {
5757

5858
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
5959
type ServiceConfigHandler interface {
60-
// OnUpdate gets called when a configuration has been changed by one of the sources.
60+
// OnServiceUpdate gets called when a configuration has been changed by one of the sources.
6161
// This is the union of all the configuration sources.
62-
OnUpdate(services []api.Service)
62+
OnServiceUpdate(services []api.Service)
6363
}
6464

6565
// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
6666
type EndpointsConfigHandler interface {
67-
// OnUpdate gets called when endpoints configuration is changed for a given
67+
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
6868
// service on any of the configuration sources. An example is when a new
6969
// service comes up, or when containers come up or down for an existing service.
70-
OnUpdate(endpoints []api.Endpoints)
70+
OnEndpointsUpdate(endpoints []api.Endpoints)
7171
}
7272

7373
// EndpointsConfig tracks a set of endpoints configurations.
@@ -91,7 +91,7 @@ func NewEndpointsConfig() *EndpointsConfig {
9191

9292
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
9393
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
94-
handler.OnUpdate(instance.([]api.Endpoints))
94+
handler.OnEndpointsUpdate(instance.([]api.Endpoints))
9595
}))
9696
}
9797

@@ -189,7 +189,7 @@ func NewServiceConfig() *ServiceConfig {
189189

190190
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
191191
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
192-
handler.OnUpdate(instance.([]api.Service))
192+
handler.OnServiceUpdate(instance.([]api.Service))
193193
}))
194194
}
195195

pkg/proxy/config/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewServiceHandlerMock() *ServiceHandlerMock {
5757
return &ServiceHandlerMock{services: make([]api.Service, 0)}
5858
}
5959

60-
func (h *ServiceHandlerMock) OnUpdate(services []api.Service) {
60+
func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) {
6161
sort.Sort(sortedServices(services))
6262
h.services = services
6363
h.updated.Done()
@@ -95,7 +95,7 @@ func NewEndpointsHandlerMock() *EndpointsHandlerMock {
9595
return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
9696
}
9797

98-
func (h *EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) {
98+
func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) {
9999
sort.Sort(sortedEndpoints(endpoints))
100100
h.endpoints = endpoints
101101
h.updated.Done()

pkg/proxy/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ import (
2525

2626
// ProxyProvider is the interface provided by proxier implementations.
2727
type ProxyProvider interface {
28-
// OnUpdate manages the active set of service proxies.
28+
// OnServiceUpdate manages the active set of service proxies.
2929
// Active service proxies are reinitialized if found in the update set or
3030
// removed if missing from the update set.
31-
OnUpdate(services []api.Service)
31+
OnServiceUpdate(services []api.Service)
3232
// SyncLoop runs periodic work.
3333
// This is expected to run as a goroutine or as the main loop of the app.
3434
// It does not return.

pkg/proxy/userspace/proxier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ const udpIdleTimeout = 1 * time.Second
265265
// OnUpdate manages the active set of service proxies.
266266
// Active service proxies are reinitialized if found in the update set or
267267
// shutdown if missing from the update set.
268-
func (proxier *Proxier) OnUpdate(services []api.Service) {
268+
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
269269
glog.V(4).Infof("Received update notice: %+v", services)
270270
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
271271
for i := range services {

pkg/proxy/userspace/proxier_test.go

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
213213
func TestTCPProxy(t *testing.T) {
214214
lb := NewLoadBalancerRR()
215215
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
216-
lb.OnUpdate([]api.Endpoints{
216+
lb.OnEndpointsUpdate([]api.Endpoints{
217217
{
218218
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
219219
Subsets: []api.EndpointSubset{{
@@ -240,7 +240,7 @@ func TestTCPProxy(t *testing.T) {
240240
func TestUDPProxy(t *testing.T) {
241241
lb := NewLoadBalancerRR()
242242
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
243-
lb.OnUpdate([]api.Endpoints{
243+
lb.OnEndpointsUpdate([]api.Endpoints{
244244
{
245245
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
246246
Subsets: []api.EndpointSubset{{
@@ -268,7 +268,7 @@ func TestMultiPortProxy(t *testing.T) {
268268
lb := NewLoadBalancerRR()
269269
serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-p"}, "p"}
270270
serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-q"}, "q"}
271-
lb.OnUpdate([]api.Endpoints{{
271+
lb.OnEndpointsUpdate([]api.Endpoints{{
272272
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
273273
Subsets: []api.EndpointSubset{{
274274
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
@@ -303,7 +303,7 @@ func TestMultiPortProxy(t *testing.T) {
303303
waitForNumProxyLoops(t, p, 2)
304304
}
305305

306-
func TestMultiPortOnUpdate(t *testing.T) {
306+
func TestMultiPortOnServiceUpdate(t *testing.T) {
307307
lb := NewLoadBalancerRR()
308308
serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
309309
serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
@@ -315,7 +315,7 @@ func TestMultiPortOnUpdate(t *testing.T) {
315315
}
316316
waitForNumProxyLoops(t, p, 0)
317317

318-
p.OnUpdate([]api.Service{{
318+
p.OnServiceUpdate([]api.Service{{
319319
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
320320
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
321321
Name: "p",
@@ -362,7 +362,7 @@ func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
362362
func TestTCPProxyStop(t *testing.T) {
363363
lb := NewLoadBalancerRR()
364364
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
365-
lb.OnUpdate([]api.Endpoints{
365+
lb.OnEndpointsUpdate([]api.Endpoints{
366366
{
367367
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
368368
Subsets: []api.EndpointSubset{{
@@ -400,7 +400,7 @@ func TestTCPProxyStop(t *testing.T) {
400400
func TestUDPProxyStop(t *testing.T) {
401401
lb := NewLoadBalancerRR()
402402
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
403-
lb.OnUpdate([]api.Endpoints{
403+
lb.OnEndpointsUpdate([]api.Endpoints{
404404
{
405405
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
406406
Subsets: []api.EndpointSubset{{
@@ -438,7 +438,7 @@ func TestUDPProxyStop(t *testing.T) {
438438
func TestTCPProxyUpdateDelete(t *testing.T) {
439439
lb := NewLoadBalancerRR()
440440
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
441-
lb.OnUpdate([]api.Endpoints{
441+
lb.OnEndpointsUpdate([]api.Endpoints{
442442
{
443443
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
444444
Subsets: []api.EndpointSubset{{
@@ -465,7 +465,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
465465
conn.Close()
466466
waitForNumProxyLoops(t, p, 1)
467467

468-
p.OnUpdate([]api.Service{})
468+
p.OnServiceUpdate([]api.Service{})
469469
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
470470
t.Fatalf(err.Error())
471471
}
@@ -475,7 +475,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
475475
func TestUDPProxyUpdateDelete(t *testing.T) {
476476
lb := NewLoadBalancerRR()
477477
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
478-
lb.OnUpdate([]api.Endpoints{
478+
lb.OnEndpointsUpdate([]api.Endpoints{
479479
{
480480
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
481481
Subsets: []api.EndpointSubset{{
@@ -502,7 +502,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
502502
conn.Close()
503503
waitForNumProxyLoops(t, p, 1)
504504

505-
p.OnUpdate([]api.Service{})
505+
p.OnServiceUpdate([]api.Service{})
506506
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
507507
t.Fatalf(err.Error())
508508
}
@@ -512,7 +512,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
512512
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
513513
lb := NewLoadBalancerRR()
514514
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
515-
lb.OnUpdate([]api.Endpoints{
515+
lb.OnEndpointsUpdate([]api.Endpoints{
516516
{
517517
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
518518
Subsets: []api.EndpointSubset{{
@@ -539,13 +539,13 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
539539
conn.Close()
540540
waitForNumProxyLoops(t, p, 1)
541541

542-
p.OnUpdate([]api.Service{})
542+
p.OnServiceUpdate([]api.Service{})
543543
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
544544
t.Fatalf(err.Error())
545545
}
546546
waitForNumProxyLoops(t, p, 0)
547547

548-
p.OnUpdate([]api.Service{{
548+
p.OnServiceUpdate([]api.Service{{
549549
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
550550
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
551551
Name: "p",
@@ -564,7 +564,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
564564
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
565565
lb := NewLoadBalancerRR()
566566
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
567-
lb.OnUpdate([]api.Endpoints{
567+
lb.OnEndpointsUpdate([]api.Endpoints{
568568
{
569569
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
570570
Subsets: []api.EndpointSubset{{
@@ -591,13 +591,13 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
591591
conn.Close()
592592
waitForNumProxyLoops(t, p, 1)
593593

594-
p.OnUpdate([]api.Service{})
594+
p.OnServiceUpdate([]api.Service{})
595595
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
596596
t.Fatalf(err.Error())
597597
}
598598
waitForNumProxyLoops(t, p, 0)
599599

600-
p.OnUpdate([]api.Service{{
600+
p.OnServiceUpdate([]api.Service{{
601601
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
602602
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
603603
Name: "p",
@@ -616,7 +616,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
616616
func TestTCPProxyUpdatePort(t *testing.T) {
617617
lb := NewLoadBalancerRR()
618618
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
619-
lb.OnUpdate([]api.Endpoints{
619+
lb.OnEndpointsUpdate([]api.Endpoints{
620620
{
621621
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
622622
Subsets: []api.EndpointSubset{{
@@ -639,7 +639,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
639639
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
640640
waitForNumProxyLoops(t, p, 1)
641641

642-
p.OnUpdate([]api.Service{{
642+
p.OnServiceUpdate([]api.Service{{
643643
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
644644
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
645645
Name: "p",
@@ -664,7 +664,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
664664
func TestUDPProxyUpdatePort(t *testing.T) {
665665
lb := NewLoadBalancerRR()
666666
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
667-
lb.OnUpdate([]api.Endpoints{
667+
lb.OnEndpointsUpdate([]api.Endpoints{
668668
{
669669
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
670670
Subsets: []api.EndpointSubset{{
@@ -686,7 +686,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
686686
}
687687
waitForNumProxyLoops(t, p, 1)
688688

689-
p.OnUpdate([]api.Service{{
689+
p.OnServiceUpdate([]api.Service{{
690690
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
691691
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
692692
Name: "p",
@@ -709,7 +709,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
709709
func TestProxyUpdatePublicIPs(t *testing.T) {
710710
lb := NewLoadBalancerRR()
711711
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
712-
lb.OnUpdate([]api.Endpoints{
712+
lb.OnEndpointsUpdate([]api.Endpoints{
713713
{
714714
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
715715
Subsets: []api.EndpointSubset{{
@@ -732,7 +732,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
732732
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
733733
waitForNumProxyLoops(t, p, 1)
734734

735-
p.OnUpdate([]api.Service{{
735+
p.OnServiceUpdate([]api.Service{{
736736
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
737737
Spec: api.ServiceSpec{
738738
Ports: []api.ServicePort{{
@@ -761,7 +761,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
761761
func TestProxyUpdatePortal(t *testing.T) {
762762
lb := NewLoadBalancerRR()
763763
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
764-
lb.OnUpdate([]api.Endpoints{
764+
lb.OnEndpointsUpdate([]api.Endpoints{
765765
{
766766
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
767767
Subsets: []api.EndpointSubset{{
@@ -784,7 +784,7 @@ func TestProxyUpdatePortal(t *testing.T) {
784784
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
785785
waitForNumProxyLoops(t, p, 1)
786786

787-
p.OnUpdate([]api.Service{{
787+
p.OnServiceUpdate([]api.Service{{
788788
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
789789
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
790790
Name: "p",
@@ -797,7 +797,7 @@ func TestProxyUpdatePortal(t *testing.T) {
797797
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
798798
}
799799

800-
p.OnUpdate([]api.Service{{
800+
p.OnServiceUpdate([]api.Service{{
801801
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
802802
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
803803
Name: "p",
@@ -810,7 +810,7 @@ func TestProxyUpdatePortal(t *testing.T) {
810810
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
811811
}
812812

813-
p.OnUpdate([]api.Service{{
813+
p.OnServiceUpdate([]api.Service{{
814814
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
815815
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
816816
Name: "p",

pkg/proxy/userspace/roundrobin.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,10 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
223223
}
224224
}
225225

226-
// OnUpdate manages the registered service endpoints.
226+
// OnEndpointsUpdate manages the registered service endpoints.
227227
// Registered endpoints are updated if found in the update set or
228228
// unregistered if missing from the update set.
229-
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
229+
func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
230230
registeredEndpoints := make(map[proxy.ServicePortName]bool)
231231
lb.lock.Lock()
232232
defer lb.lock.Unlock()
@@ -262,7 +262,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
262262
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
263263
glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
264264
lb.updateAffinityMap(svcPort, newEndpoints)
265-
// OnUpdate can be called without NewService being called externally.
265+
// OnEndpointsUpdate can be called without NewService being called externally.
266266
// To be safe we will call it here. A new service will only be created
267267
// if one does not already exist. The affinity will be updated
268268
// later, once NewService is called.

0 commit comments

Comments
 (0)