@@ -11,7 +11,7 @@ import (
1111)
1212
1313type MessageEnqueuer interface {
14- Enqueue (ctx context.Context , messages []* models.Message ) error
14+ Enqueue (ctx context.Context , messages []* models.Message ) ([] * models. QueuedInfo , error )
1515}
1616
1717func NewMessageEnqueuer (redisStore RedisStore , timeSvc TimeService ) MessageEnqueuer {
@@ -26,34 +26,47 @@ type messageEnqueuer struct {
2626 timeSvc TimeService
2727}
2828
29- func (e * messageEnqueuer ) Enqueue (ctx context.Context , messages []* models.Message ) error {
29+ func (e * messageEnqueuer ) Enqueue (ctx context.Context , messages []* models.Message ) ([]* models.QueuedInfo , error ) {
30+ queuedInfos := []* models.QueuedInfo {}
31+
3032 for _ , m := range messages {
3133 queueStatus := getQueueStatus (m , e .timeSvc .Now ())
3234
33- b , err := json . Marshal ( & m )
35+ err := e . redisEnqueue ( ctx , m , queueStatus )
3436 if err != nil {
35- return errors . Wrapf ( err , "failed to encode message for sink: %s" , m . SinkID )
37+ return nil , err
3638 }
3739
38- mKey := messageKey (m .FlowID , m .SinkID , m .ID )
39- qKey := queueKey (m .FlowID , m .SinkID , queueStatus )
40-
41- switch queueStatus {
42- case models .QueueStatusReady :
43- err = e .redisStore .SetAndEnqueue (ctx , mKey , b , qKey , m .ID )
44- if err != nil {
45- return errors .Wrapf (err , "failed to set and enqueue message for sink: %s" , m .SinkID )
46- }
47- case models .QueueStatusScheduled :
48- err = e .redisStore .SetAndZAdd (ctx , mKey , b , qKey , m .ID , float64 (m .DeliverAfter .Unix ()))
49- if err != nil {
50- return errors .Wrapf (err , "failed to set and enqueue message for sink: %s" , m .SinkID )
51- }
52- default :
53- return fmt .Errorf ("unexpected queue status %s" , queueStatus )
54- }
40+ queuedInfos = append (queuedInfos , & models.QueuedInfo {MessageID : m .ID , QueueStatus : queueStatus , DeliverAfter : m .DeliverAfter })
41+ }
5542
43+ return queuedInfos , nil
44+ }
45+
46+ func (e * messageEnqueuer ) redisEnqueue (ctx context.Context , m * models.Message , queueStatus models.QueueStatus ) error {
47+ b , err := json .Marshal (& m )
48+ if err != nil {
49+ return errors .Wrapf (err , "failed to encode message for sink: %s" , m .SinkID )
5650 }
51+
52+ mKey := messageKey (m .FlowID , m .SinkID , m .ID )
53+ qKey := queueKey (m .FlowID , m .SinkID , queueStatus )
54+
55+ switch queueStatus {
56+ case models .QueueStatusReady :
57+ err = e .redisStore .SetAndEnqueue (ctx , mKey , b , qKey , m .ID )
58+ if err != nil {
59+ return errors .Wrapf (err , "failed to set and enqueue message for sink: %s" , m .SinkID )
60+ }
61+ case models .QueueStatusScheduled :
62+ err = e .redisStore .SetAndZAdd (ctx , mKey , b , qKey , m .ID , float64 (m .DeliverAfter .Unix ()))
63+ if err != nil {
64+ return errors .Wrapf (err , "failed to set and enqueue message for sink: %s" , m .SinkID )
65+ }
66+ default :
67+ return fmt .Errorf ("unexpected queue status %s" , queueStatus )
68+ }
69+
5770 return nil
5871}
5972
0 commit comments