Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 84 additions & 41 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
return nil
}

func (ep *endpoint) addServiceInfoToCluster() error {
func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
return nil
}
Expand All @@ -593,24 +593,49 @@ func (ep *endpoint) addServiceInfoToCluster() error {
return nil
}

sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())

// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
// This is to handle a race between the EnableService and the sbLeave
// It is possible that the EnableService starts, fetches the list of the endpoints and
// by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
// The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
// This check under the Service lock of the sandbox ensure the correct behavior.
// If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
// but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
// In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
// removed from the list, in this situation the delete will bail out not finding any data to cleanup
// and the add will bail out not finding the endpoint on the sandbox.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the excellent summary of the problem.

if e := sb.getEndpoint(ep.ID()); e == nil {
logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}

c := n.getController()
agent := c.getAgent()

name := ep.Name()
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mavenugo I guess to handle properly the case of anonymous container this has to be brought up

if ep.isAnonymous() {
name = ep.MyAliases()[0]
}

var ingressPorts []*PortConfig
if ep.svcID != "" {
// This is a task part of a service
// Gossip ingress ports only in ingress network.
if n.ingress {
ingressPorts = ep.ingressPorts
}

if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
}

name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}

buf, err := proto.Marshal(&EndpointRecord{
Expand All @@ -634,10 +659,12 @@ func (ep *endpoint) addServiceInfoToCluster() error {
}
}

logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())

return nil
}

func (ep *endpoint) deleteServiceInfoFromCluster() error {
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil
}
Expand All @@ -647,17 +674,33 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
return nil
}

sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())

c := n.getController()
agent := c.getAgent()

if ep.svcID != "" && ep.Iface().Address() != nil {
var ingressPorts []*PortConfig
if n.ingress {
ingressPorts = ep.ingressPorts
}
name := ep.Name()
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mavenugo same here for the delete this name is needed

if ep.isAnonymous() {
name = ep.MyAliases()[0]
}

if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
return err
if ep.Iface().Address() != nil {
if ep.svcID != "" {
// This is a task part of a service
var ingressPorts []*PortConfig
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
}
}

Expand All @@ -667,6 +710,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
}
}

logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())

return nil
}

Expand Down Expand Up @@ -814,58 +859,56 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
value = event.Value
case networkdb.UpdateEvent:
logrus.Errorf("Unexpected update service table event = %#v", event)
}

nw, err := c.NetworkByID(nid)
if err != nil {
logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
return
}
n := nw.(*network)

err = proto.Unmarshal(value, &epRec)
err := proto.Unmarshal(value, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal service table value: %v", err)
return
}

name := epRec.Name
containerName := epRec.Name
svcName := epRec.ServiceName
svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP)
ingressPorts := epRec.IngressPorts
aliases := epRec.Aliases
taskaliases := epRec.TaskAliases
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases

if name == "" || ip == nil {
if containerName == "" || ip == nil {
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}

if isAdd {
logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
// This is a remote task part of a service
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
}

n.addSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.addSvcRecords(alias, ip, nil, true)
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
} else {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
return
}
}

n.deleteSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.deleteSvcRecords(alias, ip, nil, true)
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
}
}
}
}
2 changes: 1 addition & 1 deletion agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false;
// EndpointRecord specifies all the endpoint specific information that
// needs to gossiped to nodes participating in the network.
message EndpointRecord {
// Name of the endpoint
// Name of the container
string name = 1;

// Service name of the service to which this endpoint belongs.
Expand Down
123 changes: 123 additions & 0 deletions common/setmatrix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package common

import (
"sync"

mapset "github.com/deckarep/golang-set"
)

// SetMatrix is a map of Sets
type SetMatrix interface {
// Get returns the members of the set for a specific key as a slice.
Get(key string) ([]interface{}, bool)
// Contains is used to verify is an element is in a set for a specific key
// returns true if the element is in the set
// returns true if there is a set for the key
Contains(key string, value interface{}) (bool, bool)
// Insert inserts the mapping between the IP and the endpoint identifier
// returns true if the mapping was not present, false otherwise
// returns also the number of endpoints associated to the IP
Insert(key string, value interface{}) (bool, int)
// Remove removes the mapping between the IP and the endpoint identifier
// returns true if the mapping was deleted, false otherwise
// returns also the number of endpoints associated to the IP
Remove(key string, value interface{}) (bool, int)
// Cardinality returns the number of elements in the set of a specfic key
// returns false if the key is not in the map
Cardinality(key string) (int, bool)
// String returns the string version of the set, empty otherwise
// returns false if the key is not in the map
String(key string) (string, bool)
}

type setMatrix struct {
matrix map[string]mapset.Set

sync.Mutex
}

// NewSetMatrix creates a new set matrix object
func NewSetMatrix() SetMatrix {
s := &setMatrix{}
s.init()
return s
}

func (s *setMatrix) init() {
s.matrix = make(map[string]mapset.Set)
}

func (s *setMatrix) Get(key string) ([]interface{}, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return nil, ok
}
return set.ToSlice(), ok
}

func (s *setMatrix) Contains(key string, value interface{}) (bool, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return false, ok
}
return set.Contains(value), ok
}

func (s *setMatrix) Insert(key string, value interface{}) (bool, int) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
s.matrix[key] = mapset.NewSet()
s.matrix[key].Add(value)
return true, 1
}

return set.Add(value), set.Cardinality()
}

func (s *setMatrix) Remove(key string, value interface{}) (bool, int) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return false, 0
}

var removed bool
if set.Contains(value) {
set.Remove(value)
removed = true
// If the set is empty remove it from the matrix
if set.Cardinality() == 0 {
delete(s.matrix, key)
}
}

return removed, set.Cardinality()
}

func (s *setMatrix) Cardinality(key string) (int, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return 0, ok
}

return set.Cardinality(), ok
}

func (s *setMatrix) String(key string) (string, bool) {
s.Lock()
defer s.Unlock()
set, ok := s.matrix[key]
if !ok {
return "", ok
}
return set.String(), ok
}
Loading