Skip to content

Commit 296662a

Browse files
authored
Add tests to improve coverage for worker heartbeat implementation. (temporalio#8326)
## What changed? Tests for the following cases: 1. Cache eviction logic. 2. ListWorkers API with query predicate. ## Why? Improve coverage, ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) ## Potential risks None.
1 parent 4adcdbf commit 296662a

File tree

2 files changed

+188
-3
lines changed

2 files changed

+188
-3
lines changed

service/matching/workers/registry_impl_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"math/rand"
66
"testing"
7+
"testing/synctest"
78
"time"
89

910
"github.com/stretchr/testify/assert"
@@ -112,6 +113,66 @@ func TestEvictByCapacity(t *testing.T) {
112113
assert.LessOrEqual(t, m.total.Load(), maxItems, "total counter should not exceed maxItems")
113114
}
114115

116+
// Tests the critical edge case where evictByCapacity() cannot evict any entries because they're all
117+
// protected by minEvictAge. This verifies that we do not keep checking the same entries repeatedly
118+
// when there is no space.
119+
func TestEvictByCapacityWithMinAgeProtection(t *testing.T) {
120+
maxItems := int64(2)
121+
minEvictAge := 5 * time.Second
122+
m := newRegistryImpl(1, time.Hour, minEvictAge, maxItems, time.Hour)
123+
defer m.Stop()
124+
125+
// Add 3 entries (over capacity) - all will be "new" (< minEvictAge)
126+
for i := 1; i <= 3; i++ {
127+
key := fmt.Sprintf("worker%d", i)
128+
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
129+
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
130+
}
131+
132+
// Verify we're over capacity
133+
assert.Equal(t, int64(3), m.total.Load(), "should have 3 entries initially")
134+
assert.Greater(t, m.total.Load(), maxItems, "should be over capacity")
135+
136+
// Attempt eviction - should break because all entries are too new
137+
m.evictByCapacity()
138+
139+
// All entries should still be there (protected by minEvictAge)
140+
workers := m.filterWorkers("ns", alwaysTrue)
141+
assert.Len(t, workers, 3, "all entries should be protected by minEvictAge")
142+
assert.Equal(t, int64(3), m.total.Load(), "should still exceed maxItems due to protection")
143+
}
144+
145+
// Tests that entries can be evicted once they exceed minEvictAge.
146+
func TestEvictByCapacityAfterMinAge(t *testing.T) {
147+
synctest.Test(t, func(t *testing.T) {
148+
maxItems := int64(2)
149+
minEvictAge := 100 * time.Millisecond // Short age for test
150+
151+
// Uses real time.NewTicker - synctest provides virtual time control
152+
m := newRegistryImpl(1, time.Hour, minEvictAge, maxItems, time.Hour)
153+
defer m.Stop()
154+
155+
// Add 3 entries (over capacity)
156+
for i := 1; i <= 3; i++ {
157+
key := fmt.Sprintf("worker%d", i)
158+
hb := &workerpb.WorkerHeartbeat{WorkerInstanceKey: key}
159+
m.upsertHeartbeats("ns", []*workerpb.WorkerHeartbeat{hb})
160+
}
161+
162+
// Virtual time advance - instant with synctest!
163+
// nolint:forbidigo
164+
time.Sleep(300 * time.Millisecond)
165+
166+
// Now eviction should work
167+
m.evictByCapacity()
168+
169+
// Should have evicted down to maxItems
170+
workers := m.filterWorkers("ns", alwaysTrue)
171+
assert.LessOrEqual(t, len(workers), int(maxItems), "should evict down to maxItems")
172+
assert.LessOrEqual(t, m.total.Load(), maxItems, "total should be within limits")
173+
})
174+
}
175+
115176
func BenchmarkUpdate(b *testing.B) {
116177
m := newRegistryImpl(16, time.Hour, time.Minute, int64(b.N), time.Hour)
117178
defer m.Stop()

service/matching/workers/registry_test.go

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ func TestRegistryImpl_RecordWorkerHeartbeat(t *testing.T) {
6666
for _, tt := range tests {
6767
t.Run(tt.name, func(t *testing.T) {
6868
r := newRegistryImpl(
69-
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries, defaultEvictionInterval,
69+
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries,
70+
defaultEvictionInterval,
7071
)
7172
tt.setup(r)
7273

@@ -164,7 +165,8 @@ func TestRegistryImpl_ListWorkers(t *testing.T) {
164165
for _, tt := range tests {
165166
t.Run(tt.name, func(t *testing.T) {
166167
r := newRegistryImpl(
167-
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries, defaultEvictionInterval,
168+
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries,
169+
defaultEvictionInterval,
168170
)
169171
tt.setup(r)
170172

@@ -195,6 +197,127 @@ func TestRegistryImpl_ListWorkers(t *testing.T) {
195197
}
196198
}
197199

200+
// Exercises the query matching functionality of ListWorkers.
201+
func TestRegistryImpl_ListWorkersWithQuery(t *testing.T) {
202+
tests := []struct {
203+
name string
204+
setup func(*registryImpl)
205+
nsID namespace.ID
206+
query string
207+
expectedCount int
208+
expectedWorkers []string // WorkerInstanceKeys
209+
expectedError string // Expected error message (empty if no error expected)
210+
}{
211+
{
212+
name: "valid query - basic filtering",
213+
setup: func(r *registryImpl) {
214+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{
215+
{WorkerInstanceKey: "worker1", TaskQueue: "queue1"},
216+
{WorkerInstanceKey: "worker2", TaskQueue: "queue2"},
217+
})
218+
},
219+
nsID: "namespace1",
220+
query: "WorkerInstanceKey = 'worker1'",
221+
expectedCount: 1,
222+
expectedWorkers: []string{"worker1"},
223+
},
224+
{
225+
name: "valid compound query - multiple conditions",
226+
setup: func(r *registryImpl) {
227+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{
228+
{WorkerInstanceKey: "worker1", TaskQueue: "queue1"},
229+
{WorkerInstanceKey: "worker2", TaskQueue: "queue2"},
230+
})
231+
},
232+
nsID: "namespace1",
233+
query: "WorkerInstanceKey = 'worker1' AND TaskQueue = 'queue1'",
234+
expectedCount: 1,
235+
expectedWorkers: []string{"worker1"},
236+
},
237+
{
238+
name: "valid query - no matches",
239+
setup: func(r *registryImpl) {
240+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{
241+
{WorkerInstanceKey: "worker1", TaskQueue: "queue1"},
242+
})
243+
},
244+
nsID: "namespace1",
245+
query: "TaskQueue = 'non-existent-queue'",
246+
expectedCount: 0,
247+
expectedWorkers: []string{},
248+
},
249+
{
250+
name: "invalid query - malformed SQL",
251+
setup: func(r *registryImpl) {
252+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{
253+
{WorkerInstanceKey: "worker1"},
254+
})
255+
},
256+
nsID: "namespace1",
257+
query: "invalid SQL syntax here",
258+
expectedError: "malformed query",
259+
},
260+
{
261+
name: "query on empty namespace",
262+
setup: func(r *registryImpl) {
263+
// No workers added
264+
},
265+
nsID: "empty-namespace",
266+
query: "WorkerInstanceKey = 'worker1'",
267+
expectedCount: 0,
268+
expectedWorkers: []string{},
269+
},
270+
{
271+
name: "query returns requested namespace only",
272+
setup: func(r *registryImpl) {
273+
// Add workers to namespace1
274+
r.upsertHeartbeats("namespace1", []*workerpb.WorkerHeartbeat{
275+
{WorkerInstanceKey: "worker1", TaskQueue: "queue"},
276+
})
277+
// Add workers to namespace2
278+
r.upsertHeartbeats("namespace2", []*workerpb.WorkerHeartbeat{
279+
{WorkerInstanceKey: "worker2", TaskQueue: "queue"},
280+
})
281+
},
282+
nsID: "namespace1",
283+
query: "TaskQueue = 'queue'",
284+
expectedCount: 1,
285+
expectedWorkers: []string{"worker1"}, // Only worker1, not worker2 from namespace2
286+
},
287+
}
288+
289+
for _, tt := range tests {
290+
t.Run(tt.name, func(t *testing.T) {
291+
r := newRegistryImpl(
292+
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries,
293+
defaultEvictionInterval,
294+
)
295+
tt.setup(r)
296+
297+
result, err := r.ListWorkers(tt.nsID, tt.query, nil)
298+
299+
if tt.expectedError != "" {
300+
assert.Error(t, err, "expected an error for invalid query")
301+
assert.Contains(t, err.Error(), tt.expectedError, "error message should contain expected text")
302+
assert.Nil(t, result, "result should be nil when an error occurs")
303+
return
304+
}
305+
306+
assert.NoError(t, err, "unexpected error when listing workers with query")
307+
assert.Len(t, result, tt.expectedCount, "unexpected number of workers returned")
308+
309+
// Check that all expected workers are present
310+
if tt.expectedCount > 0 {
311+
actualWorkers := make([]string, len(result))
312+
for i, worker := range result {
313+
actualWorkers[i] = worker.WorkerInstanceKey
314+
}
315+
assert.ElementsMatch(t, tt.expectedWorkers, actualWorkers, "worker lists don't match")
316+
}
317+
})
318+
}
319+
}
320+
198321
func TestRegistryImpl_DescribeWorker(t *testing.T) {
199322
tests := []struct {
200323
name string
@@ -270,7 +393,8 @@ func TestRegistryImpl_DescribeWorker(t *testing.T) {
270393
for _, tt := range tests {
271394
t.Run(tt.name, func(t *testing.T) {
272395
r := newRegistryImpl(
273-
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries, defaultEvictionInterval,
396+
defaultBuckets, defaultEntryTTL, defaultMinEvictAge, defaultMaxEntries,
397+
defaultEvictionInterval,
274398
)
275399
tt.setup(r)
276400

0 commit comments

Comments
 (0)