Skip to content

Commit 4ba3cfe

Browse files
authored
fix(prefix-cache): race conditions in indexer lock management (kubernetes-sigs#2501)
Signed-off-by: Sam Batschelet <sbatschelet@gmail.com>
1 parent b503fd1 commit 4ba3cfe

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

pkg/epp/framework/plugins/scheduling/scorer/prefix/indexer.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ func newIndexer(ctx context.Context, defaultLRUSize int) *indexer {
5252
// Add adds a list of prefix hashes to the cache, tied to the server.
5353
func (i *indexer) Add(hashes []BlockHash, pod Server) {
5454
i.mu.Lock()
55+
defer i.mu.Unlock()
56+
5557
// Check if the LRU pod exist
5658
lruForPod, exists := i.podToLRU[pod.ServerID]
5759
if !exists {
@@ -64,15 +66,12 @@ func (i *indexer) Add(hashes []BlockHash, pod Server) {
6466
lruForPod = newLRU
6567
}
6668

67-
i.mu.Unlock()
68-
6969
// Add to LRU (may evict)
7070
for _, hash := range hashes {
7171
lruForPod.Add(hash, struct{}{})
7272
}
7373

74-
// Update hashToPods once under lock
75-
i.mu.Lock()
74+
// Update hashToPods
7675
for _, hash := range hashes {
7776
podIDs := i.hashToPods[hash]
7877
if podIDs == nil {
@@ -81,8 +80,6 @@ func (i *indexer) Add(hashes []BlockHash, pod Server) {
8180
podIDs[pod.ServerID] = struct{}{}
8281
i.hashToPods[hash] = podIDs
8382
}
84-
85-
i.mu.Unlock()
8683
}
8784

8885
// Get returns a set of servers that have the given prefix hash cached.
@@ -103,8 +100,6 @@ func (i *indexer) Get(hash BlockHash) podSet {
103100
// makeEvictionFn returns a per-pod LRU eviction callback that removes the pod from hashToPods on eviction.
104101
func (i *indexer) makeEvictionFn(pod ServerID) func(BlockHash, struct{}) {
105102
return func(hash BlockHash, _ struct{}) {
106-
i.mu.Lock()
107-
defer i.mu.Unlock()
108103
// Remove the pod from the hash→pods map
109104
if podSet, ok := i.hashToPods[hash]; ok {
110105
delete(podSet, pod)
@@ -156,10 +151,10 @@ func (i *indexer) reportLRUSize(ctx context.Context, interval time.Duration) {
156151

157152
// RemovePod removes a pod and its associated entries from the indexer.
158153
func (i *indexer) RemovePod(pod ServerID) {
159-
i.mu.RLock()
160-
lruCache, exists := i.podToLRU[pod]
161-
i.mu.RUnlock()
154+
i.mu.Lock()
155+
defer i.mu.Unlock()
162156

157+
lruCache, exists := i.podToLRU[pod]
163158
if !exists {
164159
return
165160
}
@@ -169,9 +164,7 @@ func (i *indexer) RemovePod(pod ServerID) {
169164
lruCache.Remove(hash)
170165
}
171166

172-
i.mu.Lock()
173167
delete(i.podToLRU, pod)
174-
i.mu.Unlock()
175168
}
176169

177170
// Pods returns the list of all pods currently tracked in the indexer.

pkg/epp/framework/plugins/scheduling/scorer/prefix/indexer_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package prefix
1818

1919
import (
2020
"context"
21+
"sync"
2122
"testing"
2223

2324
"github.com/stretchr/testify/assert"
@@ -110,3 +111,23 @@ func TestIndexer_RemovePodAndEviction(t *testing.T) {
110111
// Ensure hashToPods contains exactly indexerSize hashes (post-eviction and server2 removal)
111112
assert.Len(t, i.hashToPods, indexerSize, "hashToPods should contain %d hashes after cleanup", indexerSize)
112113
}
114+
115+
func TestIndexer_ConcurrentAddRemovePod(t *testing.T) {
116+
lruSize := 10
117+
for iter := range 100 {
118+
i := newIndexer(context.Background(), lruSize)
119+
pod := Server{ServerID: ServerID{Namespace: "default", Name: "pod1"}}
120+
121+
var wg sync.WaitGroup
122+
wg.Add(2)
123+
go func() { defer wg.Done(); i.Add([]BlockHash{1, 2, 3}, pod) }()
124+
go func() { defer wg.Done(); i.RemovePod(pod.ServerID) }()
125+
wg.Wait()
126+
127+
if _, exists := i.podToLRU[pod.ServerID]; !exists {
128+
for hash, pods := range i.hashToPods {
129+
assert.NotContains(t, pods, pod.ServerID, "iter %d: hashToPods[%v] references removed pod", iter, hash)
130+
}
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)