44 "context"
55 "log/slog"
66 "sync"
7+ "sync/atomic"
78 "time"
89
910 "github.com/haileyok/at-kafka/atkafka"
@@ -44,12 +45,17 @@ type Outbox struct {
4445 outgoing chan * OutboxEvt
4546
4647 ctx context.Context
48+
49+ cachedTime atomic.Value
4750}
4851
4952// Run starts the outbox workers for event delivery and cleanup.
5053func (o * Outbox ) Run (ctx context.Context ) {
5154 o .ctx = ctx
5255
56+ o .cachedTime .Store (time .Now ())
57+ go o .updateCachedTime (ctx )
58+
5359 if o .mode == OutboxModeWebsocketAck || o .mode == OutboxModeKafka {
5460 go o .checkTimeouts (ctx )
5561 }
@@ -65,6 +71,22 @@ func (o *Outbox) Run(ctx context.Context) {
6571 <- ctx .Done ()
6672}
6773
74+ // updateCachedTime updates a cached time for us every 250ms, so we avoid calling time.Now() all the time in
75+ // handlers
76+ func (o * Outbox ) updateCachedTime (ctx context.Context ) {
77+ ticker := time .NewTicker (250 * time .Millisecond )
78+ defer ticker .Stop ()
79+
80+ for {
81+ select {
82+ case <- ctx .Done ():
83+ return
84+ case <- ticker .C :
85+ o .cachedTime .Store (time .Now ())
86+ }
87+ }
88+ }
89+
6890// runDelivery continuously pulls from pendingIDs and delivers events
6991func (o * Outbox ) runDelivery (ctx context.Context ) {
7092 for {
@@ -299,7 +321,8 @@ func (w *DIDWorker) processPendingEvts() {
299321 w .blockedOnLive = true
300322 }
301323 w .pendingEvts = w .pendingEvts [1 :]
302- w .inFlightSentAt [eventID ] = time .Now ()
324+
325+ w .inFlightSentAt [eventID ] = w .outbox .cachedTime .Load ().(time.Time )
303326 w .mu .Unlock ()
304327
305328 w .outbox .sendEvent (evt )
@@ -316,7 +339,7 @@ func (w *DIDWorker) addEvent(evt *OutboxEvt) {
316339
317340 // Fast path: no contention, send immediately without goroutine
318341 if ! hasInFlight {
319- w .inFlightSentAt [evt .ID ] = time . Now ( )
342+ w .inFlightSentAt [evt .ID ] = w . outbox . cachedTime . Load ().(time. Time )
320343 w .mu .Unlock ()
321344 w .outbox .sendEvent (evt )
322345 return
0 commit comments