diff --git a/core/nylon_apply.go b/core/nylon_apply.go index b5ad501b..87a2ae51 100644 --- a/core/nylon_apply.go +++ b/core/nylon_apply.go @@ -137,16 +137,20 @@ func reconcileConfiguredEndpoints(neigh *state.Neighbour, desired []*state.Dynam } func (n *Nylon) reconcileAdvertisedPrefixes(next *state.CentralCfg) { - cur := n.GetRouter(n.LocalCfg.Id) - nextRouter := next.GetRouter(n.LocalCfg.Id) - currentLocal := make(map[netip.Prefix]state.PrefixHealthWrapper) - for _, prefix := range cur.Prefixes { - currentLocal[prefix.GetPrefix()] = prefix + if cur := n.CentralCfg.TryGetNode(n.LocalCfg.Id); cur != nil { + for _, prefix := range cur.Prefixes { + currentLocal[prefix.GetPrefix()] = prefix + } } - desiredLocal := make(map[netip.Prefix]state.PrefixHealthWrapper) - for _, prefix := range nextRouter.Prefixes { - desiredLocal[prefix.GetPrefix()] = prefix + nextNode := next.TryGetNode(n.LocalCfg.Id) + if nextNode == nil { + return + } + + desiredLocal := make(map[netip.Prefix]int) + for i, prefix := range nextNode.Prefixes { + desiredLocal[prefix.GetPrefix()] = i } for prefix, adv := range n.RouterState.Advertised { @@ -161,8 +165,15 @@ func (n *Nylon) reconcileAdvertisedPrefixes(next *state.CentralCfg) { } } - for prefix, desired := range desiredLocal { - if _, ok := currentLocal[prefix]; !ok { + for prefix, index := range desiredLocal { + desired := nextNode.Prefixes[index] + if current, ok := currentLocal[prefix]; ok && current.SameConfig(desired, &n.RouterTunables) { + desired = current + nextNode.Prefixes[index] = current + } else { + if current, ok := currentLocal[prefix]; ok { + current.Stop() + } n.Log.Debug("starting prefix healthcheck", "prefix", prefix) desired.Start(n.Log, &n.RouterTunables) } diff --git a/core/nylon_apply_test.go b/core/nylon_apply_test.go new file mode 100644 index 00000000..be7d1ae2 --- /dev/null +++ b/core/nylon_apply_test.go @@ -0,0 +1,136 @@ +package core + +import ( + "io" + "log/slog" + "net/netip" + "testing" + "time" + + "github.com/encodeous/nylon/state" + "github.com/stretchr/testify/assert" +) + +func TestReconcileAdvertisedPrefixesStartsChangedPrefixHealth(t *testing.T) { + prefix := netip.MustParsePrefix("fd00::53/128") + oldPrefix := state.PrefixHealthWrapper{ + PrefixHealth: &state.StaticPrefixHealth{Prefix: prefix}, + } + n := testNylonWithPrefixes(oldPrefix) + n.RouterState.Advertised[prefix] = state.Advertisement{ + NodeId: n.LocalCfg.Id, + Expiry: maxConfigTime, + MetricFn: oldPrefix.GetMetric, + } + + delay := time.Millisecond + next := testCentralConfig(n.LocalCfg.Id, state.PrefixHealthWrapper{ + PrefixHealth: &state.HTTPPrefixHealth{ + Prefix: prefix, + URL: "http://127.0.0.1:1/healthz", + Delay: &delay, + }, + }) + + n.reconcileAdvertisedPrefixes(&next) + t.Cleanup(next.Routers[0].Prefixes[0].Stop) + + assert.Equal(t, state.INF, n.RouterState.Advertised[prefix].MetricFn()) +} + +func TestReconcileAdvertisedPrefixesStartsChangedPingPrefixHealth(t *testing.T) { + prefix := netip.MustParsePrefix("fd00::54/128") + oldPrefix := state.PrefixHealthWrapper{ + PrefixHealth: &state.StaticPrefixHealth{Prefix: prefix}, + } + n := testNylonWithPrefixes(oldPrefix) + n.RouterState.Advertised[prefix] = state.Advertisement{ + NodeId: n.LocalCfg.Id, + Expiry: maxConfigTime, + MetricFn: oldPrefix.GetMetric, + } + + delay := 100 * time.Millisecond + next := testCentralConfig(n.LocalCfg.Id, state.PrefixHealthWrapper{ + PrefixHealth: &state.PingPrefixHealth{ + Prefix: prefix, + Addr: netip.MustParseAddr("127.0.0.1"), + Delay: &delay, + }, + }) + + n.reconcileAdvertisedPrefixes(&next) + t.Cleanup(next.Routers[0].Prefixes[0].Stop) + + assert.Equal(t, state.INF, n.RouterState.Advertised[prefix].MetricFn()) +} + +func TestReconcileAdvertisedPrefixesReusesUnchangedRunningPrefixHealth(t *testing.T) { + prefix := netip.MustParsePrefix("fd00::53/128") + delay := time.Millisecond + current := state.PrefixHealthWrapper{ + PrefixHealth: &state.HTTPPrefixHealth{ + Prefix: prefix, + URL: "http://127.0.0.1:1/healthz", + Delay: &delay, + }, + } + n := testNylonWithPrefixes(current) + current.Start(n.Log, &n.RouterTunables) + t.Cleanup(current.Stop) + n.RouterState.Advertised[prefix] = state.Advertisement{ + NodeId: n.LocalCfg.Id, + Expiry: maxConfigTime, + MetricFn: current.GetMetric, + ExpiryFn: current.Stop, + } + + next := testCentralConfig(n.LocalCfg.Id, state.PrefixHealthWrapper{ + PrefixHealth: &state.HTTPPrefixHealth{ + Prefix: prefix, + URL: "http://127.0.0.1:1/healthz", + Delay: &delay, + }, + }) + + n.reconcileAdvertisedPrefixes(&next) + + assert.Same(t, current.PrefixHealth, next.Routers[0].Prefixes[0].PrefixHealth) + assert.Equal(t, state.INF, n.RouterState.Advertised[prefix].MetricFn()) +} + +func testNylonWithPrefixes(prefixes ...state.PrefixHealthWrapper) *Nylon { + id := state.NodeId("node") + tunables := state.DefaultRouterTunables() + return &Nylon{ + RouterTunables: tunables, + ConfigState: state.ConfigState{ + CentralCfg: testCentralConfig(id, prefixes...), + LocalCfg: state.LocalCfg{ + Id: id, + }, + }, + RouterState: &state.RouterState{ + RouterTunables: &tunables, + Id: id, + SelfSeqno: make(map[netip.Prefix]uint16), + Routes: make(map[netip.Prefix]state.SelRoute), + Sources: make(map[state.Source]state.FD), + Advertised: make(map[netip.Prefix]state.Advertisement), + }, + Log: slog.New(slog.NewTextHandler(io.Discard, nil)), + } +} + +func testCentralConfig(id state.NodeId, prefixes ...state.PrefixHealthWrapper) state.CentralCfg { + return state.CentralCfg{ + Routers: []state.RouterCfg{ + { + NodeCfg: state.NodeCfg{ + Id: id, + Prefixes: prefixes, + }, + }, + }, + } +} diff --git a/state/prefix_health.go b/state/prefix_health.go index 11a70122..3a1f90d4 100644 --- a/state/prefix_health.go +++ b/state/prefix_health.go @@ -39,6 +39,11 @@ func (s *StaticPrefixHealth) Start(log *slog.Logger, t *RouterTunables) { // do nothing } +func (s *StaticPrefixHealth) sameConfig(other PrefixHealth, _ *RouterTunables) bool { + o, ok := other.(*StaticPrefixHealth) + return ok && s.Prefix == o.Prefix && s.Metric == o.Metric +} + type PingPrefixHealth struct { Prefix netip.Prefix `yaml:"prefix"` Addr netip.Addr `yaml:"addr"` // the address to ping @@ -46,7 +51,7 @@ type PingPrefixHealth struct { Delay *time.Duration `yaml:"delay,omitempty"` // delay between pings BindIf string `yaml:"bind_if,omitempty"` // local interface to bind to Metric *uint32 `yaml:"metric,omitempty"` // metric override - lastMetric uint32 + lastMetric atomic.Uint32 running atomic.Bool } @@ -81,7 +86,7 @@ func (p *PingPrefixHealth) GetMetric() uint32 { if p.Metric != nil { return *p.Metric } - return p.lastMetric + return p.lastMetric.Load() } func (p *PingPrefixHealth) GetPrefix() netip.Prefix { return p.Prefix @@ -96,11 +101,12 @@ func (p *PingPrefixHealth) Start(log *slog.Logger, t *RouterTunables) { if p.MaxFailures == nil { p.MaxFailures = &t.HealthCheckMaxFailures } + // Default to unreachable until the first successful probe. + p.lastMetric.Store(INF) go func() { ticker := time.NewTicker(*p.Delay) for p.running.Load() { time.Sleep(*p.Delay) - p.lastMetric = INF bind4 := "" bind6 := "" var err error @@ -133,25 +139,36 @@ func (p *PingPrefixHealth) Start(log *slog.Logger, t *RouterTunables) { rtt, err := pinger.PingAttempts(addr, time.Duration(int64(*p.Delay)/int64(*p.MaxFailures)), *p.MaxFailures) if err != nil { // failed - p.lastMetric = INF + p.lastMetric.Store(INF) log.Debug("prefix healthcheck failed", "prefix", p.Prefix.String(), "addr", p.Addr.String(), "error", err) pinger.Close() break // break to outer loop to recreate pinger } else { // success - p.lastMetric = DurationToMetric(rtt) + p.lastMetric.Store(DurationToMetric(rtt)) } } } }() } +func (p *PingPrefixHealth) sameConfig(other PrefixHealth, tunables *RouterTunables) bool { + o, ok := other.(*PingPrefixHealth) + return ok && + p.Prefix == o.Prefix && + p.Addr == o.Addr && + p.BindIf == o.BindIf && + sameOptionalUint32(p.Metric, o.Metric) && + prefixHealthDelay(p.Delay, tunables) == prefixHealthDelay(o.Delay, tunables) && + prefixHealthMaxFailures(p.MaxFailures, tunables) == prefixHealthMaxFailures(o.MaxFailures, tunables) +} + type HTTPPrefixHealth struct { Prefix netip.Prefix `yaml:"prefix"` URL string `yaml:"url"` // the URL to check Delay *time.Duration `yaml:"delay,omitempty"` // delay between probes Metric *uint32 `yaml:"metric,omitempty"` // metric override - lastMetric uint32 + lastMetric atomic.Uint32 running atomic.Bool } @@ -163,7 +180,7 @@ func (h *HTTPPrefixHealth) GetMetric() uint32 { if h.Metric != nil { return *h.Metric } - return h.lastMetric + return h.lastMetric.Load() } func (h *HTTPPrefixHealth) GetPrefix() netip.Prefix { return h.Prefix @@ -172,7 +189,7 @@ func (h *HTTPPrefixHealth) Start(log *slog.Logger, t *RouterTunables) { if h.running.Swap(true) { return } - h.lastMetric = INF + h.lastMetric.Store(INF) if h.Delay == nil { h.Delay = &t.HealthCheckDelay } @@ -186,21 +203,64 @@ func (h *HTTPPrefixHealth) Start(log *slog.Logger, t *RouterTunables) { resp, err := http.Get(h.URL) if err != nil || resp.StatusCode != http.StatusOK { // failed - h.lastMetric = INF + h.lastMetric.Store(INF) log.Debug("prefix healthcheck failed", "prefix", h.Prefix.String(), "url", h.URL, "error", err) } else { // success rtt := time.Since(startTime) - h.lastMetric = DurationToMetric(rtt) + h.lastMetric.Store(DurationToMetric(rtt)) } } }() } +func (h *HTTPPrefixHealth) sameConfig(other PrefixHealth, tunables *RouterTunables) bool { + o, ok := other.(*HTTPPrefixHealth) + return ok && + h.Prefix == o.Prefix && + h.URL == o.URL && + sameOptionalUint32(h.Metric, o.Metric) && + prefixHealthDelay(h.Delay, tunables) == prefixHealthDelay(o.Delay, tunables) +} + type PrefixHealthWrapper struct { PrefixHealth } +type prefixHealthConfig interface { + sameConfig(other PrefixHealth, tunables *RouterTunables) bool +} + +// SameConfig reports whether two prefix health checks have equivalent configuration. +func (p PrefixHealthWrapper) SameConfig(other PrefixHealthWrapper, tunables *RouterTunables) bool { + if p.PrefixHealth == nil || other.PrefixHealth == nil { + return p.PrefixHealth == other.PrefixHealth + } + config, ok := p.PrefixHealth.(prefixHealthConfig) + return ok && config.sameConfig(other.PrefixHealth, tunables) +} + +func sameOptionalUint32(a, b *uint32) bool { + if a == nil || b == nil { + return a == b + } + return *a == *b +} + +func prefixHealthDelay(value *time.Duration, tunables *RouterTunables) time.Duration { + if value != nil { + return *value + } + return tunables.HealthCheckDelay +} + +func prefixHealthMaxFailures(value *int, tunables *RouterTunables) int { + if value != nil { + return *value + } + return tunables.HealthCheckMaxFailures +} + func (p PrefixHealthWrapper) MarshalYAML() (interface{}, error) { switch v := p.PrefixHealth.(type) { case *StaticPrefixHealth: