Skip to content

Commit 2e598a4

Browse files
authored
Use kube-resolver to resolve addresses in raft registry (#11422)
1. Created a podWatcherManager that manages podWatchers. This is extracted from kubeResolverBuilder, so that the code can be re-used. 2. Created WatchPodIP() API for non-grpc consumers 3. Resolve() now returns resolved IP if available. The connection key is the resolved IP instead of the hostname; so that when the IP changes, new requests get pushed to a new queue instead of the old queue with the old IP in dragonboat.
1 parent db36d6e commit 2e598a4

File tree

9 files changed

+533
-82
lines changed

9 files changed

+533
-82
lines changed

enterprise/server/raft/client/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"//server/environment",
1616
"//server/metrics",
1717
"//server/util/grpc_client",
18+
"//server/util/kuberesolver",
1819
"//server/util/lockmap",
1920
"//server/util/log",
2021
"//server/util/proto",

enterprise/server/raft/client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8-
"os"
98
"strconv"
109
"sync"
1110
"time"
@@ -16,6 +15,7 @@ import (
1615
"github.com/buildbuddy-io/buildbuddy/server/environment"
1716
"github.com/buildbuddy-io/buildbuddy/server/metrics"
1817
"github.com/buildbuddy-io/buildbuddy/server/util/grpc_client"
18+
"github.com/buildbuddy-io/buildbuddy/server/util/kuberesolver"
1919
"github.com/buildbuddy-io/buildbuddy/server/util/lockmap"
2020
"github.com/buildbuddy-io/buildbuddy/server/util/log"
2121
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
@@ -104,7 +104,7 @@ func (c *APIClient) getClient(ctx context.Context, peer string) (returnedClient
104104
// Use kube:/// resolver when running in k8s for instant IP updates on
105105
// pod restarts. Fall back to default grpc:// resolver otherwise (e.g. tests).
106106
target := "grpc://" + peer
107-
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" {
107+
if kuberesolver.RunningInKubernetes() {
108108
target = "kube:///" + peer
109109
}
110110
conn, err := grpc_client.DialSimple(target, grpc.WithConnectParams(grpc.ConnectParams{

enterprise/server/raft/registry/BUILD

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
"//enterprise/server/raft/constants",
1111
"//proto:raft_go_proto",
1212
"//server/interfaces",
13+
"//server/util/kuberesolver",
1314
"//server/util/log",
1415
"//server/util/proto",
1516
"//server/util/status",
@@ -28,8 +29,12 @@ go_test(
2829
":registry",
2930
"//server/gossip",
3031
"//server/testutil/testport",
32+
"//server/util/kuberesolver",
3133
"//server/util/log",
3234
"//server/util/status",
3335
"@com_github_stretchr_testify//require",
36+
"@io_k8s_api//core/v1:core",
37+
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
38+
"@io_k8s_client_go//kubernetes/fake",
3439
],
3540
)

enterprise/server/raft/registry/registry.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/constants"
1111
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
12+
"github.com/buildbuddy-io/buildbuddy/server/util/kuberesolver"
1213
"github.com/buildbuddy-io/buildbuddy/server/util/log"
1314
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
1415
"github.com/buildbuddy-io/buildbuddy/server/util/status"
@@ -68,6 +69,13 @@ type StaticRegistry struct {
6869

6970
targetAddresses sync.Map // map of NHID(string) => addresses
7071

72+
// Pod watcher for resolving raft addresses to IPs via k8s Watch API.
73+
// When set, raft addresses that are pod FQDNs are watched for IP changes
74+
// and Resolve() returns the resolved IP instead of the hostname.
75+
podWatcherManager *kuberesolver.PodWatcherManager
76+
resolvedRaftAddrs sync.Map // map of raftAddress(string) => resolved IP:port(string)
77+
raftWatchCancels sync.Map // map of raftAddress(string) => cancel func()
78+
7179
log log.Logger
7280
}
7381

@@ -85,6 +93,52 @@ func NewStaticNodeRegistry(streamConnections uint64, v dbConfig.TargetValidator,
8593
return n
8694
}
8795

96+
// SetPodWatcherManager configures the registry to resolve raft addresses
97+
// (pod FQDNs) to IPs using the k8s Watch API via the given PodWatcherManager.
98+
// When set, Resolve() returns the resolved pod IP instead of the hostname,
99+
// and uses the IP as the connection key so that an IP change triggers a new
100+
// connection.
101+
func (n *StaticRegistry) SetPodWatcherManager(m *kuberesolver.PodWatcherManager) {
102+
n.podWatcherManager = m
103+
}
104+
105+
// resolveRaftAddress returns the resolved IP:port for the given raft address
106+
// if a pod watcher is tracking it, otherwise returns the address as-is.
107+
func (n *StaticRegistry) resolveRaftAddress(raftAddr string) string {
108+
if resolved, ok := n.resolvedRaftAddrs.Load(raftAddr); ok {
109+
return resolved.(string)
110+
}
111+
return raftAddr
112+
}
113+
114+
// watchRaftAddress starts watching the given raft address for IP changes
115+
// via the k8s Watch API. If the address is not a pod FQDN or the pod watcher
116+
// is not configured, this is a no-op.
117+
func (n *StaticRegistry) watchRaftAddress(raftAddr string) {
118+
if n.podWatcherManager == nil {
119+
return
120+
}
121+
if _, ok := n.raftWatchCancels.Load(raftAddr); ok {
122+
return
123+
}
124+
cancel, err := n.podWatcherManager.WatchPodIP(raftAddr, func(ipPort string, watchErr error) {
125+
if watchErr != nil {
126+
if _, had := n.resolvedRaftAddrs.LoadAndDelete(raftAddr); had {
127+
n.log.Warningf("Raft address %s lost resolution: %s", raftAddr, watchErr)
128+
}
129+
return
130+
}
131+
if prev, loaded := n.resolvedRaftAddrs.Swap(raftAddr, ipPort); !loaded || prev.(string) != ipPort {
132+
n.log.Infof("Raft address %s resolved to %s", raftAddr, ipPort)
133+
}
134+
})
135+
if err != nil {
136+
// Not a pod FQDN or not in k8s — fall back to hostname resolution.
137+
return
138+
}
139+
n.raftWatchCancels.Store(raftAddr, cancel)
140+
}
141+
88142
// Add adds the specified node and its target info to the registry.
89143
func (n *StaticRegistry) Add(rangeID uint64, replicaID uint64, target string) {
90144
if n.validate != nil && !n.validate(target) {
@@ -156,7 +210,8 @@ func (n *StaticRegistry) Resolve(rangeID uint64, replicaID uint64) (string, stri
156210
if err != nil {
157211
return "", "", err
158212
}
159-
return ci.GetRaftAddress(), n.getConnectionKey(ci.GetRaftAddress(), rangeID), nil
213+
addr := n.resolveRaftAddress(ci.GetRaftAddress())
214+
return addr, n.getConnectionKey(addr, rangeID), nil
160215
}
161216

162217
// ResolveRaft returns the raft address and the connection key of the specified node.
@@ -224,6 +279,7 @@ func (n *StaticRegistry) AddNode(target, raftAddress, grpcAddress string) {
224279
grpc: grpcAddress,
225280
}
226281
n.targetAddresses.Store(target, a)
282+
n.watchRaftAddress(raftAddress)
227283
}
228284

229285
// ListNodes lists all the {NHID, raftAddress, grpcAddress} available in the
@@ -244,6 +300,12 @@ func (n *StaticRegistry) ListNodes() []*rfpb.ConnectionInfo {
244300
}
245301

246302
func (n *StaticRegistry) Close() error {
303+
n.raftWatchCancels.Range(func(key, value interface{}) bool {
304+
if cancel, ok := value.(func()); ok && cancel != nil {
305+
cancel()
306+
}
307+
return true
308+
})
247309
return nil
248310
}
249311

@@ -301,6 +363,12 @@ func NewDynamicNodeRegistry(gossipManager interfaces.GossipService, streamConnec
301363
return dnr
302364
}
303365

366+
// SetPodWatcherManager configures the underlying static registry to resolve
367+
// raft addresses via the k8s Watch API.
368+
func (d *DynamicNodeRegistry) SetPodWatcherManager(m *kuberesolver.PodWatcherManager) {
369+
d.sReg.SetPodWatcherManager(m)
370+
}
371+
304372
func (d *DynamicNodeRegistry) handleEvent(event *serf.UserEvent) {
305373
if event.Name != constants.RegistryUpdateEvent {
306374
return
@@ -462,7 +530,8 @@ func (d *DynamicNodeRegistry) Resolve(rangeID uint64, replicaID uint64) (string,
462530
if err != nil {
463531
return "", "", err
464532
}
465-
return ci.GetRaftAddress(), d.sReg.getConnectionKey(ci.GetRaftAddress(), rangeID), nil
533+
addr := d.sReg.resolveRaftAddress(ci.GetRaftAddress())
534+
return addr, d.sReg.getConnectionKey(addr, rangeID), nil
466535
}
467536

468537
// Lookup returns the connectionInfo of the specified node

enterprise/server/raft/registry/registry_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,19 @@ import (
44
"context"
55
"fmt"
66
"testing"
7+
"time"
78

89
"github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/registry"
910
"github.com/buildbuddy-io/buildbuddy/server/gossip"
1011
"github.com/buildbuddy-io/buildbuddy/server/testutil/testport"
12+
"github.com/buildbuddy-io/buildbuddy/server/util/kuberesolver"
1113
"github.com/buildbuddy-io/buildbuddy/server/util/log"
1214
"github.com/buildbuddy-io/buildbuddy/server/util/status"
1315
"github.com/stretchr/testify/require"
16+
"k8s.io/client-go/kubernetes/fake"
17+
18+
corev1 "k8s.io/api/core/v1"
19+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1420
)
1521

1622
func init() {
@@ -143,6 +149,72 @@ func TestDynamicRegistryRemoveShard(t *testing.T) {
143149
require.True(t, status.IsNotFoundError(err))
144150
}
145151

152+
func TestStaticRegistryResolveWithPodWatcher(t *testing.T) {
153+
pod := &corev1.Pod{
154+
ObjectMeta: metav1.ObjectMeta{
155+
Name: "server-0",
156+
Namespace: "ns",
157+
},
158+
Status: corev1.PodStatus{
159+
PodIP: "10.0.0.1",
160+
},
161+
}
162+
k8sClient := fake.NewClientset(pod)
163+
m := kuberesolver.NewPodWatcherManager(k8sClient)
164+
165+
nr := registry.NewStaticNodeRegistry(1, nil, log.Logger{})
166+
nr.SetPodWatcherManager(m)
167+
168+
raftAddr := "server-0.headless.ns.svc.cluster.local:7238"
169+
nr.Add(1, 1, "nhid-1")
170+
nr.AddNode("nhid-1", raftAddr, "grpcaddress:1")
171+
172+
// Resolve should return the resolved pod IP instead of the hostname.
173+
require.Eventually(t, func() bool {
174+
addr, _, err := nr.Resolve(1, 1)
175+
return err == nil && addr == "10.0.0.1:7238"
176+
}, 5*time.Second, 10*time.Millisecond, "expected Resolve to return resolved IP")
177+
178+
// Connection key should use the resolved IP, not the hostname.
179+
addr, key1, err := nr.Resolve(1, 1)
180+
require.NoError(t, err)
181+
require.Equal(t, "10.0.0.1:7238", addr)
182+
require.Contains(t, key1, "10.0.0.1:7238")
183+
require.NotContains(t, key1, "server-0.headless")
184+
185+
// ResolveGRPC is unaffected by pod watcher (resolves by NHID).
186+
grpcAddr, err := nr.ResolveGRPC(context.Background(), "nhid-1")
187+
require.NoError(t, err)
188+
require.Equal(t, "grpcaddress:1", grpcAddr)
189+
190+
// Wait for the watch to be established before updating.
191+
require.Eventually(t, func() bool {
192+
for _, a := range k8sClient.Actions() {
193+
if a.GetVerb() == "watch" {
194+
return true
195+
}
196+
}
197+
return false
198+
}, 5*time.Second, 10*time.Millisecond, "timed out waiting for watch")
199+
200+
// Simulate a pod restart with a new IP.
201+
pod.Status.PodIP = "10.0.0.2"
202+
_, err = k8sClient.CoreV1().Pods("ns").Update(
203+
context.Background(), pod, metav1.UpdateOptions{},
204+
)
205+
require.NoError(t, err)
206+
207+
// Resolve should return the new IP and a different connection key.
208+
require.Eventually(t, func() bool {
209+
addr, _, err := nr.Resolve(1, 1)
210+
return err == nil && addr == "10.0.0.2:7238"
211+
}, 5*time.Second, 10*time.Millisecond, "expected Resolve to return updated IP")
212+
213+
_, key2, err := nr.Resolve(1, 1)
214+
require.NoError(t, err)
215+
require.NotEqual(t, key1, key2, "connection key should change when IP changes")
216+
}
217+
146218
func TestDynamicRegistryResolution(t *testing.T) {
147219
node1Addr := localAddr(t)
148220
node2Addr := localAddr(t)

enterprise/server/raft/store/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ go_library(
3838
"//server/util/canary",
3939
"//server/util/disk",
4040
"//server/util/grpc_server",
41+
"//server/util/kuberesolver",
4142
"//server/util/lib/set",
4243
"//server/util/log",
4344
"//server/util/proto",

enterprise/server/raft/store/store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/buildbuddy-io/buildbuddy/server/util/canary"
5050
"github.com/buildbuddy-io/buildbuddy/server/util/disk"
5151
"github.com/buildbuddy-io/buildbuddy/server/util/grpc_server"
52+
"github.com/buildbuddy-io/buildbuddy/server/util/kuberesolver"
5253
"github.com/buildbuddy-io/buildbuddy/server/util/lib/set"
5354
"github.com/buildbuddy-io/buildbuddy/server/util/log"
5455
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
@@ -181,6 +182,9 @@ type registryHolder struct {
181182
func (rc *registryHolder) Create(nhid string, streamConnections uint64, v dbConfig.TargetValidator) (raftio.INodeRegistry, error) {
182183
nhLog := log.NamedSubLogger(nhid)
183184
r := registry.NewDynamicNodeRegistry(rc.g, streamConnections, v, nhLog)
185+
if kuberesolver.RunningInKubernetes() {
186+
r.SetPodWatcherManager(kuberesolver.DefaultManager())
187+
}
184188
rc.r = r
185189
r.AddNode(nhid, rc.raftAddr, rc.grpcAddr)
186190
return r, nil

0 commit comments

Comments
 (0)