From 321911493fc34e2856de09e43710fff8d3001940 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Thu, 15 Jun 2017 10:07:33 -0700 Subject: [PATCH 1/7] Correct SetMatrix documentation The SetMatrix is a generic data structure, so the description should not be tight to any specific use Signed-off-by: Flavio Crisciani (cherry picked from commit 898e32185394b4d2f765a2c675fdd4ce9c2daf61) --- common/setmatrix.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/common/setmatrix.go b/common/setmatrix.go index 0fdb542be4..e0e6cea9bf 100644 --- a/common/setmatrix.go +++ b/common/setmatrix.go @@ -10,23 +10,23 @@ import ( type SetMatrix interface { // Get returns the members of the set for a specific key as a slice. Get(key string) ([]interface{}, bool) - // Contains is used to verify is an element is in a set for a specific key + // Contains is used to verify if an element is in a set for a specific key // returns true if the element is in the set // returns true if there is a set for the key Contains(key string, value interface{}) (bool, bool) - // Insert inserts the mapping between the IP and the endpoint identifier - // returns true if the mapping was not present, false otherwise - // returns also the number of endpoints associated to the IP + // Insert inserts the value in the set of a key + // returns true if the value is inserted (was not already in the set), false otherwise + // returns also the length of the set for the key Insert(key string, value interface{}) (bool, int) - // Remove removes the mapping between the IP and the endpoint identifier - // returns true if the mapping was deleted, false otherwise - // returns also the number of endpoints associated to the IP + // Remove removes the value in the set for a specific key + // returns true if the value is deleted, false otherwise + // returns also the length of the set for the key Remove(key string, value interface{}) (bool, int) - // Cardinality returns the number of elements in the set of a specfic key - // returns false if the key is not in the map + // Cardinality returns the number of elements in the set for a key + // returns false if the set is not present Cardinality(key string) (int, bool) // String returns the string version of the set, empty otherwise - // returns false if the key is not in the map + // returns false if the set is not present String(key string) (string, bool) } From 57bef585d60899779c57a7fbc5b6fc30c8138956 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Thu, 15 Jun 2017 16:16:13 -0700 Subject: [PATCH 2/7] Service Discovery reuse name and serviceBindings deletion - Added logic to handle name reuse from different services - Moved the deletion from the serviceBindings map at the end of the rmServiceBindings body to avoid race with new services Signed-off-by: Flavio Crisciani (cherry picked from commit 5e657f6a719c52303fd2672f21e89379408a75e9) --- common/setmatrix.go | 12 +++ libnetwork_internal_test.go | 118 ++++++++++++++++++++++++- network.go | 169 +++++++++++++++++++++--------------- service_common.go | 62 ++++++------- 4 files changed, 261 insertions(+), 100 deletions(-) diff --git a/common/setmatrix.go b/common/setmatrix.go index e0e6cea9bf..72be5bbbfc 100644 --- a/common/setmatrix.go +++ b/common/setmatrix.go @@ -28,6 +28,8 @@ type SetMatrix interface { // String returns the string version of the set, empty otherwise // returns false if the set is not present String(key string) (string, bool) + // Returns all the keys in the map + Keys() []string } type setMatrix struct { @@ -121,3 +123,13 @@ func (s *setMatrix) String(key string) (string, bool) { } return set.String(), ok } + +func (s *setMatrix) Keys() []string { + s.Lock() + defer s.Unlock() + keys := make([]string, 0, len(s.matrix)) + for k := range s.matrix { + keys = append(keys, k) + } + return keys +} diff --git a/libnetwork_internal_test.go b/libnetwork_internal_test.go index f0d803aa75..c5b8ae2ae7 100644 --- a/libnetwork_internal_test.go +++ b/libnetwork_internal_test.go @@ -13,6 +13,7 @@ import ( "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/netlabel" + "github.com/docker/libnetwork/netutils" "github.com/docker/libnetwork/testutils" "github.com/docker/libnetwork/types" ) @@ -379,8 +380,8 @@ func TestSRVServiceQuery(t *testing.T) { } sr := svcInfo{ - svcMap: make(map[string][]net.IP), - svcIPv6Map: make(map[string][]net.IP), + svcMap: common.NewSetMatrix(), + svcIPv6Map: common.NewSetMatrix(), ipMap: common.NewSetMatrix(), service: make(map[string][]servicePorts), } @@ -437,6 +438,119 @@ func TestSRVServiceQuery(t *testing.T) { } } +func TestServiceVIPReuse(t *testing.T) { + c, err := New() + if err != nil { + t.Fatal(err) + } + defer c.Stop() + + n, err := c.NewNetwork("bridge", "net1", "", nil) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := n.Delete(); err != nil { + t.Fatal(err) + } + }() + + ep, err := n.CreateEndpoint("testep") + if err != nil { + t.Fatal(err) + } + + sb, err := c.NewSandbox("c1") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := sb.Delete(); err != nil { + t.Fatal(err) + } + }() + + err = ep.Join(sb) + if err != nil { + t.Fatal(err) + } + + // Add 2 services with same name but different service ID to share the same VIP + n.(*network).addSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + n.(*network).addSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + + ipToResolve := netutils.ReverseIP("192.168.0.1") + + ipList, _ := n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) == 0 { + t.Fatal("There must be the VIP") + } + if len(ipList) != 1 { + t.Fatal("It must return only 1 VIP") + } + if ipList[0].String() != "192.168.0.1" { + t.Fatal("The service VIP is 192.168.0.1") + } + name := n.(*network).ResolveIP(ipToResolve) + if name == "" { + t.Fatal("It must return a name") + } + if name != "service_test.net1" { + t.Fatalf("It must return the service_test.net1 != %s", name) + } + + // Delete service record for one of the services, the IP should remain because one service is still associated with it + n.(*network).deleteSvcRecords("ep1", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + ipList, _ = n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) == 0 { + t.Fatal("There must be the VIP") + } + if len(ipList) != 1 { + t.Fatal("It must return only 1 VIP") + } + if ipList[0].String() != "192.168.0.1" { + t.Fatal("The service VIP is 192.168.0.1") + } + name = n.(*network).ResolveIP(ipToResolve) + if name == "" { + t.Fatal("It must return a name") + } + if name != "service_test.net1" { + t.Fatalf("It must return the service_test.net1 != %s", name) + } + + // Delete again the service using the previous service ID, nothing should happen + n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID1", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + ipList, _ = n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) == 0 { + t.Fatal("There must be the VIP") + } + if len(ipList) != 1 { + t.Fatal("It must return only 1 VIP") + } + if ipList[0].String() != "192.168.0.1" { + t.Fatal("The service VIP is 192.168.0.1") + } + name = n.(*network).ResolveIP(ipToResolve) + if name == "" { + t.Fatal("It must return a name") + } + if name != "service_test.net1" { + t.Fatalf("It must return the service_test.net1 != %s", name) + } + + // Delete now using the second service ID, now all the entries should be gone + n.(*network).deleteSvcRecords("ep2", "service_test", "serviceID2", net.ParseIP("192.168.0.1"), net.IP{}, true, "test") + ipList, _ = n.(*network).ResolveName("service_test", types.IPv4) + if len(ipList) != 0 { + t.Fatal("All the VIPs should be gone now") + } + name = n.(*network).ResolveIP(ipToResolve) + if name != "" { + t.Fatalf("It must return empty no more services associated, instead:%s", name) + } +} + func TestIpamReleaseOnNetDriverFailures(t *testing.T) { if !testutils.IsRunningInContainer() { defer testutils.SetupTestOSContext(t)() diff --git a/network.go b/network.go index 2fddb59b54..5854a16820 100644 --- a/network.go +++ b/network.go @@ -81,9 +81,23 @@ type NetworkInfo interface { // When the function returns true, the walk will stop. type EndpointWalker func(ep Endpoint) bool +// ipInfo is the reverse mapping from IP to service name to serve the PTR query. +// Its an indication to defer PTR queries also to that external server. +type ipInfo struct { + name string + serviceID string +} + +// svcMapEntry is the body of the element into the svcMap +// The ip is a string because the SetMatrix does not accept non hashable values +type svcMapEntry struct { + ip string + serviceID string +} + type svcInfo struct { - svcMap map[string][]net.IP - svcIPv6Map map[string][]net.IP + svcMap common.SetMatrix + svcIPv6Map common.SetMatrix ipMap common.SetMatrix service map[string][]servicePorts } @@ -1047,73 +1061,76 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool ipv6 = iface.AddressIPv6().IP } + serviceID := ep.svcID if isAdd { // If anonymous endpoint has an alias use the first alias // for ip->name mapping. Not having the reverse mapping // breaks some apps if ep.isAnonymous() { + serviceID = ep.ID() if len(myAliases) > 0 { - n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") + n.addSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } } else { - n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") + n.addSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } for _, alias := range myAliases { - n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") + n.addSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord") } } else { if ep.isAnonymous() { + serviceID = ep.ID() if len(myAliases) > 0 { - n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord") + n.deleteSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } } else { - n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord") + n.deleteSvcRecords(ep.ID(), epName, serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } for _, alias := range myAliases { - n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord") + n.deleteSvcRecords(ep.ID(), alias, serviceID, iface.Address().IP, ipv6, false, "updateSvcRecord") } } } } -func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) { +func addIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) { reverseIP := netutils.ReverseIP(ip.String()) - ipMap.Insert(reverseIP, name) + ipMap.Insert(reverseIP, ipInfo{ + name: name, + serviceID: serviceID, + }) } -func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { - ipList := svcMap[name] - for _, ip := range ipList { - if ip.Equal(epIP) { - return - } - } - svcMap[name] = append(svcMap[name], epIP) +func delIPToName(ipMap common.SetMatrix, name, serviceID string, ip net.IP) { + reverseIP := netutils.ReverseIP(ip.String()) + ipMap.Remove(reverseIP, ipInfo{ + name: name, + serviceID: serviceID, + }) } -func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) { - ipList := svcMap[name] - for i, ip := range ipList { - if ip.Equal(epIP) { - ipList = append(ipList[:i], ipList[i+1:]...) - break - } - } - svcMap[name] = ipList +func addNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) { + svcMap.Insert(name, svcMapEntry{ + ip: epIP.String(), + serviceID: serviceID, + }) +} - if len(ipList) == 0 { - delete(svcMap, name) - } +func delNameToIP(svcMap common.SetMatrix, name, serviceID string, epIP net.IP) { + svcMap.Remove(name, svcMapEntry{ + ip: epIP.String(), + serviceID: serviceID, + }) } -func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { +func (n *network) addSvcRecords(eID, name, serviceID string, epIP, epIPv6 net.IP, ipMapUpdate bool, method string) { // Do not add service names for ingress network as this is a // routing only network if n.ingress { return } - logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) + logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s sid:%s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID) c := n.getController() c.Lock() @@ -1122,34 +1139,34 @@ func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ip sr, ok := c.svcRecords[n.ID()] if !ok { sr = svcInfo{ - svcMap: make(map[string][]net.IP), - svcIPv6Map: make(map[string][]net.IP), + svcMap: common.NewSetMatrix(), + svcIPv6Map: common.NewSetMatrix(), ipMap: common.NewSetMatrix(), } c.svcRecords[n.ID()] = sr } if ipMapUpdate { - addIPToName(sr.ipMap, name, epIP) + addIPToName(sr.ipMap, name, serviceID, epIP) if epIPv6 != nil { - addIPToName(sr.ipMap, name, epIPv6) + addIPToName(sr.ipMap, name, serviceID, epIPv6) } } - addNameToIP(sr.svcMap, name, epIP) + addNameToIP(sr.svcMap, name, serviceID, epIP) if epIPv6 != nil { - addNameToIP(sr.svcIPv6Map, name, epIPv6) + addNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6) } } -func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { +func (n *network) deleteSvcRecords(eID, name, serviceID string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) { // Do not delete service names from ingress network as this is a // routing only network if n.ingress { return } - logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method) + logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s sid:%s ", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method, serviceID) c := n.getController() c.Lock() @@ -1161,17 +1178,17 @@ func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, } if ipMapUpdate { - sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), name) + delIPToName(sr.ipMap, name, serviceID, epIP) if epIPv6 != nil { - sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), name) + delIPToName(sr.ipMap, name, serviceID, epIPv6) } } - delNameToIP(sr.svcMap, name, epIP) + delNameToIP(sr.svcMap, name, serviceID, epIP) if epIPv6 != nil { - delNameToIP(sr.svcIPv6Map, name, epIPv6) + delNameToIP(sr.svcIPv6Map, name, serviceID, epIPv6) } } @@ -1189,19 +1206,31 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record { n.ctrlr.Lock() defer n.ctrlr.Unlock() - sr, _ := n.ctrlr.svcRecords[n.id] + sr, ok := n.ctrlr.svcRecords[n.id] + if !ok || sr.svcMap == nil { + return nil + } - for h, ip := range sr.svcMap { - if strings.Split(h, ".")[0] == epName { + svcMapKeys := sr.svcMap.Keys() + // Loop on service names on this network + for _, k := range svcMapKeys { + if strings.Split(k, ".")[0] == epName { + continue + } + // Get all the IPs associated to this service + mapEntryList, ok := sr.svcMap.Get(k) + if !ok { + // The key got deleted continue } - if len(ip) == 0 { - logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.name, n.id) + if len(mapEntryList) == 0 { + logrus.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", k, n.name, n.id) continue } + recs = append(recs, etchosts.Record{ - Hosts: h, - IP: ip[0].String(), + Hosts: k, + IP: mapEntryList[0].(svcMapEntry).ip, }) } @@ -1622,17 +1651,15 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) { c := n.getController() c.Lock() + defer c.Unlock() sr, ok := c.svcRecords[n.ID()] - c.Unlock() if !ok { return nil, false } req = strings.TrimSuffix(req, ".") - var ip []net.IP - n.Lock() - ip, ok = sr.svcMap[req] + ipSet, ok := sr.svcMap.Get(req) if ipType == types.IPv6 { // If the name resolved to v4 address then its a valid name in @@ -1642,14 +1669,20 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) { if ok && n.enableIPv6 == false { ipv6Miss = true } - ip = sr.svcIPv6Map[req] + ipSet, ok = sr.svcIPv6Map.Get(req) } - n.Unlock() - if ip != nil { - ipLocal := make([]net.IP, len(ip)) - copy(ipLocal, ip) - return ipLocal, false + if ok && len(ipSet) > 0 { + // this maps is to avoid IP duplicates, this can happen during a transition period where 2 services are using the same IP + noDup := make(map[string]bool) + var ipLocal []net.IP + for _, ip := range ipSet { + if _, dup := noDup[ip.(svcMapEntry).ip]; !dup { + noDup[ip.(svcMapEntry).ip] = true + ipLocal = append(ipLocal, net.ParseIP(ip.(svcMapEntry).ip)) + } + } + return ipLocal, ok } return nil, ipv6Miss @@ -1658,8 +1691,8 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) { func (n *network) ResolveIP(ip string) string { c := n.getController() c.Lock() + defer c.Unlock() sr, ok := c.svcRecords[n.ID()] - c.Unlock() if !ok { return "" @@ -1667,23 +1700,23 @@ func (n *network) ResolveIP(ip string) string { nwName := n.Name() - nameList, ok := sr.ipMap.Get(ip) - if !ok || len(nameList) == 0 { + elemSet, ok := sr.ipMap.Get(ip) + if !ok || len(elemSet) == 0 { return "" } // NOTE it is possible to have more than one element in the Set, this will happen - // because of interleave of diffent events from differnt sources (local container create vs + // because of interleave of different events from different sources (local container create vs // network db notifications) // In such cases the resolution will be based on the first element of the set, and can vary // during the system stabilitation - name, ok := nameList[0].(string) + elem, ok := elemSet[0].(ipInfo) if !ok { setStr, b := sr.ipMap.String(ip) - logrus.Errorf("expected set of strings for key %s set:%t %s", ip, b, setStr) + logrus.Errorf("expected set of ipInfo type for key %s set:%t %s", ip, b, setStr) return "" } - return name + "." + nwName + return elem.name + "." + nwName } func (n *network) ResolveService(name string) ([]*net.SRV, []net.IP) { diff --git a/service_common.go b/service_common.go index 31a3fd9283..e33242a63d 100644 --- a/service_common.go +++ b/service_common.go @@ -21,23 +21,23 @@ func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, contain c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) // Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR. - n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) + n.(*network).addSvcRecords(eID, "tasks."+svcName, svcID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method) + n.(*network).addSvcRecords(eID, "tasks."+alias, svcID, ip, nil, false, method) } // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR if len(vip) == 0 { - n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method) + n.(*network).addSvcRecords(eID, svcName, eID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, alias, ip, nil, false, method) + n.(*network).addSvcRecords(eID, alias, eID, ip, nil, false, method) } } if addService && len(vip) != 0 { - n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method) + n.(*network).addSvcRecords(eID, svcName, svcID, vip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, alias, vip, nil, false, method) + n.(*network).addSvcRecords(eID, alias, svcID, vip, nil, false, method) } } @@ -52,11 +52,11 @@ func (c *controller) addContainerNameResolution(nID, eID, containerName string, logrus.Debugf("addContainerNameResolution %s %s", eID, containerName) // Add resolution for container name - n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method) + n.(*network).addSvcRecords(eID, containerName, eID, ip, nil, true, method) // Add resolution for taskaliases for _, alias := range taskAliases { - n.(*network).addSvcRecords(eID, alias, ip, nil, true, method) + n.(*network).addSvcRecords(eID, alias, eID, ip, nil, true, method) } return nil @@ -75,25 +75,25 @@ func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, cont // Delete the special "tasks.svc_name" backend record. if !multipleEntries { - n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, "tasks."+svcName, svcID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, "tasks."+alias, svcID, ip, nil, false, method) } } // If we are doing DNS RR delete the endpoint IP from DNS record right away. if !multipleEntries && len(vip) == 0 { - n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, svcName, eID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, alias, eID, ip, nil, false, method) } } // Remove the DNS record for VIP only if we are removing the service if rmService && len(vip) != 0 && !multipleEntries { - n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method) + n.(*network).deleteSvcRecords(eID, svcName, svcID, vip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method) + n.(*network).deleteSvcRecords(eID, alias, svcID, vip, nil, false, method) } } @@ -108,11 +108,11 @@ func (c *controller) delContainerNameResolution(nID, eID, containerName string, logrus.Debugf("delContainerNameResolution %s %s", eID, containerName) // Delete resolution for container name - n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method) + n.(*network).deleteSvcRecords(eID, containerName, eID, ip, nil, true, method) // Delete resolution for taskaliases for _, alias := range taskAliases { - n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method) + n.(*network).deleteSvcRecords(eID, alias, eID, ip, nil, true, method) } return nil @@ -208,8 +208,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s } s.Unlock() } - logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID) - + logrus.Debugf("addServiceBinding from %s START for %s %s p:%p nid:%s skey:%v", method, svcName, eID, s, nID, skey) defer s.Unlock() lb, ok := s.loadBalancers[nID] @@ -274,7 +273,6 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st c.Lock() s, ok := c.serviceBindings[skey] c.Unlock() - logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID) if !ok { logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID) return nil @@ -282,6 +280,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st s.Lock() defer s.Unlock() + logrus.Debugf("rmServiceBinding from %s START for %s %s p:%p nid:%s sKey:%v", method, svcName, eID, s, nID, skey) lb, ok := s.loadBalancers[nID] if !ok { logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID) @@ -302,17 +301,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st rmService = true delete(s.loadBalancers, nID) - } - - if len(s.loadBalancers) == 0 { - // All loadbalancers for the service removed. Time to - // remove the service itself. - c.Lock() - - // Mark the object as deleted so that the add won't use it wrongly - s.deleted = true - delete(c.serviceBindings, skey) - c.Unlock() + logrus.Debugf("rmServiceBinding %s delete %s, p:%p in loadbalancers len:%d", eID, nID, lb, len(s.loadBalancers)) } ok, entries := s.removeIPToEndpoint(ip.String(), eID) @@ -330,6 +319,19 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st // Delete the name resolutions c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") + if len(s.loadBalancers) == 0 { + // All loadbalancers for the service removed. Time to + // remove the service itself. + c.Lock() + + // Mark the object as deleted so that the add won't use it wrongly + s.deleted = true + // NOTE The delete from the serviceBindings map has to be the last operation else we are allowing a race between this service + // that is getting deleted and a new service that will be created if the entry is not anymore there + delete(c.serviceBindings, skey) + c.Unlock() + } + logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID) return nil } From 88004247d901740fe4d49c1637e58f57cc30108a Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Tue, 13 Jun 2017 15:47:31 -0700 Subject: [PATCH 3/7] Fix handleEPTable log There was an extra parameter not in the formatters Signed-off-by: Flavio Crisciani (cherry picked from commit b34bc70afb5b2ce4d979cf2a27b146777510e488) --- agent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent.go b/agent.go index 5e88b5a649..a755513b8e 100644 --- a/agent.go +++ b/agent.go @@ -751,7 +751,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) { } if isAdd { - logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec) + logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { @@ -765,7 +765,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) { } } } else { - logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec) + logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { From fd7c2d5b960959a6b7a1ec87d82816af9f874bbe Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Fri, 16 Jun 2017 16:54:18 -0700 Subject: [PATCH 4/7] Avoid race on network cleanup Use the locker to avoid the race between the network deletion and new endpoints being created Signed-off-by: Flavio Crisciani (cherry picked from commit ee1326aa5a73c1907622b41b9570832739817d2c) --- network.go | 8 +++++++- service_common.go | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/network.go b/network.go index 5854a16820..3aa1027eb2 100644 --- a/network.go +++ b/network.go @@ -789,6 +789,9 @@ func (n *network) delete(force bool) error { id := n.id n.Unlock() + c.networkLocker.Lock(id) + defer c.networkLocker.Unlock(id) + n, err := c.getNetworkFromStore(id) if err != nil { return &UnknownNetworkError{name: name, id: id} @@ -903,6 +906,9 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi ep := &endpoint{name: name, generic: make(map[string]interface{}), iface: &endpointInterface{}} ep.id = stringid.GenerateRandomID() + n.ctrlr.networkLocker.Lock(n.id) + defer n.ctrlr.networkLocker.Unlock(n.id) + // Initialize ep.network with a possibly stale copy of n. We need this to get network from // store. But once we get it from store we will have the most uptodate copy possibly. ep.network = n @@ -1673,7 +1679,7 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) { } if ok && len(ipSet) > 0 { - // this maps is to avoid IP duplicates, this can happen during a transition period where 2 services are using the same IP + // this map is to avoid IP duplicates, this can happen during a transition period where 2 services are using the same IP noDup := make(map[string]bool) var ipLocal []net.IP for _, ip := range ipSet { diff --git a/service_common.go b/service_common.go index e33242a63d..c5c28ea220 100644 --- a/service_common.go +++ b/service_common.go @@ -15,7 +15,7 @@ func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, contain return err } - logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService) + logrus.Debugf("addEndpointNameResolution %s %s add_service:%t sAliases:%v tAliases:%v", eID, svcName, addService, serviceAliases, taskAliases) // Add container resolution mappings c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) @@ -68,7 +68,7 @@ func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, cont return err } - logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries) + logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t sAliases:%v tAliases:%v", eID, svcName, rmService, multipleEntries, serviceAliases, taskAliases) // Delete container resolution mappings c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) From 3bb65ff492e87867d0997538ff7ad7e996f62806 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Sat, 17 Jun 2017 12:31:48 -0700 Subject: [PATCH 5/7] CleanupServiceBindings to clean the SD records Allow the cleanupServicebindings to take care of the service discovery cleanup. Also avoid to trigger the cleanup for each endpoint from an SD point of view LB and SD will be separated in the future Signed-off-by: Flavio Crisciani (cherry picked from commit ccc21bc247c7b9a607c6bb1c92eebf4540443f77) --- agent.go | 4 ++-- network.go | 6 ------ service.go | 8 +------- service_common.go | 38 +++++++++++++++++++++++++------------- service_linux.go | 4 ++-- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/agent.go b/agent.go index a755513b8e..6f072ae1be 100644 --- a/agent.go +++ b/agent.go @@ -562,7 +562,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err if n.ingress { ingressPorts = ep.ingressPorts } - if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Name(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil { + if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Name(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil { return err } } else { @@ -768,7 +768,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) { logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service - if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { + if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil { logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err) return } diff --git a/network.go b/network.go index 3aa1027eb2..d79ace3dfd 100644 --- a/network.go +++ b/network.go @@ -832,12 +832,6 @@ func (n *network) delete(force bool) error { c.cleanupServiceBindings(n.ID()) - // The network had been left, the service discovery can be cleaned up - c.Lock() - logrus.Debugf("network %s delete, clean svcRecords", n.id) - delete(c.svcRecords, n.id) - c.Unlock() - // deleteFromStore performs an atomic delete operation and the // network.epCnt will help prevent any possible // race between endpoint join and network delete diff --git a/service.go b/service.go index c890e01a39..5a0d7e0057 100644 --- a/service.go +++ b/service.go @@ -85,14 +85,8 @@ type loadBalancer struct { // Map of backend IPs backing this loadbalancer on this // network. It is keyed with endpoint ID. - backEnds map[string]loadBalancerBackend + backEnds map[string]net.IP // Back pointer to service to which the loadbalancer belongs. service *service } - -type loadBalancerBackend struct { - ip net.IP - containerName string - taskAliases []string -} diff --git a/service_common.go b/service_common.go index c5c28ea220..8ee3c43e9e 100644 --- a/service_common.go +++ b/service_common.go @@ -132,6 +132,7 @@ func newService(name string, id string, ingressPorts []*PortConfig, serviceAlias func (c *controller) cleanupServiceBindings(cleanupNID string) { var cleanupFuncs []func() + logrus.Debugf("cleanupServiceBindings for %s", cleanupNID) c.Lock() services := make([]*service, 0, len(c.serviceBindings)) for _, s := range c.serviceBindings { @@ -151,16 +152,27 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) { continue } - for eid, be := range lb.backEnds { + // The network is being deleted, erase all the associated service discovery records + // TODO(fcrisciani) separate the Load Balancer from the Service discovery, this operation + // can be done safely here, but the rmServiceBinding is still keeping consistency in the + // data structures that are tracking the endpoint to IP mapping. + c.Lock() + logrus.Debugf("cleanupServiceBindings erasing the svcRecords for %s", nid) + delete(c.svcRecords, nid) + c.Unlock() + + for eid, ip := range lb.backEnds { + epID := eid + epIP := ip service := s loadBalancer := lb networkID := nid - epID := eid - epIP := be.ip - cleanupFuncs = append(cleanupFuncs, func() { - if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip, - service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil { + // ContainerName and taskAliases are not available here, this is still fine because the Service discovery + // cleanup already happened before. The only thing that rmServiceBinding is still doing here a part from the Load + // Balancer bookeeping, is to keep consistent the mapping of endpoint to IP. + if err := c.rmServiceBinding(service.name, service.id, networkID, epID, "", loadBalancer.vip, + service.ingressPorts, service.aliases, []string{}, epIP, "cleanupServiceBindings", false); err != nil { logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", service.id, networkID, epID, err) } @@ -221,7 +233,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s lb = &loadBalancer{ vip: vip, fwMark: fwMarkCtr, - backEnds: make(map[string]loadBalancerBackend), + backEnds: make(map[string]net.IP), service: s, } @@ -232,9 +244,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s addService = true } - lb.backEnds[eID] = loadBalancerBackend{ip: ip, - containerName: containerName, - taskAliases: taskAliases} + lb.backEnds[eID] = ip ok, entries := s.assignIPToEndpoint(ip.String(), eID) if !ok || entries > 1 { @@ -256,7 +266,7 @@ func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName s return nil } -func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error { +func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string, deleteSvcRecords bool) error { var rmService bool @@ -280,7 +290,7 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st s.Lock() defer s.Unlock() - logrus.Debugf("rmServiceBinding from %s START for %s %s p:%p nid:%s sKey:%v", method, svcName, eID, s, nID, skey) + logrus.Debugf("rmServiceBinding from %s START for %s %s p:%p nid:%s sKey:%v deleteSvc:%t", method, svcName, eID, s, nID, skey, deleteSvcRecords) lb, ok := s.loadBalancers[nID] if !ok { logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID) @@ -317,7 +327,9 @@ func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName st } // Delete the name resolutions - c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") + if deleteSvcRecords { + c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding") + } if len(s.loadBalancers) == 0 { // All loadbalancers for the service removed. Time to diff --git a/service_linux.go b/service_linux.go index 70af7e33bc..7284e9a00e 100644 --- a/service_linux.go +++ b/service_linux.go @@ -102,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { } lb.service.Lock() - for _, l := range lb.backEnds { - sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) + for _, ip := range lb.backEnds { + sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress) } lb.service.Unlock() } From 0412c9dde67b0ae6cc52ab9e76f998616dc579ec Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Sat, 17 Jun 2017 16:32:33 -0700 Subject: [PATCH 6/7] Addressed comments Signed-off-by: Flavio Crisciani (cherry picked from commit 5fd5257ad3597be83be8532b0dcc15e84b26e17c) --- network.go | 5 +++-- service_common.go | 36 ++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/network.go b/network.go index d79ace3dfd..998df6072d 100644 --- a/network.go +++ b/network.go @@ -1062,12 +1062,14 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool } serviceID := ep.svcID + if serviceID == "" { + serviceID = ep.ID() + } if isAdd { // If anonymous endpoint has an alias use the first alias // for ip->name mapping. Not having the reverse mapping // breaks some apps if ep.isAnonymous() { - serviceID = ep.ID() if len(myAliases) > 0 { n.addSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } @@ -1079,7 +1081,6 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool } } else { if ep.isAnonymous() { - serviceID = ep.ID() if len(myAliases) > 0 { n.deleteSvcRecords(ep.ID(), myAliases[0], serviceID, iface.Address().IP, ipv6, true, "updateSvcRecord") } diff --git a/service_common.go b/service_common.go index 8ee3c43e9e..c8155ca236 100644 --- a/service_common.go +++ b/service_common.go @@ -20,24 +20,30 @@ func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, contain // Add container resolution mappings c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) + serviceID := svcID + if serviceID == "" { + // This is the case of a normal container not part of a service + serviceID = eID + } + // Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR. - n.(*network).addSvcRecords(eID, "tasks."+svcName, svcID, ip, nil, false, method) + n.(*network).addSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, "tasks."+alias, svcID, ip, nil, false, method) + n.(*network).addSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method) } // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR if len(vip) == 0 { - n.(*network).addSvcRecords(eID, svcName, eID, ip, nil, false, method) + n.(*network).addSvcRecords(eID, svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, alias, eID, ip, nil, false, method) + n.(*network).addSvcRecords(eID, alias, serviceID, ip, nil, false, method) } } if addService && len(vip) != 0 { - n.(*network).addSvcRecords(eID, svcName, svcID, vip, nil, false, method) + n.(*network).addSvcRecords(eID, svcName, serviceID, vip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).addSvcRecords(eID, alias, svcID, vip, nil, false, method) + n.(*network).addSvcRecords(eID, alias, serviceID, vip, nil, false, method) } } @@ -73,27 +79,33 @@ func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, cont // Delete container resolution mappings c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method) + serviceID := svcID + if serviceID == "" { + // This is the case of a normal container not part of a service + serviceID = eID + } + // Delete the special "tasks.svc_name" backend record. if !multipleEntries { - n.(*network).deleteSvcRecords(eID, "tasks."+svcName, svcID, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, "tasks."+svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, "tasks."+alias, svcID, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, "tasks."+alias, serviceID, ip, nil, false, method) } } // If we are doing DNS RR delete the endpoint IP from DNS record right away. if !multipleEntries && len(vip) == 0 { - n.(*network).deleteSvcRecords(eID, svcName, eID, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, svcName, serviceID, ip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, alias, eID, ip, nil, false, method) + n.(*network).deleteSvcRecords(eID, alias, serviceID, ip, nil, false, method) } } // Remove the DNS record for VIP only if we are removing the service if rmService && len(vip) != 0 && !multipleEntries { - n.(*network).deleteSvcRecords(eID, svcName, svcID, vip, nil, false, method) + n.(*network).deleteSvcRecords(eID, svcName, serviceID, vip, nil, false, method) for _, alias := range serviceAliases { - n.(*network).deleteSvcRecords(eID, alias, svcID, vip, nil, false, method) + n.(*network).deleteSvcRecords(eID, alias, serviceID, vip, nil, false, method) } } From 05bd0fbc28a8ad3f772a5fb4ca069ea50763bc51 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Sat, 17 Jun 2017 17:25:04 -0700 Subject: [PATCH 7/7] NetworkDB deleteEntry has to happen Signed-off-by: Flavio Crisciani (cherry picked from commit 49378070a1683d0b53550bc21e121816bdf28cd8) --- agent.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/agent.go b/agent.go index 6f072ae1be..173dd768d4 100644 --- a/agent.go +++ b/agent.go @@ -525,13 +525,13 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error { TaskAliases: ep.myAliases, EndpointIP: ep.Iface().Address().IP.String(), }) - if err != nil { return err } if agent != nil { if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil { + logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err) return err } } @@ -556,6 +556,13 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err agent := c.getAgent() if !ep.isAnonymous() && ep.Iface().Address() != nil { + // Delete the entry from network DB, this will trigger a notification to other nodes + if agent != nil { + if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil { + logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err) + } + } + // Update locally if ep.svcID != "" { // This is a task part of a service var ingressPorts []*PortConfig @@ -571,11 +578,6 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err return err } } - if agent != nil { - if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil { - return err - } - } } logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())