@@ -66,137 +66,134 @@ func parsePodTarget(endpoint string) (podTarget, error) {
6666 }, nil
6767}
6868
69- type kubeResolverBuilder struct {
70- mu sync. Mutex
71- client kubernetes. Interface
69+ type podResolution struct {
70+ pod * corev1. Pod
71+ err error
7272}
7373
74- func (b * kubeResolverBuilder ) Build (target resolver.Target , cc resolver.ClientConn , opts resolver.BuildOptions ) (resolver.Resolver , error ) {
75- endpoint := target .Endpoint ()
76- pt , err := parsePodTarget (endpoint )
77- if err != nil {
78- return nil , err
79- }
80-
81- b .mu .Lock ()
82- defer b .mu .Unlock ()
83- if b .client == nil {
84- config , err := rest .InClusterConfig ()
85- if err != nil {
86- return nil , fmt .Errorf ("could not create k8s client: %w" , err )
87- }
88- b .client , err = kubernetes .NewForConfig (config )
89- if err != nil {
90- return nil , fmt .Errorf ("could not create k8s client: %w" , err )
91- }
92- }
74+ // podWatcher owns a single Get+Watch loop for a specific (namespace, podName)
75+ // pair. Multiple kubeResolvers can subscribe to the same podWatcher.
76+ type podWatcher struct {
77+ client kubernetes.Interface
78+ namespace string
79+ podName string
9380
94- ctx , cancel := context .WithCancel (context .Background ())
95- r := & kubeResolver {
96- cc : cc ,
97- client : b .client ,
98- podTarget : pt ,
99- ctx : ctx ,
100- cancel : cancel ,
101- resolveCh : make (chan struct {}, 1 ),
102- }
81+ // Only accessed from the watch goroutine.
82+ resourceVersion string
10383
104- go r .watch ()
105- return r , nil
84+ mu sync.Mutex
85+ subscribers map [* kubeResolver ]func (podResolution )
86+ cancel context.CancelFunc
87+ latestResolution * podResolution
10688}
10789
108- func (b * kubeResolverBuilder ) Scheme () string {
109- return scheme
90+ func newPodWatcher (ctx context.Context , client kubernetes.Interface , namespace , podName string ) * podWatcher {
91+ ctx , cancel := context .WithCancel (ctx )
92+ pw := & podWatcher {
93+ client : client ,
94+ namespace : namespace ,
95+ podName : podName ,
96+ subscribers : make (map [* kubeResolver ]func (podResolution )),
97+ cancel : cancel ,
98+ }
99+ go pw .watch (ctx )
100+ return pw
110101}
111102
112- type kubeResolver struct {
113- cc resolver.ClientConn
114- client kubernetes.Interface
115- podTarget podTarget
116- ctx context.Context
117- cancel context.CancelFunc
118- resolveCh chan struct {}
119-
120- // resourceVersion is the last known resource version from a Get or Watch
121- // event. It is used to resume watches without missing events.
122- // Only accessed from the watch goroutine, so no mutex is needed.
123- resourceVersion string
103+ // subscribe adds a subscriber with callback cb. cb will be called immediately
104+ // with the latestResolution known state (pod or error) and then whenever the pod state
105+ // changes. cb must not block.
106+ func (pw * podWatcher ) subscribe (r * kubeResolver , cb func (podResolution )) {
107+ pw .mu .Lock ()
108+ pw .subscribers [r ] = cb
109+ if pw .latestResolution != nil {
110+ cb (* pw .latestResolution )
111+ }
112+ pw .mu .Unlock ()
124113}
125114
126- func (r * kubeResolver ) resolve () {
127- log .Infof ("Resolving pod %s/%s" , r .podTarget .namespace , r .podTarget .podName )
128- pod , err := r .client .CoreV1 ().Pods (r .podTarget .namespace ).Get (r .ctx , r .podTarget .podName , metav1.GetOptions {})
129- if err != nil {
130- log .Warningf ("Failed to get pod %s/%s from k8s: %s" , r .podTarget .namespace , r .podTarget .podName , err )
131- r .cc .ReportError (fmt .Errorf ("failed to get pod: %w" , err ))
132- return
133- }
134- r .updateStateFromPod (pod )
135- r .resourceVersion = pod .ResourceVersion
115+ // unsubscribe removes a subscriber. It returns true if no subscribers remain.
116+ func (pw * podWatcher ) unsubscribe (r * kubeResolver ) bool {
117+ pw .mu .Lock ()
118+ defer pw .mu .Unlock ()
119+ delete (pw .subscribers , r )
120+ return len (pw .subscribers ) == 0
136121}
137122
138- func (r * kubeResolver ) updateStateFromPod (pod * corev1.Pod ) {
139- ip := pod .Status .PodIP
140- if ip == "" {
141- log .Infof ("Pod %s/%s doesn't have an IP yet" , r .podTarget .namespace , r .podTarget .podName )
142- // Setting an empty address list will return an error, but it should
143- // still cause the balancer to close any existing connections.
144- _ = r .cc .UpdateState (resolver.State {Addresses : nil })
145- return
123+ func (pw * podWatcher ) notifySubscribers (r podResolution ) {
124+ pw .mu .Lock ()
125+ pw .latestResolution = & r
126+ callbacks := make ([]func (podResolution ), 0 , len (pw .subscribers ))
127+ for _ , cb := range pw .subscribers {
128+ callbacks = append (callbacks , cb )
146129 }
130+ pw .mu .Unlock ()
147131
148- addr := ip
149- if r .podTarget .port != "" {
150- addr = ip + ":" + r .podTarget .port
132+ for _ , cb := range callbacks {
133+ cb (r )
151134 }
135+ }
152136
153- log .Infof ("Pod %s/%s resolved to IP %s" , r .podTarget .namespace , r .podTarget .podName , ip )
154-
155- err := r .cc .UpdateState (resolver.State {
156- Addresses : []resolver.Address {{Addr : addr }},
157- })
137+ // resolve does a one-shot Get and notifies subscribers. Returns true on success.
138+ func (pw * podWatcher ) resolve (ctx context.Context ) bool {
139+ log .Infof ("Resolving pod %s/%s" , pw .namespace , pw .podName )
140+ pod , err := pw .client .CoreV1 ().Pods (pw .namespace ).Get (ctx , pw .podName , metav1.GetOptions {})
158141 if err != nil {
159- log .Warningf ("failed to update state: %s" , err )
142+ log .Warningf ("Failed to get pod %s/%s from k8s: %s" , pw .namespace , pw .podName , err )
143+ pw .notifySubscribers (podResolution {err : fmt .Errorf ("failed to get pod: %w" , err )})
144+ return false
160145 }
146+ pw .notifySubscribers (podResolution {pod : pod })
147+ pw .resourceVersion = pod .ResourceVersion
148+ return true
161149}
162150
163- func (r * kubeResolver ) watch () {
164- // Do an initial resolve before subscribing to async updates.
165- r .resolve ()
166-
151+ func (pw * podWatcher ) watch (ctx context.Context ) {
167152 backoff := time .Second
168153 maxBackoff := 30 * time .Second
169154
155+ // Resolve the pod before starting the watch loop. Retry until it
156+ // succeeds so that the watch has a valid resourceVersion to start from.
157+ for ! pw .resolve (ctx ) {
158+ select {
159+ case <- time .After (backoff ):
160+ backoff = min (backoff * 2 , maxBackoff )
161+ case <- ctx .Done ():
162+ return
163+ }
164+ }
165+ backoff = time .Second
166+
170167 for {
171168 select {
172- case <- r . ctx .Done ():
169+ case <- ctx .Done ():
173170 return
174171 default :
175172 }
176173
177- log .Infof ("Watching pod %s/%s for updates." , r . podTarget . namespace , r . podTarget .podName )
178- watcher , err := r .client .CoreV1 ().Pods (r . podTarget . namespace ).Watch (r . ctx , metav1.ListOptions {
179- FieldSelector : "metadata.name=" + r . podTarget .podName ,
180- ResourceVersion : r .resourceVersion ,
174+ log .Infof ("Watching pod %s/%s for updates." , pw . namespace , pw .podName )
175+ watcher , err := pw .client .CoreV1 ().Pods (pw . namespace ).Watch (ctx , metav1.ListOptions {
176+ FieldSelector : "metadata.name=" + pw .podName ,
177+ ResourceVersion : pw .resourceVersion ,
181178 })
182179 if err != nil {
183- log .Warningf ("Failed to watch pod %s/%s: %s" , r . podTarget . namespace , r . podTarget .podName , err )
180+ log .Warningf ("Failed to watch pod %s/%s: %s" , pw . namespace , pw .podName , err )
184181 select {
185182 case <- time .After (backoff ):
186183 backoff = min (backoff * 2 , maxBackoff )
187- case <- r . ctx .Done ():
184+ case <- ctx .Done ():
188185 return
189186 }
190187 continue
191188 }
192189
193- if err := r .processWatchEvents (watcher ); err != nil {
190+ if err := pw .processWatchEvents (ctx , watcher ); err != nil {
191+ log .Warningf ("Watch for pod %s/%s ended abruptly: %s" , pw .namespace , pw .podName , err )
194192 watcher .Stop ()
195- log .Warningf ("Watch for pod %s/%s ended abruptly: %s" , r .podTarget .namespace , r .podTarget .podName , err )
196193 select {
197194 case <- time .After (backoff ):
198195 backoff = min (backoff * 2 , maxBackoff )
199- case <- r . ctx .Done ():
196+ case <- ctx .Done ():
200197 return
201198 }
202199 continue
@@ -209,18 +206,18 @@ func (r *kubeResolver) watch() {
209206
210207// processWatchEvents processes events received from the watch stream.
211208// A non-nil error is returned when the watch ends abruptly.
212- func (r * kubeResolver ) processWatchEvents (watcher watch.Interface ) error {
209+ func (pw * podWatcher ) processWatchEvents (ctx context. Context , watcher watch.Interface ) error {
213210 for {
214211 select {
215- case <- r . ctx .Done ():
212+ case <- ctx .Done ():
216213 return nil
217214 case event , ok := <- watcher .ResultChan ():
218215 if ! ok {
219216 return fmt .Errorf ("watch channel closed" )
220217 }
221218 if obj , ok := event .Object .(metav1.ObjectMetaAccessor ); ok {
222219 if rv := obj .GetObjectMeta ().GetResourceVersion (); rv != "" {
223- r .resourceVersion = rv
220+ pw .resourceVersion = rv
224221 }
225222 }
226223 switch event .Type {
@@ -229,32 +226,139 @@ func (r *kubeResolver) processWatchEvents(watcher watch.Interface) error {
229226 if ! ok {
230227 continue
231228 }
232- r . updateStateFromPod ( pod )
229+ pw . notifySubscribers ( podResolution { pod : pod } )
233230 case watch .Deleted :
234- log .Warningf ("Pod %s/%s was deleted" , r . podTarget . namespace , r . podTarget .podName )
235- r . cc . ReportError ( fmt .Errorf ("pod %s/%s was deleted" , r . podTarget . namespace , r . podTarget . podName ))
231+ log .Warningf ("Pod %s/%s was deleted" , pw . namespace , pw .podName )
232+ pw . notifySubscribers ( podResolution { err : fmt .Errorf ("pod %s/%s was deleted" , pw . namespace , pw . podName )} )
236233 case watch .Bookmark :
237234 // No-op: bookmark events are informational.
238235 case watch .Error :
239236 err := errors .FromObject (event .Object )
240- log .Warningf ("Watch error for pod %s/%s: %s" , r . podTarget . namespace , r . podTarget .podName , err )
237+ log .Warningf ("Watch error for pod %s/%s: %s" , pw . namespace , pw .podName , err )
241238 return fmt .Errorf ("watch error: %w" , err )
242239 }
243- case <- r .resolveCh :
244- r .resolve ()
245240 }
246241 }
247242}
248243
249- func (r * kubeResolver ) ResolveNow (_ resolver.ResolveNowOptions ) {
250- select {
251- case r .resolveCh <- struct {}{}:
252- default :
244+ type kubeResolverBuilder struct {
245+ mu sync.Mutex
246+ client kubernetes.Interface
247+ watchers map [string ]* podWatcher
248+ }
249+
// watcherKey returns the map key that identifies the watcher for a pod, in
// the form "<namespace>/<podName>".
func watcherKey(namespace, podName string) string {
	const sep = "/"
	return namespace + sep + podName
}
253+
254+ func (b * kubeResolverBuilder ) Build (target resolver.Target , cc resolver.ClientConn , opts resolver.BuildOptions ) (resolver.Resolver , error ) {
255+ endpoint := target .Endpoint ()
256+ pt , err := parsePodTarget (endpoint )
257+ if err != nil {
258+ return nil , err
259+ }
260+
261+ b .mu .Lock ()
262+ defer b .mu .Unlock ()
263+ if b .client == nil {
264+ config , err := rest .InClusterConfig ()
265+ if err != nil {
266+ return nil , fmt .Errorf ("could not create k8s client: %w" , err )
267+ }
268+ b .client , err = kubernetes .NewForConfig (config )
269+ if err != nil {
270+ return nil , fmt .Errorf ("could not create k8s client: %w" , err )
271+ }
253272 }
273+
274+ // gRPC creates one resolver per connection.
275+ // To avoid making redundant calls to the k8s APIs we create a single
276+ // watcher for each namespace/pod pair.
277+ key := watcherKey (pt .namespace , pt .podName )
278+ pw := b .watchers [key ]
279+ if pw == nil {
280+ if b .watchers == nil {
281+ b .watchers = make (map [string ]* podWatcher )
282+ }
283+ pw = newPodWatcher (context .Background (), b .client , pt .namespace , pt .podName )
284+ b .watchers [key ] = pw
285+ }
286+
287+ r := & kubeResolver {
288+ cc : cc ,
289+ podTarget : pt ,
290+ watcher : pw ,
291+ builder : b ,
292+ }
293+
294+ pw .subscribe (r , func (result podResolution ) {
295+ if result .err != nil {
296+ r .cc .ReportError (result .err )
297+ return
298+ }
299+ r .updateStateFromPod (result .pod )
300+ })
301+
302+ return r , nil
303+ }
304+
305+ func (b * kubeResolverBuilder ) Scheme () string {
306+ return scheme
307+ }
308+
309+ // removeWatcher unsubscribes a resolver from its podWatcher and cleans up
310+ // the watcher if no subscribers remain.
311+ func (b * kubeResolverBuilder ) removeWatcher (r * kubeResolver ) {
312+ b .mu .Lock ()
313+ defer b .mu .Unlock ()
314+ pw := r .watcher
315+ if pw .unsubscribe (r ) {
316+ pw .cancel ()
317+ delete (b .watchers , watcherKey (pw .namespace , pw .podName ))
318+ }
319+ }
320+
321+ type kubeResolver struct {
322+ cc resolver.ClientConn
323+ podTarget podTarget
324+ watcher * podWatcher
325+ builder * kubeResolverBuilder
326+ }
327+
328+ func (r * kubeResolver ) updateStateFromPod (pod * corev1.Pod ) {
329+ ip := pod .Status .PodIP
330+ // This will happen when a pod restarts.
331+ if ip == "" {
332+ log .Infof ("Pod %s/%s doesn't have an IP yet" , r .podTarget .namespace , r .podTarget .podName )
333+ // If we previously reported an IP, reporting an error should cause
334+ // the balancer to close existing connections to the old IP.
335+ r .cc .ReportError (fmt .Errorf ("pod %s/%s doesn't have an IP yet" , r .podTarget .namespace , r .podTarget .podName ))
336+ return
337+ }
338+
339+ addr := ip
340+ if r .podTarget .port != "" {
341+ addr = ip + ":" + r .podTarget .port
342+ }
343+
344+ log .Infof ("Pod %s/%s resolved to IP %s" , r .podTarget .namespace , r .podTarget .podName , ip )
345+
346+ err := r .cc .UpdateState (resolver.State {
347+ Addresses : []resolver.Address {{Addr : addr }},
348+ })
349+ if err != nil {
350+ log .Warningf ("failed to update state: %s" , err )
351+ }
352+ }
353+
354+ func (r * kubeResolver ) ResolveNow (_ resolver.ResolveNowOptions ) {
355+ // Per the documentation, ResolveNow is a hint.
356+ // We already resolve and watch the target for changes as soon as the
357+ // resolver is created so there's no benefit in doing anything here.
254358}
255359
256360func (r * kubeResolver ) Close () {
257- r .cancel ( )
361+ r .builder . removeWatcher ( r )
258362}
259363
260364func init () {
0 commit comments