Skip to content

Commit a443707

Browse files
committed
Refactor periodic progress flushing in Worker to use context-based cancelation, improving reliability and resource management.
1 parent 3aa183c commit a443707

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

services/worker.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -617,9 +617,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
617617
}
618618
}
619619

620-
lastFlush := time.Now()
621620
flush := func(stored int64) error {
622-
lastFlush = time.Now()
623621
if _, err := db.Model(&File{Hash: hash}).
624622
Context(ctx).
625623
Set("stored_size = ?", stored).
@@ -678,15 +676,25 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
678676
}).Info("uploading part")
679677

680678
var partStored int64
681-
pr := &progressReader{
682-
r: r,
683-
onRead: func(n int) error {
684-
partStored += int64(n)
685-
if time.Since(lastFlush) >= 10*time.Second {
679+
partCtx, partCancel := context.WithCancel(ctx)
680+
go func() {
681+
ticker := time.NewTicker(10 * time.Second)
682+
defer ticker.Stop()
683+
for {
684+
select {
685+
case <-partCtx.Done():
686+
return
687+
case <-ticker.C:
686688
if err := flush(stored + partStored); err != nil {
687689
log.WithError(err).Error("periodic flush progress failed")
688690
}
689691
}
692+
}
693+
}()
694+
pr := &progressReader{
695+
r: r,
696+
onRead: func(n int) error {
697+
partStored += int64(n)
690698
return nil
691699
},
692700
}
@@ -698,6 +706,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
698706
PartNumber: aws.Int64(partNumber),
699707
Body: aws.ReadSeekCloser(pr),
700708
})
709+
partCancel()
701710
_ = r.Close()
702711
if err != nil {
703712
return nil, err

0 commit comments

Comments
 (0)