@@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
5151 // When true, `Replaced` events will be sent for items passed to a Replace() call.
5252 // When false, `Sync` events will be sent instead.
5353 EmitDeltaTypeReplaced bool
54+
55+ // If set, will be called for objects before enqueueing them. Please
56+ // see the comment on TransformFunc for details.
57+ Transformer TransformFunc
5458}
5559
5660// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@@ -129,8 +133,32 @@ type DeltaFIFO struct {
129133 // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
130134 // DeltaType when Replace() is called (to preserve backwards compat).
131135 emitDeltaTypeReplaced bool
136+
137+ // Called with every object if non-nil.
138+ transformer TransformFunc
132139}
133140
141+ // TransformFunc allows for transforming an object before it will be processed.
142+ // TransformFunc (similarly to ResourceEventHandler functions) should be able
143+ // to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
144+ //
145+ // New in v1.27: In such cases, the contained object will already have gone
146+ // through the transform object separately (when it was added / updated prior
147+ // to the delete), so the TransformFunc can likely safely ignore such objects
148+ // (i.e., just return the input object).
149+ //
150+ // The most common usage pattern is to clean-up some parts of the object to
151+ // reduce component memory usage if a given component doesn't care about them.
152+ //
153+ // New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
154+ // sees the object before any other actor, and it is now safe to mutate the
155+ // object in place instead of making a copy.
156+ //
157+ // Note that TransformFunc is called while inserting objects into the
158+ // notification queue and is therefore extremely performance sensitive; please
159+ // do not do anything that will take a long time.
160+ type TransformFunc func (interface {}) (interface {}, error )
161+
134162// DeltaType is the type of a change (addition, deletion, etc)
135163type DeltaType string
136164
@@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
227255 knownObjects : opts .KnownObjects ,
228256
229257 emitDeltaTypeReplaced : opts .EmitDeltaTypeReplaced ,
258+ transformer : opts .Transformer ,
230259 }
231260 f .cond .L = & f .lock
232261 return f
@@ -411,6 +440,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
411440 if err != nil {
412441 return KeyError {obj , err }
413442 }
443+
444+ // Every object comes through this code path once, so this is a good
445+ // place to call the transform func. If obj is a
446+ // DeletedFinalStateUnknown tombstone, then the containted inner object
447+ // will already have gone through the transformer, but we document that
448+ // this can happen. In cases involving Replace(), such an object can
449+ // come through multiple times.
450+ if f .transformer != nil {
451+ var err error
452+ obj , err = f .transformer (obj )
453+ if err != nil {
454+ return err
455+ }
456+ }
457+
414458 oldDeltas := f .items [id ]
415459 newDeltas := append (oldDeltas , Delta {actionType , obj })
416460 newDeltas = dedupDeltas (newDeltas )
@@ -566,12 +610,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
566610// using the Sync or Replace DeltaType and then (2) it does some deletions.
567611// In particular: for every pre-existing key K that is not the key of
568612// an object in `list` there is the effect of
569- // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
570- // of K. If `f.knownObjects == nil` then the pre-existing keys are
571- // those in `f.items` and the current object of K is the `.Newest()`
572- // of the Deltas associated with K. Otherwise the pre-existing keys
573- // are those listed by `f.knownObjects` and the current object of K is
574- // what `f.knownObjects.GetByKey(K)` returns.
613+ // `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
614+ // object of K. The pre-existing keys are those in the union set of the keys in
615+ // `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
616+ // the one present in the last delta in `f.items`. If there is no delta for K
617+ // in `f.items`, it is the object in `f.knownObjects`
575618func (f * DeltaFIFO ) Replace (list []interface {}, _ string ) error {
576619 f .lock .Lock ()
577620 defer f .lock .Unlock ()
@@ -595,58 +638,56 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
595638 }
596639 }
597640
598- if f .knownObjects == nil {
599- // Do deletion detection against our own list.
600- queuedDeletions := 0
601- for k , oldItem := range f .items {
602- if keys .Has (k ) {
603- continue
604- }
605- // Delete pre-existing items not in the new list.
606- // This could happen if watch deletion event was missed while
607- // disconnected from apiserver.
608- var deletedObj interface {}
609- if n := oldItem .Newest (); n != nil {
610- deletedObj = n .Object
611- }
612- queuedDeletions ++
613- if err := f .queueActionLocked (Deleted , DeletedFinalStateUnknown {k , deletedObj }); err != nil {
614- return err
615- }
616- }
617-
618- if ! f .populated {
619- f .populated = true
620- // While there shouldn't be any queued deletions in the initial
621- // population of the queue, it's better to be on the safe side.
622- f .initialPopulationCount = keys .Len () + queuedDeletions
623- }
624-
625- return nil
626- }
627-
628- // Detect deletions not already in the queue.
629- knownKeys := f .knownObjects .ListKeys ()
641+ // Do deletion detection against objects in the queue
630642 queuedDeletions := 0
631- for _ , k := range knownKeys {
643+ for k , oldItem := range f . items {
632644 if keys .Has (k ) {
633645 continue
634646 }
635-
636- deletedObj , exists , err := f .knownObjects .GetByKey (k )
637- if err != nil {
638- deletedObj = nil
639- klog .Errorf ("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object" , err , k )
640- } else if ! exists {
641- deletedObj = nil
642- klog .Infof ("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object" , k )
647+ // Delete pre-existing items not in the new list.
648+ // This could happen if watch deletion event was missed while
649+ // disconnected from apiserver.
650+ var deletedObj interface {}
651+ if n := oldItem .Newest (); n != nil {
652+ deletedObj = n .Object
653+
654+ // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
655+ if d , ok := deletedObj .(DeletedFinalStateUnknown ); ok {
656+ deletedObj = d .Obj
657+ }
643658 }
644659 queuedDeletions ++
645660 if err := f .queueActionLocked (Deleted , DeletedFinalStateUnknown {k , deletedObj }); err != nil {
646661 return err
647662 }
648663 }
649664
665+ if f .knownObjects != nil {
666+ // Detect deletions for objects not present in the queue, but present in KnownObjects
667+ knownKeys := f .knownObjects .ListKeys ()
668+ for _ , k := range knownKeys {
669+ if keys .Has (k ) {
670+ continue
671+ }
672+ if len (f .items [k ]) > 0 {
673+ continue
674+ }
675+
676+ deletedObj , exists , err := f .knownObjects .GetByKey (k )
677+ if err != nil {
678+ deletedObj = nil
679+ klog .Errorf ("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object" , err , k )
680+ } else if ! exists {
681+ deletedObj = nil
682+ klog .Infof ("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object" , k )
683+ }
684+ queuedDeletions ++
685+ if err := f .queueActionLocked (Deleted , DeletedFinalStateUnknown {k , deletedObj }); err != nil {
686+ return err
687+ }
688+ }
689+ }
690+
650691 if ! f .populated {
651692 f .populated = true
652693 f .initialPopulationCount = keys .Len () + queuedDeletions
0 commit comments