Commit 0ee51ce

Refactor Worker: replace native errors with pkg/errors for enhanced error context wrapping throughout service methods.
1 parent: 985dfba
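
The refactor follows the standard pkg/errors idiom: every bare "return err" becomes errors.Wrap(err, "<context>"), or errors.Wrapf with formatted details such as bucket and key, which prefixes the error message with call-site context and records a stack trace while keeping the original error retrievable as the cause. A minimal sketch of the pattern; the selectResource helper and its simulated failure are hypothetical stand-ins, not code from this repository:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// selectResource stands in for any of the Worker service methods touched by this commit.
func selectResource(id string) error {
	err := fmt.Errorf("connection refused") // simulated low-level failure
	// Wrap annotates the error and records a stack trace at this call site;
	// the original error remains retrievable as the cause.
	return errors.Wrapf(err, "failed to select resource %s", id)
}

func main() {
	err := selectResource("abc")
	fmt.Println(err)               // failed to select resource abc: connection refused
	fmt.Println(errors.Cause(err)) // connection refused
	fmt.Printf("%+v\n", err)       // message chain plus the recorded stack trace
}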

File tree

1 file changed (+51, -43 lines)


services/worker.go

Lines changed: 51 additions & 43 deletions
@@ -5,7 +5,6 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"sort"
@@ -17,6 +16,7 @@ import (
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	awss3 "github.com/aws/aws-sdk-go/service/s3"
 	pg "github.com/go-pg/pg/v10"
+	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	"github.com/urfave/cli"
 	cs "github.com/webtor-io/common-services"
@@ -146,7 +146,7 @@ func (s *Worker) process(ctx context.Context, db *pg.DB) error {
 			For("UPDATE SKIP LOCKED").
 			Select()
 		if err != nil && !errors.Is(err, pg.ErrNoRows) {
-			return err
+			return errors.Wrap(err, "failed to select resources for processing")
 		}
 		for _, r := range list {
 			if err := s.processResource(ctx, tx, r); err != nil {
@@ -157,7 +157,7 @@ func (s *Worker) process(ctx context.Context, db *pg.DB) error {
 		return nil
 	})
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to run transaction in process")
 	}
 	return nil
 }
@@ -186,14 +186,14 @@ func (s *Worker) processResource(ctx context.Context, tx *pg.Tx, r Resource) err
 		if errors.Is(err, pg.ErrNoRows) {
 			return nil
 		}
-		return err
+		return errors.Wrap(err, "failed to select resource for update")
 	}
 	cur.ID = r.ID
 	cur.Status = processingStatus
 	cur.UpdatedAt = time.Now()

 	if _, err = tx.Model(cur).Context(ctx).Column("status", "updated_at").WherePK().Update(); err != nil {
-		return err
+		return errors.Wrap(err, "failed to update resource status to processing")
 	}
 	select {
 	case s.jobs <- job{status: processingStatus, id: r.ID}:
@@ -316,15 +316,15 @@ func (s *Worker) handleStore(ctx context.Context, db *pg.DB, id string) (err err
 		Set("error = null").
 		Where("resource_id = ?", id).
 		Update(); err != nil {
-		return err
+		return errors.Wrap(err, "failed to reset resource counters")
 	}

 	var totalSize, totalStored int64
 	// Paginate through results to find the file at the specified index
 	for {
 		resp, err := s.api.ListResourceContent(ctx, cla, id, listArgs)
 		if err != nil {
-			return err
+			return errors.Wrap(err, "failed to list resource content")
 		}
 		for _, item := range resp.Items {
 			if item.Type == ra.ListTypeFile {
@@ -335,12 +335,12 @@ func (s *Worker) handleStore(ctx context.Context, db *pg.DB, id string) (err err
 					Set("total_size = ?", totalSize).
 					Where("resource_id = ?", id).
 					Update(); err != nil {
-					return err
+					return errors.Wrap(err, "failed to update resource total_size")
 				}

 				f, err := s.storeFile(ctx, cla, id, item, totalStored)
 				if err != nil {
-					return err
+					return errors.Wrap(err, "failed to store file")
 				}
 				totalStored += item.Size

@@ -350,7 +350,7 @@ func (s *Worker) handleStore(ctx context.Context, db *pg.DB, id string) (err err
 					Set("error = ?", "").
 					Where("resource_id = ?", id).
 					Update(); err != nil {
-					return err
+					return errors.Wrap(err, "failed to update resource stored_size")
 				}
 				rf := &ResourceFile{
 					ResourceID: id,
@@ -359,7 +359,7 @@ func (s *Worker) handleStore(ctx context.Context, db *pg.DB, id string) (err err
 				}
 				_, err = db.Model(rf).Insert()
 				if err != nil && !strings.Contains(err.Error(), "duplicate key value violates unique constraint") {
-					return err
+					return errors.Wrap(err, "failed to insert resource_file")
 				} else if err != nil {
 					continue
 				}
@@ -380,7 +380,10 @@ func (s *Worker) handleStore(ctx context.Context, db *pg.DB, id string) (err err
 		Column("status").
 		Where("resource_id = ?", id).
 		Update()
-	return err
+	if err != nil {
+		return errors.Wrap(err, "failed to update resource status to stored")
+	}
+	return nil
 }

 func (s *Worker) handleDelete(ctx context.Context, db *pg.DB, id string) (err error) {
@@ -390,7 +393,7 @@ func (s *Worker) handleDelete(ctx context.Context, db *pg.DB, id string) (err er
 	// 1) Collect all files linked to this resource
 	var rfs []ResourceFile
 	if err := db.Model(&rfs).Context(ctx).Where("resource_id = ?", id).Select(); err != nil && !errors.Is(err, pg.ErrNoRows) {
-		return err
+		return errors.Wrap(err, "failed to select resource files for deletion")
 	}

 	// 2) For each file check if it's referenced by any other resource, if not — delete from S3 and DB
@@ -399,7 +402,7 @@ func (s *Worker) handleDelete(ctx context.Context, db *pg.DB, id string) (err er
 		f := &File{Hash: rf.FileHash}
 		if err := db.Model(f).Context(ctx).WherePK().Select(); err != nil {
 			if !errors.Is(err, pg.ErrNoRows) {
-				return err
+				return errors.Wrap(err, "failed to select file for deletion")
 			}
 		} else {
 			// Decrease resource stored_size by the size currently accounted for this file
@@ -409,7 +412,7 @@ func (s *Worker) handleDelete(ctx context.Context, db *pg.DB, id string) (err er
 				Set("updated_at = now()").
 				Where("resource_id = ?", id).
 				Update(); err != nil {
-				return err
+				return errors.Wrap(err, "failed to update resource stored_size during deletion")
 			}
 		}
 		// Count references excluding current resource
@@ -418,7 +421,7 @@ func (s *Worker) handleDelete(ctx context.Context, db *pg.DB, id string) (err er
 			Where("resource_id <> ?", id).
 			Count()
 		if err != nil {
-			return err
+			return errors.Wrap(err, "failed to count file references")
 		}
 		if cnt > 0 {
 			continue
@@ -431,7 +434,7 @@ func (s *Worker) handleDelete(ctx context.Context, db *pg.DB, id string) (err er
 			Set("updated_at = now()").
 			WherePK().
 			Update(); err != nil && !errors.Is(err, pg.ErrNoRows) {
-			return err
+			return errors.Wrap(err, "failed to mark file as deleting")
 		}
 		// No more references — delete S3 object (if configured) and file row
 		s3Cl := s.s3.Get()
@@ -440,20 +443,22 @@ func (s *Worker) handleDelete(ctx context.Context, db *pg.DB, id string) (err er
 			Key: aws.String(rf.FileHash),
 		})
 		if delErr != nil {
-			return delErr
+			return errors.Wrapf(delErr, "failed to delete object from S3, bucket=%s, key=%s", s.bucket, rf.FileHash)
 		}
 		log.WithFields(log.Fields{"bucket": s.bucket, "path": rf.Path, "resource_id": id, "key": rf.FileHash}).Info("deleted from s3")
 		// Delete file row
 		f = &File{Hash: rf.FileHash}
 		if _, err := db.Model(f).Context(ctx).WherePK().Delete(); err != nil && !errors.Is(err, pg.ErrNoRows) {
-			return err
+			return errors.Wrap(err, "failed to delete file row from database")
 		}
 	}

 	res := &Resource{ID: id}
 	_, err = db.Model(res).Context(ctx).WherePK().Delete()
-
-	return err
+	if err != nil {
+		return errors.Wrap(err, "failed to delete resource from database")
+	}
+	return nil
 }

 func (s *Worker) handleError(ctx context.Context, id string, err error, status Status) {
@@ -491,7 +496,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 		Limit(1).
 		Select()
 	if err != nil && !errors.Is(err, pg.ErrNoRows) {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to check for existing file")
 	}
 	if err == nil {
 		// Found a suitable file that's already stored
@@ -500,7 +505,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 	// No existing stored file found by path+size, proceed with exporting and storing by content hash
 	ei, err := s.api.ExportResourceContent(ctx, cla, id, item.ID)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to export resource content")
 	}
 	u := ei.ExportItems["download"].URL

@@ -513,7 +518,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 			Set("updated_at = now()").
 			WherePK().
 			Update(); err != nil {
-			return err
+			return errors.Wrap(err, "failed to update file stored_size")
 		}
 	}
 	if _, err := db.Model(&Resource{ID: id}).
@@ -522,7 +527,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 		Set("updated_at = now()").
 		Where("resource_id = ?", id).
 		Update(); err != nil {
-		return err
+		return errors.Wrap(err, "failed to update resource stored_size during flush")
 	}
 	return nil
 }
@@ -564,7 +569,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 	log.WithField("url", u).Debug("export url")
 	hash, err = s.generateFileHash(ctx, item, ei)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to generate file hash")
 	}
 	log.WithField("hash", hash).Debug("generated hash")
 	// Prepare file model
@@ -578,14 +583,14 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 		WherePK().
 		Select()
 	if err != nil && !errors.Is(err, pg.ErrNoRows) {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to select file by hash")
 	}
 	if err == nil && (f.Status == StatusStored || (f.UpdatedAt.Add(10*time.Second).After(time.Now()) && f.UploadID == "")) {
 		return f, nil
 	}
 	_, err = db.Model(f).Context(ctx).Insert()
 	if err != nil && !strings.Contains(err.Error(), "duplicate key value violates unique constraint") {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to insert file")
 	}

 	s3Cl := s.s3.Get()
@@ -605,7 +610,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 		f.StoredSize = 0
 		f.PartSize = partSize
 		if _, err := db.Model(f).Context(ctx).Column("upload_id", "stored_size", "part_size").WherePK().Update(); err != nil {
-			return nil, err
+			return nil, errors.Wrap(err, "failed to reset upload after part size change")
 		}
 	}

@@ -615,13 +620,13 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 			Key: aws.String(hash),
 		})
 		if err != nil {
-			return nil, err
+			return nil, errors.Wrapf(err, "failed to create S3 multipart upload, bucket=%s, key=%s", s.bucket, hash)
 		}
 		f.UploadID = *out.UploadId
 		f.StoredSize = 0
 		f.PartSize = partSize
 		if _, err := db.Model(f).Context(ctx).Column("upload_id", "stored_size", "part_size").WherePK().Update(); err != nil {
-			return nil, err
+			return nil, errors.Wrap(err, "failed to update file with new upload_id")
 		}
 	}

@@ -653,18 +658,18 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 				Key: aws.String(hash),
 			})
 			if err != nil {
-				return nil, err
+				return nil, errors.Wrapf(err, "failed to recreate S3 multipart upload after NoSuchUpload, bucket=%s, key=%s", s.bucket, hash)
 			}
 			f.UploadID = *out.UploadId
 			f.StoredSize = 0
 			f.PartSize = partSize
 			if _, err := db.Model(f).Context(ctx).Column("upload_id", "stored_size", "part_size").WherePK().Update(); err != nil {
-				return nil, err
+				return nil, errors.Wrap(err, "failed to update file after recreating upload")
 			}
 			partNumber = 1
 			completedParts = nil
 		} else {
-			return nil, err
+			return nil, errors.Wrapf(err, "failed to list S3 multipart upload parts, bucket=%s, key=%s, upload_id=%s", s.bucket, hash, f.UploadID)
 		}
 	}

@@ -683,7 +688,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 	defer dcancel()
 	r, err := s.api.DownloadWithRange(dctx, u, int(stored), -1)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to download file content with range, url=%s, start=%d", u, stored)
 	}
 	defer func(r io.ReadCloser) {
 		_ = r.Close()
@@ -796,7 +801,7 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 			buf := make([]byte, currentPartSize)
 			_, err := io.ReadFull(r, buf)
 			if err != nil {
-				setUploadErr(err)
+				setUploadErr(errors.Wrap(err, "failed to read part data from download stream"))
 				break
 			}

@@ -834,15 +839,15 @@ func (s *Worker) storeFile(ctx context.Context, cla *Claims, id string, item ra.
 		},
 	})
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to complete S3 multipart upload, bucket=%s, key=%s, upload_id=%s", s.bucket, hash, f.UploadID)
 	}

 	// Ensure file status and stored_size are finalized
 	f.Status = StatusStored
 	f.StoredSize = f.TotalSize
 	f.UploadID = ""
 	if _, err := db.Model(f).Context(ctx).Column("status", "stored_size", "upload_id").WherePK().Update(); err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "failed to finalize file status after upload")
 	}
 	log.WithFields(log.Fields{"bucket": s.bucket, "resource_id": id, "path": item.PathStr, "key": hash, "size": item.Size}).Info("stored to s3")
 	return f, nil
@@ -860,35 +865,38 @@ func (s *Worker) generateFileHash(ctx context.Context, item ra.ListItem, ei *ra.
 	if size < limitStart+limitEnd {
 		r, err := s.api.Download(dctx, u)
 		if err != nil {
-			return "", err
+			return "", errors.Wrapf(err, "failed to download file for hash generation, url=%s", u)
 		}
 		defer func(r io.ReadCloser) {
 			_ = r.Close()
 		}(r)
 		_, err = io.Copy(h, r)
 		if err != nil {
-			return "", err
+			return "", errors.Wrap(err, "failed to copy downloaded data to hash")
 		}
 	} else {
 		r, err := s.api.DownloadWithRange(dctx, u, 0, int(limitStart))
 		if err != nil {
-			return "", err
+			return "", errors.Wrapf(err, "failed to download file start for hash generation, url=%s, range=0-%d", u, limitStart)
 		}
 		defer func(r io.ReadCloser) {
 			_ = r.Close()
 		}(r)
 		_, err = io.Copy(h, r)
 		if err != nil {
-			return "", err
+			return "", errors.Wrap(err, "failed to copy file start data to hash")
 		}
 		r, err = s.api.DownloadWithRange(dctx, u, int(size-limitEnd), -1)
 		if err != nil {
-			return "", err
+			return "", errors.Wrapf(err, "failed to download file end for hash generation, url=%s, range=%d-end", u, size-limitEnd)
 		}
 		defer func(r io.ReadCloser) {
 			_ = r.Close()
 		}(r)
 		_, err = io.Copy(h, r)
+		if err != nil {
+			return "", errors.Wrap(err, "failed to copy file end data to hash")
+		}
 	}
 	return fmt.Sprintf("%x", h.Sum(nil)), nil
 }
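
Note that the stdlib "errors" import is removed while calls such as errors.Is(err, pg.ErrNoRows) remain. This works because pkg/errors (v0.9.0 and later) re-exports Is, As, and Unwrap as thin wrappers over the standard library, and its wrapped errors implement Unwrap, so sentinel checks still match through any number of wraps. A small sketch, where errNoRows is a hypothetical stand-in for pg.ErrNoRows:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

var errNoRows = errors.New("no rows in result set") // stand-in for pg.ErrNoRows

func main() {
	// Wrap twice, as nested calls in worker.go can.
	err := errors.Wrap(errNoRows, "failed to select resource for update")
	err = errors.Wrap(err, "failed to run transaction in process")

	// pkg/errors.Is delegates to the stdlib implementation and walks the
	// Unwrap chain, so the sentinel is still detected through both wraps.
	fmt.Println(errors.Is(err, errNoRows)) // true
}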
