Skip to content

Commit 3d4bf8f

Browse files
authored
fix(timanager): fix lost event issue (#6572) (#6575)
1 parent cee261f commit 3d4bf8f

File tree

10 files changed

+327
-53
lines changed

10 files changed

+327
-53
lines changed

pkg/timanager/manager.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type ManagerBuilder[Object client.Object, UnderlayClient, Client any] interface
6969
WithNewUnderlayClientFunc(f NewUnderlayClientFunc[Object, UnderlayClient]) ManagerBuilder[Object, UnderlayClient, Client]
7070
WithNewClientFunc(f NewClientFunc[Object, UnderlayClient, Client]) ManagerBuilder[Object, UnderlayClient, Client]
7171
WithNewPollerFunc(obj runtime.Object, f NewPollerFunc[UnderlayClient]) ManagerBuilder[Object, UnderlayClient, Client]
72+
WithResyncPeriod(time.Duration) ManagerBuilder[Object, UnderlayClient, Client]
73+
7274
Build() Manager[Object, Client]
7375
}
7476

@@ -77,6 +79,7 @@ type builder[Object client.Object, UnderlayClient, Client any] struct {
7779
newUnderlayClientFunc NewUnderlayClientFunc[Object, UnderlayClient]
7880
newClientFunc NewClientFunc[Object, UnderlayClient, Client]
7981
cacheKeysFunc CacheKeysFunc[Object]
82+
resyncPeriod time.Duration
8083

8184
newPollerFuncMap map[reflect.Type]NewPollerFunc[UnderlayClient]
8285
}
@@ -122,6 +125,11 @@ func (b *builder[Object, UnderlayClient, Client]) WithNewPollerFunc(
122125
return b
123126
}
124127

