Skip to content

Commit f0ca6ae

Browse files
authored
fix: log warning when pod ports don't match InferencePool targetPorts (#2582)
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
1 parent 6758154 commit f0ca6ae

File tree

7 files changed

+33
-27
lines changed

7 files changed

+33
-27
lines changed

pkg/epp/controller/pod_reconciler.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"fmt"
2222

23-
"github.com/go-logr/logr"
2423
corev1 "k8s.io/api/core/v1"
2524
apierrors "k8s.io/apimachinery/pkg/api/errors"
2625

@@ -58,7 +57,7 @@ func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
5857
return ctrl.Result{}, fmt.Errorf("unable to get pod - %w", err)
5958
}
6059

61-
c.updateDatastore(logger, pod)
60+
c.updateDatastore(ctx, pod)
6261
return ctrl.Result{}, nil
6362
}
6463

@@ -88,12 +87,13 @@ func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
8887
Complete(c)
8988
}
9089

91-
func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod) {
90+
func (c *PodReconciler) updateDatastore(ctx context.Context, pod *corev1.Pod) {
91+
logger := log.FromContext(ctx)
9292
if !podutil.IsPodReady(pod) || !c.Datastore.PoolLabelsMatch(pod.Labels) {
9393
logger.V(logutil.DEBUG).Info("Pod removed or not added")
9494
c.Datastore.PodDelete(pod.Name)
9595
} else {
96-
if c.Datastore.PodUpdateOrAddIfNotExist(pod) {
96+
if c.Datastore.PodUpdateOrAddIfNotExist(ctx, pod) {
9797
logger.V(logutil.DEFAULT).Info("Pod added")
9898
} else {
9999
logger.V(logutil.DEFAULT).Info("Pod already exists")

pkg/epp/controller/pod_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func TestPodReconciler(t *testing.T) {
207207
store := datastore.NewDatastore(t.Context(), epf, 0)
208208
_ = store.PoolSet(t.Context(), fakeClient, pool.InferencePoolToEndpointPool(test.pool))
209209
for _, pod := range test.existingPods {
210-
store.PodUpdateOrAddIfNotExist(pod)
210+
store.PodUpdateOrAddIfNotExist(t.Context(), pod)
211211
}
212212

213213
podReconciler := &PodReconciler{Reader: fakeClient, Datastore: store}

pkg/epp/datastore/datastore.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type Datastore interface {
7878

7979
// PodList lists pods matching the given predicate.
8080
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
81-
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
81+
PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod) bool
8282
PodDelete(podName string)
8383

8484
// Clears the store state, happens when the pool gets deleted.
@@ -271,21 +271,21 @@ func (ds *datastore) PodList(predicate func(fwkdl.Endpoint) bool) []fwkdl.Endpoi
271271
return res
272272
}
273273

274-
func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
274+
func (ds *datastore) PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod) bool {
275275
// Take a reference to pool under read lock to avoid racing with PoolSet().
276276
// This is safe because PoolSet() replaces the entire pool struct rather than
277277
// updating it in-place.
278278
ds.mu.RLock()
279279
pool := ds.pool
280280
ds.mu.RUnlock()
281281

282-
return ds.podUpdateOrAddIfNotExist(pod, pool)
282+
return ds.podUpdateOrAddIfNotExist(ctx, pod, pool)
283283
}
284284

285285
// podUpdateOrAddIfNotExist is the lock-free inner implementation.
286286
// Callers must ensure pool is a consistent snapshot (either read under lock
287287
// or already held, as in podResyncAll which runs under ds.mu.Lock via PoolSet).
288-
func (ds *datastore) podUpdateOrAddIfNotExist(pod *corev1.Pod, pool *datalayer.EndpointPool) bool {
288+
func (ds *datastore) podUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod, pool *datalayer.EndpointPool) bool {
289289
if pool == nil {
290290
return true
291291
}
@@ -320,6 +320,12 @@ func (ds *datastore) podUpdateOrAddIfNotExist(pod *corev1.Pod, pool *datalayer.E
320320
})
321321
}
322322

323+
if len(pods) == 0 {
324+
logger := log.FromContext(ctx)
325+
logger.V(logutil.VERBOSE).Info("No container ports match pool targetPorts, pod will not receive traffic",
326+
"pod", pod.Name, "namespace", pod.Namespace, "targetPorts", pool.TargetPorts)
327+
}
328+
323329
result := true
324330
existingEpSet := sets.Set[types.NamespacedName]{}
325331
for _, endpointMetadata := range pods {
@@ -386,7 +392,7 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err
386392
for idx := range ds.pool.TargetPorts {
387393
activeEndpoints.Insert(createEndpointNamespacedName(&pod, idx))
388394
}
389-
if !ds.podUpdateOrAddIfNotExist(&pod, ds.pool) {
395+
if !ds.podUpdateOrAddIfNotExist(ctx, &pod, ds.pool) {
390396
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
391397
} else {
392398
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)

pkg/epp/datastore/datastore_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ func TestMetrics(t *testing.T) {
380380
ds := NewDatastore(ctx, epf, 0)
381381
_ = ds.PoolSet(ctx, fakeClient, pooltuil.InferencePoolToEndpointPool(inferencePool))
382382
for _, pod := range test.storePods {
383-
ds.PodUpdateOrAddIfNotExist(pod)
383+
ds.PodUpdateOrAddIfNotExist(ctx, pod)
384384
}
385385
time.Sleep(1 * time.Second) // Give some time for the metrics to be fetched.
386386
if test.predict == nil {
@@ -414,15 +414,15 @@ func TestPods(t *testing.T) {
414414
existingPods: []*corev1.Pod{},
415415
wantPods: []*corev1.Pod{pod1},
416416
op: func(ctx context.Context, ds Datastore) {
417-
ds.PodUpdateOrAddIfNotExist(pod1)
417+
ds.PodUpdateOrAddIfNotExist(ctx, pod1)
418418
},
419419
},
420420
{
421421
name: "Add new pod, with existing pods, should add",
422422
existingPods: []*corev1.Pod{pod1},
423423
wantPods: []*corev1.Pod{pod1, pod2},
424424
op: func(ctx context.Context, ds Datastore) {
425-
ds.PodUpdateOrAddIfNotExist(pod2)
425+
ds.PodUpdateOrAddIfNotExist(ctx, pod2)
426426
},
427427
},
428428
{
@@ -457,7 +457,7 @@ func TestPods(t *testing.T) {
457457
t.Error(err)
458458
}
459459
for _, pod := range test.existingPods {
460-
ds.PodUpdateOrAddIfNotExist(pod)
460+
ds.PodUpdateOrAddIfNotExist(ctx, pod)
461461
}
462462

463463
test.op(ctx, ds)
@@ -613,7 +613,7 @@ func TestEndpointMetadata(t *testing.T) {
613613
},
614614
},
615615
op: func(ctx context.Context, ds Datastore) {
616-
ds.PodUpdateOrAddIfNotExist(pod1)
616+
ds.PodUpdateOrAddIfNotExist(ctx, pod1)
617617
},
618618
pool: inferencePool,
619619
},
@@ -647,7 +647,7 @@ func TestEndpointMetadata(t *testing.T) {
647647
},
648648
},
649649
op: func(ctx context.Context, ds Datastore) {
650-
ds.PodUpdateOrAddIfNotExist(pod1)
650+
ds.PodUpdateOrAddIfNotExist(ctx, pod1)
651651
},
652652
pool: inferencePoolMultiTarget,
653653
},
@@ -705,7 +705,7 @@ func TestEndpointMetadata(t *testing.T) {
705705
},
706706
},
707707
op: func(ctx context.Context, ds Datastore) {
708-
ds.PodUpdateOrAddIfNotExist(pod2)
708+
ds.PodUpdateOrAddIfNotExist(ctx, pod2)
709709
},
710710
pool: inferencePoolMultiTarget,
711711
},
@@ -760,7 +760,7 @@ func TestEndpointMetadata(t *testing.T) {
760760
t.Error(err)
761761
}
762762
for _, pod := range test.existingPods {
763-
ds.PodUpdateOrAddIfNotExist(pod)
763+
ds.PodUpdateOrAddIfNotExist(ctx, pod)
764764
}
765765

766766
test.op(ctx, ds)
@@ -919,7 +919,7 @@ func TestActivePortFiltering(t *testing.T) {
919919

920920
// Add all pods
921921
for _, pod := range test.pods {
922-
ds.PodUpdateOrAddIfNotExist(pod)
922+
ds.PodUpdateOrAddIfNotExist(ctx, pod)
923923
}
924924

925925
// Check final endpoint count
@@ -1019,7 +1019,7 @@ func TestActivePortEndpointRemoval(t *testing.T) {
10191019
operations: []func(Datastore){
10201020
// Update the pod to reduce active ports from 3 to 1
10211021
func(ds Datastore) {
1022-
ds.PodUpdateOrAddIfNotExist(updatedPod1)
1022+
ds.PodUpdateOrAddIfNotExist(context.Background(), updatedPod1)
10231023
},
10241024
},
10251025
wantEndpointCount: 1, // Only port 8000 should remain active
@@ -1035,7 +1035,7 @@ func TestActivePortEndpointRemoval(t *testing.T) {
10351035
operations: []func(Datastore){
10361036
// Update the pod to have no active ports
10371037
func(ds Datastore) {
1038-
ds.PodUpdateOrAddIfNotExist(inactivePod1)
1038+
ds.PodUpdateOrAddIfNotExist(context.Background(), inactivePod1)
10391039
},
10401040
},
10411041
wantEndpointCount: 0, // No ports should remain active
@@ -1068,7 +1068,7 @@ func TestActivePortEndpointRemoval(t *testing.T) {
10681068
}
10691069

10701070
// Add the initial pod
1071-
ds.PodUpdateOrAddIfNotExist(test.initialPod)
1071+
ds.PodUpdateOrAddIfNotExist(ctx, test.initialPod)
10721072

10731073
// Wait a bit for the datastore to process the pod
10741074
time.Sleep(100 * time.Millisecond)
@@ -1150,7 +1150,7 @@ func TestPodUpdateOrAddIfNotExist_ConcurrentPoolSet(t *testing.T) {
11501150
go func() {
11511151
defer wg.Done()
11521152
for range 1000 {
1153-
ds.PodUpdateOrAddIfNotExist(pod)
1153+
ds.PodUpdateOrAddIfNotExist(ctx, pod)
11541154
}
11551155
}()
11561156

pkg/epp/metrics/collectors/inference_pool_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestMetricsCollected(t *testing.T) {
9797
Build()
9898

9999
_ = ds.PoolSet(context.Background(), fakeClient, poolutil.InferencePoolToEndpointPool(inferencePool))
100-
_ = ds.PodUpdateOrAddIfNotExist(pod1)
100+
_ = ds.PodUpdateOrAddIfNotExist(context.Background(), pod1)
101101

102102
time.Sleep(1 * time.Second)
103103

pkg/epp/requestcontrol/director_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ func TestDirector_HandleRequest(t *testing.T) {
641641
Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}},
642642
},
643643
}
644-
ds.PodUpdateOrAddIfNotExist(testPod)
644+
ds.PodUpdateOrAddIfNotExist(ctx, testPod)
645645
}
646646

647647
for _, test := range tests {
@@ -786,7 +786,7 @@ func TestGetRandomEndpoint(t *testing.T) {
786786
t.Errorf("unexpected error setting pool: %s", err)
787787
}
788788
for _, pod := range test.storePods {
789-
ds.PodUpdateOrAddIfNotExist(pod)
789+
ds.PodUpdateOrAddIfNotExist(context.Background(), pod)
790790
}
791791
d := &Director{datastore: ds}
792792
gotEndpoint := d.GetRandomEndpoint()

test/utils/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po
6060
}
6161
for _, pod := range pods {
6262
initObjs = append(initObjs, pod)
63-
ds.PodUpdateOrAddIfNotExist(pod)
63+
ds.PodUpdateOrAddIfNotExist(ctx, pod)
6464
}
6565

6666
scheme := runtime.NewScheme()

0 commit comments

Comments
 (0)