Skip to content

Commit 874aae8

Browse files
authored
datastore minor cleanup (kubernetes-sigs#2448)
* datastore minor cleanup Signed-off-by: Nir Rozenbaum <nrozenba@redhat.com> * minor update Signed-off-by: Nir Rozenbaum <nrozenba@redhat.com> * remove extra space Signed-off-by: Nir Rozenbaum <nrozenba@redhat.com> --------- Signed-off-by: Nir Rozenbaum <nrozenba@redhat.com>
1 parent 4ba3cfe commit 874aae8

File tree

3 files changed

+43
-46
lines changed

3 files changed

+43
-46
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,7 @@ func setupDatastore(ctx context.Context, epFactory datalayer.EndpointFactory, mo
439439
setupLog.Error(err, "Failed to construct endpoint pool from options")
440440
return nil, err
441441
}
442-
endpointPoolOption := datastore.WithEndpointPool(endpointPool)
443-
return datastore.NewDatastore(ctx, epFactory, modelServerMetricsPort, endpointPoolOption), nil
442+
return datastore.NewDatastore(ctx, epFactory, modelServerMetricsPort).WithEndpointPool(endpointPool), nil
444443
}
445444
}
446445

pkg/epp/datastore/datastore.go

Lines changed: 36 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ import (
3030
"k8s.io/apimachinery/pkg/labels"
3131
"k8s.io/apimachinery/pkg/types"
3232
"k8s.io/apimachinery/pkg/util/sets"
33-
3433
"sigs.k8s.io/controller-runtime/pkg/client"
3534
"sigs.k8s.io/controller-runtime/pkg/log"
35+
3636
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
3737
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/observability/logging"
3838
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
@@ -46,6 +46,13 @@ var (
4646
AllPodsPredicate = func(_ fwkdl.Endpoint) bool { return true }
4747
)
4848

49+
const (
50+
// activePortsAnnotation is used to specify which ports on a pod should be considered
51+
// as active for inference traffic. The value should be a comma-separated list of port numbers.
52+
// Example: "8000,8001,8002"
53+
activePortsAnnotation = "inference.networking.k8s.io/active-ports"
54+
)
55+
4956
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
5057
type Datastore interface {
5158
// InferencePool operations
@@ -78,11 +85,14 @@ type Datastore interface {
7885
Clear()
7986
}
8087

88+
// compile-time type assertion
89+
var _ Datastore = &datastore{}
90+
8191
// NewDatastore creates a new data store.
8292
// TODO: modelServerMetricsPort is being deprecated
83-
func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, opts ...DatastoreOption) Datastore {
93+
func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32) *datastore {
8494
// Initialize with defaults
85-
store := &datastore{
95+
return &datastore{
8696
parentCtx: parentCtx,
8797
pool: nil,
8898
mu: sync.RWMutex{},
@@ -92,13 +102,6 @@ func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory
92102
modelServerMetricsPort: modelServerMetricsPort,
93103
epf: epFactory,
94104
}
95-
96-
// Apply options
97-
for _, opt := range opts {
98-
opt(store)
99-
}
100-
101-
return store
102105
}
103106

104107
type datastore struct {
@@ -119,6 +122,11 @@ type datastore struct {
119122
epf datalayer.EndpointFactory
120123
}
121124

125+
func (ds *datastore) WithEndpointPool(pool *datalayer.EndpointPool) *datastore {
126+
ds.pool = pool
127+
return ds
128+
}
129+
122130
func (ds *datastore) Clear() {
123131
ds.mu.Lock()
124132
defer ds.mu.Unlock()
@@ -278,7 +286,7 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
278286
modelServerMetricsPort = int(ds.modelServerMetricsPort)
279287
}
280288
pods := []*fwkdl.EndpointMetadata{}
281-
activePorts := extractActivePorts(pod, ds.pool.TargetPorts)
289+
activePorts := ds.extractActivePorts(pod)
282290
for idx, port := range ds.pool.TargetPorts {
283291
if !activePorts.Has(port) {
284292
continue
@@ -386,12 +394,25 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err
386394
return nil
387395
}
388396

389-
type DatastoreOption func(*datastore)
397+
// extractActivePorts extracts the active ports from a pod's annotations.
398+
func (ds *datastore) extractActivePorts(pod *corev1.Pod) sets.Set[int] {
399+
allPorts := sets.New(ds.pool.TargetPorts...)
400+
annotations := pod.GetAnnotations()
401+
portsAnnotation, ok := annotations[activePortsAnnotation]
402+
if !ok {
403+
return allPorts
404+
}
390405

391-
func WithEndpointPool(pool *datalayer.EndpointPool) DatastoreOption {
392-
return func(d *datastore) {
393-
d.pool = pool
406+
activePorts := sets.New[int]()
407+
portStrs := strings.Split(portsAnnotation, ",")
408+
for _, portStr := range portStrs {
409+
var portNum int
410+
_, err := fmt.Sscanf(strings.TrimSpace(portStr), "%d", &portNum)
411+
if err == nil && portNum > 0 && allPorts.Has(portNum) {
412+
activePorts.Insert(portNum)
413+
}
394414
}
415+
return activePorts
395416
}
396417

397418
// createEndpointNamespacedName creates a namespaced name for an endpoint based on pod and rank index.
@@ -402,28 +423,3 @@ func createEndpointNamespacedName(pod *corev1.Pod, idx int) types.NamespacedName
402423
Namespace: pod.Namespace,
403424
}
404425
}
405-
406-
// activePortsAnnotation is used to specify which ports on a pod should be considered
407-
// as active for inference traffic. The value should be a comma-separated list of port numbers.
408-
// Example: "8000,8001,8002"
409-
const activePortsAnnotation = "inference.networking.k8s.io/active-ports"
410-
411-
// extractActivePorts extracts the active ports from a pod's annotations.
412-
func extractActivePorts(pod *corev1.Pod, validPorts []int) sets.Set[int] {
413-
activePorts := sets.New[int]()
414-
allPorts := sets.New(validPorts...)
415-
annotations := pod.GetAnnotations()
416-
if portsAnnotation, ok := annotations[activePortsAnnotation]; ok {
417-
portStrs := strings.Split(portsAnnotation, ",")
418-
for _, portStr := range portStrs {
419-
var portNum int
420-
_, err := fmt.Sscanf(strings.TrimSpace(portStr), "%d", &portNum)
421-
if err == nil && portNum > 0 && allPorts.Has(portNum) {
422-
activePorts.Insert(portNum)
423-
}
424-
}
425-
return activePorts
426-
} else {
427-
return allPorts
428-
}
429-
}

pkg/epp/datastore/datastore_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,10 +1214,12 @@ func TestExtractActivePorts(t *testing.T) {
12141214

12151215
for _, tt := range tests {
12161216
t.Run(tt.name, func(t *testing.T) {
1217-
ports := extractActivePorts(tt.pod, tt.validPorts)
1218-
1219-
if !reflect.DeepEqual(ports, tt.expectedPorts) {
1220-
t.Errorf("ExtractActivePorts() ports = %v, want %v", ports, tt.expectedPorts)
1217+
ds := NewDatastore(context.Background(), nil, 0).WithEndpointPool(&datalayer.EndpointPool{
1218+
TargetPorts: tt.validPorts,
1219+
})
1220+
activePorts := ds.extractActivePorts(tt.pod)
1221+
if !reflect.DeepEqual(activePorts, tt.expectedPorts) {
1222+
t.Errorf("ExtractActivePorts() ports = %v, want %v", activePorts, tt.expectedPorts)
12211223
}
12221224
})
12231225
}

0 commit comments

Comments
 (0)