128+
func (b *builder[Object, UnderlayClient, Client]) WithResyncPeriod(d time.Duration) ManagerBuilder[Object, UnderlayClient, Client] {
129+
b.resyncPeriod = d
130+
return b
131+
}
132+
125133
func (b *builder[Object, UnderlayClient, Client]) Build() Manager[Object, Client] {
126134
s := runtime.NewScheme()
127135
if err := pdv1.Install(s); err != nil {
@@ -136,6 +144,7 @@ func (b *builder[Object, UnderlayClient, Client]) Build() Manager[Object, Client
136144
cacheKeysFunc: b.cacheKeysFunc,
137145
newPollerFuncMap: b.newPollerFuncMap,
138146
sources: map[reflect.Type][]EventSource{},
147+
resyncPeriod: b.resyncPeriod,
139148
}
140149
}
141150

@@ -152,6 +161,8 @@ type clientManager[Object client.Object, UnderlayClient, Client any] struct {
152161
newPollerFuncMap map[reflect.Type]NewPollerFunc[UnderlayClient]
153162
sources map[reflect.Type][]EventSource
154163

164+
resyncPeriod time.Duration
165+
155166
ctx context.Context
156167
started bool
157168
}
@@ -184,7 +195,7 @@ func (m *clientManager[Object, UnderlayClient, Client]) Register(obj Object) err
184195

185196
var f SharedInformerFactory[UnderlayClient]
186197
if len(m.newPollerFuncMap) != 0 {
187-
f = NewSharedInformerFactory(keys[0], m.logger, m.scheme, underlay, m.newPollerFuncMap, time.Hour)
198+
f = NewSharedInformerFactory(keys[0], m.logger, m.scheme, underlay, m.newPollerFuncMap, m.resyncPeriod)
188199
}
189200

190201
c := m.newClientFunc(obj, underlay, f)

pkg/timanager/manager_test.go

Lines changed: 144 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"time"
2323

2424
"github.com/go-logr/logr"
25+
gocmp "github.com/google/go-cmp/cmp"
2526
"github.com/stretchr/testify/assert"
2627
"github.com/stretchr/testify/require"
2728
corev1 "k8s.io/api/core/v1"
@@ -143,96 +144,200 @@ func TestClientManagerSource(t *testing.T) {
143144
previous []pdv1.Store
144145
updated []pdv1.Store
145146

147+
resyncPeriod time.Duration
148+
resyncTimes int
149+
146150
expectedCreateEvents []event.TypedCreateEvent[client.Object]
147151
expectedUpdateEvents []event.TypedUpdateEvent[client.Object]
148152
expectedDeleteEvents []event.TypedDeleteEvent[client.Object]
149153
}{
150154
{
151155
desc: "no update",
152156
previous: []pdv1.Store{
153-
*fake.FakeObj[pdv1.Store]("aa"),
154-
*fake.FakeObj[pdv1.Store]("bb"),
157+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
158+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
155159
},
156160
updated: []pdv1.Store{
157-
*fake.FakeObj[pdv1.Store]("aa"),
158-
*fake.FakeObj[pdv1.Store]("bb"),
161+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
162+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
159163
},
160164
expectedCreateEvents: []event.TypedCreateEvent[client.Object]{
161165
{
162-
Object: fake.FakeObj[pdv1.Store]("aa"),
166+
Object: fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
163167
},
164168
{
165-
Object: fake.FakeObj[pdv1.Store]("bb"),
169+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
166170
},
167171
},
168172
},
169173
{
170174
desc: "add new obj",
171175
previous: []pdv1.Store{
172-
*fake.FakeObj[pdv1.Store]("aa"),
176+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
173177
},
174178
updated: []pdv1.Store{
175-
*fake.FakeObj[pdv1.Store]("aa"),
176-
*fake.FakeObj[pdv1.Store]("bb"),
179+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
180+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
177181
},
178182
expectedCreateEvents: []event.TypedCreateEvent[client.Object]{
179183
{
180-
Object: fake.FakeObj[pdv1.Store]("aa"),
184+
Object: fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
181185
},
182186
{
183-
Object: fake.FakeObj[pdv1.Store]("bb"),
187+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
184188
},
185189
},
186190
},
187191
{
188192
desc: "del existing obj",
189193
previous: []pdv1.Store{
190-
*fake.FakeObj[pdv1.Store]("aa"),
191-
*fake.FakeObj[pdv1.Store]("bb"),
194+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
195+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
192196
},
193197
updated: []pdv1.Store{
194-
*fake.FakeObj[pdv1.Store]("aa"),
198+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
195199
},
196200
expectedCreateEvents: []event.TypedCreateEvent[client.Object]{
197201
{
198-
Object: fake.FakeObj[pdv1.Store]("aa"),
202+
Object: fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
199203
},
200204
{
201-
Object: fake.FakeObj[pdv1.Store]("bb"),
205+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
202206
},
203207
},
204208
expectedDeleteEvents: []event.TypedDeleteEvent[client.Object]{
205209
{
206-
Object: fake.FakeObj[pdv1.Store]("bb"),
210+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
207211
},
208212
},
209213
},
210214
{
211215
desc: "update existing obj",
212216
previous: []pdv1.Store{
213-
*fake.FakeObj[pdv1.Store]("aa"),
214-
*fake.FakeObj[pdv1.Store]("bb"),
217+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
218+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
215219
},
216220
updated: []pdv1.Store{
217-
*fake.FakeObj[pdv1.Store]("aa"),
221+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
218222
*fake.FakeObj("bb", func(obj *pdv1.Store) *pdv1.Store {
219223
obj.Labels = map[string]string{"test": "test"}
224+
obj.ResourceVersion = "bbb"
220225
return obj
221226
}),
222227
},
223228
expectedCreateEvents: []event.TypedCreateEvent[client.Object]{
224229
{
225-
Object: fake.FakeObj[pdv1.Store]("aa"),
230+
Object: fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
226231
},
227232
{
228-
Object: fake.FakeObj[pdv1.Store]("bb"),
233+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
229234
},
230235
},
231236
expectedUpdateEvents: []event.TypedUpdateEvent[client.Object]{
232237
{
233-
ObjectOld: fake.FakeObj[pdv1.Store]("bb"),
238+
ObjectOld: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
234239
ObjectNew: fake.FakeObj("bb", func(obj *pdv1.Store) *pdv1.Store {
235240
obj.Labels = map[string]string{"test": "test"}
241+
obj.ResourceVersion = "bbb"
242+
return obj
243+
}),
244+
},
245+
},
246+
resyncPeriod: time.Second * 3,
247+
resyncTimes: 1,
248+
},
249+
{
250+
// Events after first resync will be lost if no rv
251+
// It's unexpected.
252+
desc: "update existing obj no rv [Unexpected]",
253+
previous: []pdv1.Store{
254+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
255+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
256+
},
257+
updated: []pdv1.Store{
258+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
259+
*fake.FakeObj("bb", func(obj *pdv1.Store) *pdv1.Store {
260+
obj.Labels = map[string]string{"test": "test"}
261+
obj.ResourceVersion = "aaa"
262+
return obj
263+
}),
264+
},
265+
expectedCreateEvents: []event.TypedCreateEvent[client.Object]{
266+
{
267+
Object: fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
268+
},
269+
{
270+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
271+
},
272+
},
273+
// NOTE: update event is lost
274+
expectedUpdateEvents: nil,
275+
resyncPeriod: time.Second * 3,
276+
resyncTimes: 1,
277+
},
278+
{
279+
desc: "update existing obj no rv resync twice",
280+
previous: []pdv1.Store{
281+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
282+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
283+
},
284+
updated: []pdv1.Store{
285+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
286+
*fake.FakeObj("bb", func(obj *pdv1.Store) *pdv1.Store {
287+
obj.Labels = map[string]string{"test": "test"}
288+
obj.ResourceVersion = "aaa"
289+
return obj
290+
}),
291+
},
292+
expectedCreateEvents: []event.TypedCreateEvent[client.Object]{
293+
{
294+
Object: fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
295+
},
296+
{
297+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
298+
},
299+
},
300+
expectedUpdateEvents: []event.TypedUpdateEvent[client.Object]{
301+
{
302+
ObjectOld: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
303+
ObjectNew: fake.FakeObj("bb", func(obj *pdv1.Store) *pdv1.Store {
304+
obj.Labels = map[string]string{"test": "test"}
305+
obj.ResourceVersion = "aaa"
306+
return obj
307+
}),
308+
},
309+
},
310+
resyncPeriod: time.Second * 3,
311+
resyncTimes: 2,
312+
},
313+
{
314+
desc: "update existing obj no rv no resync",
315+
previous: []pdv1.Store{
316+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
317+
*fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
318+
},
319+
updated: []pdv1.Store{
320+
*fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
321+
*fake.FakeObj("bb", func(obj *pdv1.Store) *pdv1.Store {
322+
obj.Labels = map[string]string{"test": "test"}
323+
obj.ResourceVersion = "aaa"
324+
return obj
325+
}),
326+
},
327+
expectedCreateEvents: []event.TypedCreateEvent[client.Object]{
328+
{
329+
Object: fake.FakeObj("aa", fake.ResourceVersion[pdv1.Store]("aaa")),
330+
},
331+
{
332+
Object: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
333+
},
334+
},
335+
expectedUpdateEvents: []event.TypedUpdateEvent[client.Object]{
336+
{
337+
ObjectOld: fake.FakeObj("bb", fake.ResourceVersion[pdv1.Store]("aaa")),
338+
ObjectNew: fake.FakeObj("bb", func(obj *pdv1.Store) *pdv1.Store {
339+
obj.Labels = map[string]string{"test": "test"}
340+
obj.ResourceVersion = "aaa"
236341
return obj
237342
}),
238343
},
@@ -243,11 +348,7 @@ func TestClientManagerSource(t *testing.T) {
243348
for i := range cases {
244349
c := &cases[i]
245350
t.Run(c.desc, func(tt *testing.T) {
246-
lister := FakeLister[pdv1.Store, *pdv1.Store]{
247-
L: List[pdv1.Store, *pdv1.Store]{
248-
Items: c.previous,
249-
},
250-
}
351+
lister := NewFakeLister(c.previous)
251352
cm := NewManagerBuilder[client.Object, int, int]().
252353
WithNewUnderlayClientFunc(func(client.Object) (int, error) {
253354
return 0, nil
@@ -260,11 +361,16 @@ func TestClientManagerSource(t *testing.T) {
260361
return 0
261362
}).
262363
WithNewPollerFunc(&pdv1.Store{}, func(name string, logger logr.Logger, _ int) Poller {
263-
return NewPoller(name, logger, &lister, NewDeepEquality[pdv1.Store](), time.Millisecond*200)
364+
return NewPoller(name, logger, lister, NewDeepEquality[pdv1.Store](logger), time.Millisecond*200)
264365
}).
366+
WithResyncPeriod(c.resyncPeriod).
265367
Build()
266368

267-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
369+
timeout := c.resyncPeriod * (time.Duration(c.resyncTimes + 1))
370+
if c.resyncPeriod == 0 {
371+
timeout = time.Second * 10
372+
}
373+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
268374
defer cancel()
269375

270376
done := make(chan struct{})
@@ -287,6 +393,12 @@ func TestClientManagerSource(t *testing.T) {
287393
},
288394

289395
UpdateFunc: func(_ context.Context, event event.TypedUpdateEvent[client.Object], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
396+
// ignore no diff
397+
diff := gocmp.Diff(event.ObjectOld, event.ObjectNew)
398+
if diff == "" {
399+
return
400+
}
401+
290402
_, ok1 := event.ObjectOld.(*pdv1.Store)
291403
assert.True(tt, ok1)
292404

@@ -326,6 +438,8 @@ func TestClientManagerSource(t *testing.T) {
326438
})
327439
assert.True(tt, synced)
328440

441+
time.Sleep(c.resyncPeriod * time.Duration(c.resyncTimes))
442+
329443
lister.UpdateItems(c.updated)
330444

331445
select {

pkg/timanager/pd/member.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
"github.com/go-logr/logr"
26+
"github.com/google/uuid"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/runtime/schema"
2829

@@ -57,7 +58,7 @@ func NewMemberPoller(name string, logger logr.Logger, c pdapi.PDClient) timanage
5758
lister := NewMemberLister(name, c)
5859

5960
// TODO: change interval
60-
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.Member](), defaultPollInterval)
61+
return timanager.NewPoller(name, logger, lister, timanager.NewDeepEquality[pdv1.Member](logger), defaultPollInterval)
6162
}
6263

6364
type memberLister struct {
@@ -88,8 +89,9 @@ func (l *memberLister) List(ctx context.Context) (*pdv1.MemberList, error) {
8889
for _, m := range info.Members {
8990
mm[m.MemberId] = &pdv1.Member{
9091
ObjectMeta: metav1.ObjectMeta{
91-
Name: m.Name,
92-
Namespace: l.cluster,
92+
Name: m.Name,
93+
Namespace: l.cluster,
94+
ResourceVersion: uuid.NewString(),
9395
},
9496
ClusterID: strconv.FormatUint(info.Header.ClusterId, 10),
9597
ID: strconv.FormatUint(m.MemberId, 10),

0 commit comments

Comments
 (0)