Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions core/nylon_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,20 @@ func reconcileConfiguredEndpoints(neigh *state.Neighbour, desired []*state.Dynam
}

func (n *Nylon) reconcileAdvertisedPrefixes(next *state.CentralCfg) {
cur := n.GetRouter(n.LocalCfg.Id)
nextRouter := next.GetRouter(n.LocalCfg.Id)

currentLocal := make(map[netip.Prefix]state.PrefixHealthWrapper)
for _, prefix := range cur.Prefixes {
currentLocal[prefix.GetPrefix()] = prefix
if cur := n.CentralCfg.TryGetNode(n.LocalCfg.Id); cur != nil {
for _, prefix := range cur.Prefixes {
currentLocal[prefix.GetPrefix()] = prefix
}
}
desiredLocal := make(map[netip.Prefix]state.PrefixHealthWrapper)
for _, prefix := range nextRouter.Prefixes {
desiredLocal[prefix.GetPrefix()] = prefix
nextNode := next.TryGetNode(n.LocalCfg.Id)
if nextNode == nil {
return
}

desiredLocal := make(map[netip.Prefix]int)
for i, prefix := range nextNode.Prefixes {
desiredLocal[prefix.GetPrefix()] = i
}

for prefix, adv := range n.RouterState.Advertised {
Expand All @@ -161,8 +165,15 @@ func (n *Nylon) reconcileAdvertisedPrefixes(next *state.CentralCfg) {
}
}

for prefix, desired := range desiredLocal {
if _, ok := currentLocal[prefix]; !ok {
for prefix, index := range desiredLocal {
desired := nextNode.Prefixes[index]
if current, ok := currentLocal[prefix]; ok && current.SameConfig(desired, &n.RouterTunables) {
desired = current
nextNode.Prefixes[index] = current
} else {
if current, ok := currentLocal[prefix]; ok {
current.Stop()
}
n.Log.Debug("starting prefix healthcheck", "prefix", prefix)
desired.Start(n.Log, &n.RouterTunables)
}
Comment thread
encodeous marked this conversation as resolved.
Expand Down
136 changes: 136 additions & 0 deletions core/nylon_apply_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package core

import (
"io"
"log/slog"
"net/netip"
"testing"
"time"

"github.com/encodeous/nylon/state"
"github.com/stretchr/testify/assert"
)

func TestReconcileAdvertisedPrefixesStartsChangedPrefixHealth(t *testing.T) {
prefix := netip.MustParsePrefix("fd00::53/128")
oldPrefix := state.PrefixHealthWrapper{
PrefixHealth: &state.StaticPrefixHealth{Prefix: prefix},
}
n := testNylonWithPrefixes(oldPrefix)
n.RouterState.Advertised[prefix] = state.Advertisement{
NodeId: n.LocalCfg.Id,
Expiry: maxConfigTime,
MetricFn: oldPrefix.GetMetric,
}

delay := time.Millisecond
next := testCentralConfig(n.LocalCfg.Id, state.PrefixHealthWrapper{
PrefixHealth: &state.HTTPPrefixHealth{
Prefix: prefix,
URL: "http://127.0.0.1:1/healthz",
Delay: &delay,
},
})

n.reconcileAdvertisedPrefixes(&next)
t.Cleanup(next.Routers[0].Prefixes[0].Stop)

assert.Equal(t, state.INF, n.RouterState.Advertised[prefix].MetricFn())
}

func TestReconcileAdvertisedPrefixesStartsChangedPingPrefixHealth(t *testing.T) {
prefix := netip.MustParsePrefix("fd00::54/128")
oldPrefix := state.PrefixHealthWrapper{
PrefixHealth: &state.StaticPrefixHealth{Prefix: prefix},
}
n := testNylonWithPrefixes(oldPrefix)
n.RouterState.Advertised[prefix] = state.Advertisement{
NodeId: n.LocalCfg.Id,
Expiry: maxConfigTime,
MetricFn: oldPrefix.GetMetric,
}

delay := 100 * time.Millisecond
next := testCentralConfig(n.LocalCfg.Id, state.PrefixHealthWrapper{
PrefixHealth: &state.PingPrefixHealth{
Prefix: prefix,
Addr: netip.MustParseAddr("127.0.0.1"),
Delay: &delay,
},
})

n.reconcileAdvertisedPrefixes(&next)
t.Cleanup(next.Routers[0].Prefixes[0].Stop)

assert.Equal(t, state.INF, n.RouterState.Advertised[prefix].MetricFn())
}

func TestReconcileAdvertisedPrefixesReusesUnchangedRunningPrefixHealth(t *testing.T) {
prefix := netip.MustParsePrefix("fd00::53/128")
delay := time.Millisecond
current := state.PrefixHealthWrapper{
PrefixHealth: &state.HTTPPrefixHealth{
Prefix: prefix,
URL: "http://127.0.0.1:1/healthz",
Delay: &delay,
},
}
n := testNylonWithPrefixes(current)
current.Start(n.Log, &n.RouterTunables)
t.Cleanup(current.Stop)
n.RouterState.Advertised[prefix] = state.Advertisement{
NodeId: n.LocalCfg.Id,
Expiry: maxConfigTime,
MetricFn: current.GetMetric,
ExpiryFn: current.Stop,
}

next := testCentralConfig(n.LocalCfg.Id, state.PrefixHealthWrapper{
PrefixHealth: &state.HTTPPrefixHealth{
Prefix: prefix,
URL: "http://127.0.0.1:1/healthz",
Delay: &delay,
},
})

n.reconcileAdvertisedPrefixes(&next)

assert.Same(t, current.PrefixHealth, next.Routers[0].Prefixes[0].PrefixHealth)
assert.Equal(t, state.INF, n.RouterState.Advertised[prefix].MetricFn())
}

func testNylonWithPrefixes(prefixes ...state.PrefixHealthWrapper) *Nylon {
id := state.NodeId("node")
tunables := state.DefaultRouterTunables()
return &Nylon{
RouterTunables: tunables,
ConfigState: state.ConfigState{
CentralCfg: testCentralConfig(id, prefixes...),
LocalCfg: state.LocalCfg{
Id: id,
},
},
RouterState: &state.RouterState{
RouterTunables: &tunables,
Id: id,
SelfSeqno: make(map[netip.Prefix]uint16),
Routes: make(map[netip.Prefix]state.SelRoute),
Sources: make(map[state.Source]state.FD),
Advertised: make(map[netip.Prefix]state.Advertisement),
},
Log: slog.New(slog.NewTextHandler(io.Discard, nil)),
}
}

func testCentralConfig(id state.NodeId, prefixes ...state.PrefixHealthWrapper) state.CentralCfg {
return state.CentralCfg{
Routers: []state.RouterCfg{
{
NodeCfg: state.NodeCfg{
Id: id,
Prefixes: prefixes,
},
},
},
}
}
80 changes: 70 additions & 10 deletions state/prefix_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,19 @@ func (s *StaticPrefixHealth) Start(log *slog.Logger, t *RouterTunables) {
// do nothing
}

func (s *StaticPrefixHealth) sameConfig(other PrefixHealth, _ *RouterTunables) bool {
o, ok := other.(*StaticPrefixHealth)
return ok && s.Prefix == o.Prefix && s.Metric == o.Metric
}

type PingPrefixHealth struct {
Prefix netip.Prefix `yaml:"prefix"`
Addr netip.Addr `yaml:"addr"` // the address to ping
MaxFailures *int `yaml:"max_failures,omitempty"` // number of failures before returning infinite metric
Delay *time.Duration `yaml:"delay,omitempty"` // delay between pings
BindIf string `yaml:"bind_if,omitempty"` // local interface to bind to
Metric *uint32 `yaml:"metric,omitempty"` // metric override
lastMetric uint32
lastMetric atomic.Uint32
running atomic.Bool
}

Expand Down Expand Up @@ -81,7 +86,7 @@ func (p *PingPrefixHealth) GetMetric() uint32 {
if p.Metric != nil {
return *p.Metric
}
return p.lastMetric
return p.lastMetric.Load()
}
func (p *PingPrefixHealth) GetPrefix() netip.Prefix {
return p.Prefix
Expand All @@ -96,11 +101,12 @@ func (p *PingPrefixHealth) Start(log *slog.Logger, t *RouterTunables) {
if p.MaxFailures == nil {
p.MaxFailures = &t.HealthCheckMaxFailures
}
// Default to unreachable until the first successful probe.
p.lastMetric.Store(INF)
go func() {
ticker := time.NewTicker(*p.Delay)
for p.running.Load() {
time.Sleep(*p.Delay)
p.lastMetric = INF
bind4 := ""
bind6 := ""
var err error
Expand Down Expand Up @@ -133,25 +139,36 @@ func (p *PingPrefixHealth) Start(log *slog.Logger, t *RouterTunables) {
rtt, err := pinger.PingAttempts(addr, time.Duration(int64(*p.Delay)/int64(*p.MaxFailures)), *p.MaxFailures)
if err != nil {
// failed
p.lastMetric = INF
p.lastMetric.Store(INF)
log.Debug("prefix healthcheck failed", "prefix", p.Prefix.String(), "addr", p.Addr.String(), "error", err)
pinger.Close()
break // break to outer loop to recreate pinger
} else {
// success
p.lastMetric = DurationToMetric(rtt)
p.lastMetric.Store(DurationToMetric(rtt))
}
}
}
}()
}

func (p *PingPrefixHealth) sameConfig(other PrefixHealth, tunables *RouterTunables) bool {
o, ok := other.(*PingPrefixHealth)
return ok &&
p.Prefix == o.Prefix &&
p.Addr == o.Addr &&
p.BindIf == o.BindIf &&
sameOptionalUint32(p.Metric, o.Metric) &&
prefixHealthDelay(p.Delay, tunables) == prefixHealthDelay(o.Delay, tunables) &&
prefixHealthMaxFailures(p.MaxFailures, tunables) == prefixHealthMaxFailures(o.MaxFailures, tunables)
}

type HTTPPrefixHealth struct {
Prefix netip.Prefix `yaml:"prefix"`
URL string `yaml:"url"` // the URL to check
Delay *time.Duration `yaml:"delay,omitempty"` // delay between probes
Metric *uint32 `yaml:"metric,omitempty"` // metric override
lastMetric uint32
lastMetric atomic.Uint32
running atomic.Bool
}

Expand All @@ -163,7 +180,7 @@ func (h *HTTPPrefixHealth) GetMetric() uint32 {
if h.Metric != nil {
return *h.Metric
}
return h.lastMetric
return h.lastMetric.Load()
}
func (h *HTTPPrefixHealth) GetPrefix() netip.Prefix {
return h.Prefix
Expand All @@ -172,7 +189,7 @@ func (h *HTTPPrefixHealth) Start(log *slog.Logger, t *RouterTunables) {
if h.running.Swap(true) {
return
}
h.lastMetric = INF
h.lastMetric.Store(INF)
if h.Delay == nil {
h.Delay = &t.HealthCheckDelay
}
Expand All @@ -186,21 +203,64 @@ func (h *HTTPPrefixHealth) Start(log *slog.Logger, t *RouterTunables) {
resp, err := http.Get(h.URL)
if err != nil || resp.StatusCode != http.StatusOK {
// failed
h.lastMetric = INF
h.lastMetric.Store(INF)
log.Debug("prefix healthcheck failed", "prefix", h.Prefix.String(), "url", h.URL, "error", err)
} else {
// success
rtt := time.Since(startTime)
h.lastMetric = DurationToMetric(rtt)
h.lastMetric.Store(DurationToMetric(rtt))
}
}
}()
}

func (h *HTTPPrefixHealth) sameConfig(other PrefixHealth, tunables *RouterTunables) bool {
o, ok := other.(*HTTPPrefixHealth)
return ok &&
h.Prefix == o.Prefix &&
h.URL == o.URL &&
sameOptionalUint32(h.Metric, o.Metric) &&
prefixHealthDelay(h.Delay, tunables) == prefixHealthDelay(o.Delay, tunables)
}

type PrefixHealthWrapper struct {
PrefixHealth
}

type prefixHealthConfig interface {
sameConfig(other PrefixHealth, tunables *RouterTunables) bool
}

// SameConfig reports whether two prefix health checks have equivalent configuration.
func (p PrefixHealthWrapper) SameConfig(other PrefixHealthWrapper, tunables *RouterTunables) bool {
if p.PrefixHealth == nil || other.PrefixHealth == nil {
return p.PrefixHealth == other.PrefixHealth
}
config, ok := p.PrefixHealth.(prefixHealthConfig)
return ok && config.sameConfig(other.PrefixHealth, tunables)
}

func sameOptionalUint32(a, b *uint32) bool {
if a == nil || b == nil {
return a == b
}
return *a == *b
}

func prefixHealthDelay(value *time.Duration, tunables *RouterTunables) time.Duration {
if value != nil {
return *value
}
return tunables.HealthCheckDelay
}

func prefixHealthMaxFailures(value *int, tunables *RouterTunables) int {
if value != nil {
return *value
}
return tunables.HealthCheckMaxFailures
}

func (p PrefixHealthWrapper) MarshalYAML() (interface{}, error) {
switch v := p.PrefixHealth.(type) {
case *StaticPrefixHealth:
Expand Down
Loading