Skip to content

Commit e5e62c1

Browse files
authored
feat: cleanup done queue (#37)
1 parent f3a9e42 commit e5e62c1

File tree

17 files changed

+468
-51
lines changed

17 files changed

+468
-51
lines changed

cmd/api/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ func main() {
105105
logger.Fatal("failed to init ProcessingRecoveryService", zap.Error(err))
106106
}
107107

108+
cleanupSvc := services.NewCleanupService(redisStore, timeSvc)
109+
108110
svisor := supervisor.NewSupervisor(
109111
supervisor.WithLogger(logger),
110112
supervisor.WithMessageFetcher(messageFetcher),
@@ -114,6 +116,7 @@ func main() {
114116
supervisor.WithProcessingResultsService(processingResultsSvc),
115117
supervisor.WithSchedulerService(schedulerSvc),
116118
supervisor.WithProcessingRecoveryService(processingRecoverySvc),
119+
supervisor.WithCleanupService(cleanupSvc),
117120
)
118121

119122
wg.Add(1)

pkg/lib/config.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,18 @@ type RedisConfig struct {
3030

3131
// Supervisor queues handling settings
3232
type SupervisorConfig struct {
33-
ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"`
34-
ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"`
33+
ReadyWaitTime time.Duration `env:"SUPERVISOR_READY_WAIT_TIME,default=5s"`
34+
ErrSleepTime time.Duration `env:"SUPERVISOR_ERR_SLEEP_TIME,default=5s"`
35+
// interval between scheduler runs to move scheduled jobs to "ready for processing" queue
3536
SchedulerInterval time.Duration `env:"SUPERVISOR_SCHEDULER_INTERVAL,default=30s"`
3637
// interval to move back stuck messages from processing to ready queue
3738
ProcessingRecoveryInterval time.Duration `env:"SUPERVISOR_PROCESSING_RECOVERY_INTERVAL,default=5m"`
39+
// enables deleting done messages from the database after DoneQueueCleanupDelay
40+
DoneQueueCleanupEnabled bool `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_ENABLED,default=false"`
41+
// delay after which done messages are deleted from the database. Default 14 days = 336 hours
42+
DoneQueueCleanupDelay time.Duration `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_DELAY,default=336h"`
43+
// interval between done queue cleanup runs
44+
DoneQueueCleanupInterval time.Duration `env:"SUPERVISOR_DONE_QUEUE_CLEANUP_INTERVAL,default=60m"`
3845
}
3946

4047
type HTTPClientConfig struct {

pkg/services/cleanup_service.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package services
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/didil/inhooks/pkg/lib"
8+
"github.com/didil/inhooks/pkg/models"
9+
"github.com/pkg/errors"
10+
)
11+
12+
type CleanupService interface {
13+
CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error)
14+
}
15+
16+
func NewCleanupService(redisStore RedisStore, timeSvc TimeService) CleanupService {
17+
return &cleanupService{
18+
redisStore: redisStore,
19+
timeSvc: timeSvc,
20+
}
21+
}
22+
23+
type cleanupService struct {
24+
redisStore RedisStore
25+
timeSvc TimeService
26+
}
27+
28+
func (s *cleanupService) CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) {
29+
doneQueueKey := queueKey(f.ID, sink.ID, models.QueueStatusDone)
30+
31+
cutOffTimeEpoch := s.timeSvc.Now().Add(-doneQueueCleanupDelay).Unix()
32+
mIDs, err := s.redisStore.ZRangeBelowScore(ctx, doneQueueKey, float64(cutOffTimeEpoch))
33+
if err != nil {
34+
return 0, err
35+
}
36+
if err != nil {
37+
return 0, errors.Wrapf(err, "failed to zrange below score")
38+
}
39+
if len(mIDs) == 0 {
40+
// no messages do cleanup
41+
return 0, nil
42+
}
43+
44+
// move message ids in chunks
45+
chunkSize := 50
46+
mIDChunks := lib.ChunkSliceBy(mIDs, chunkSize)
47+
48+
for i := 0; i < len(mIDChunks); i++ {
49+
messageKeys := make([]string, 0, len(mIDChunks[i]))
50+
for _, mId := range mIDChunks[i] {
51+
mKey := messageKey(f.ID, sink.ID, mId)
52+
messageKeys = append(messageKeys, mKey)
53+
}
54+
55+
err := s.redisStore.ZRemDel(ctx, doneQueueKey, mIDChunks[i], messageKeys)
56+
if err != nil {
57+
return 0, errors.Wrapf(err, "failed to zremdel")
58+
}
59+
}
60+
61+
return len(mIDs), nil
62+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package services
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/didil/inhooks/pkg/models"
9+
"github.com/didil/inhooks/pkg/testsupport/mocks"
10+
"github.com/golang/mock/gomock"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestCleanUpServiceCleanupDoneQueue(t *testing.T) {
15+
ctrl := gomock.NewController(t)
16+
defer ctrl.Finish()
17+
18+
redisStore := mocks.NewMockRedisStore(ctrl)
19+
timeSvc := mocks.NewMockTimeService(ctrl)
20+
21+
now := time.Date(2023, 05, 5, 8, 46, 20, 0, time.UTC)
22+
timeSvc.EXPECT().Now().Return(now)
23+
24+
ctx := context.Background()
25+
26+
flowId := "flow-1"
27+
sinkID := "sink-1"
28+
29+
flow := &models.Flow{
30+
ID: flowId,
31+
}
32+
sink := &models.Sink{
33+
ID: sinkID,
34+
}
35+
36+
queueKey := "f:flow-1:s:sink-1:q:done"
37+
38+
doneQueueCleanupDelay := 30 * time.Minute
39+
cutoffTime := time.Date(2023, 05, 5, 8, 16, 20, 0, time.UTC)
40+
41+
mIds := []string{"message-1", "message-2"}
42+
messageKeys := []string{"f:flow-1:s:sink-1:m:message-1", "f:flow-1:s:sink-1:m:message-2"}
43+
44+
redisStore.EXPECT().ZRangeBelowScore(ctx, queueKey, float64(cutoffTime.Unix())).Return(mIds, nil)
45+
redisStore.EXPECT().ZRemDel(ctx, queueKey, mIds, messageKeys).Return(nil)
46+
47+
s := NewCleanupService(redisStore, timeSvc)
48+
count, err := s.CleanupDoneQueue(ctx, flow, sink, doneQueueCleanupDelay)
49+
assert.NoError(t, err)
50+
51+
assert.Equal(t, 2, count)
52+
}

pkg/services/redis_store.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package services
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67
"time"
78

89
"github.com/pkg/errors"
@@ -22,6 +23,8 @@ type RedisStore interface {
2223
ZRemRpush(ctx context.Context, messageIDs []string, sourceQueueKey string, destQueueKey string) error
2324
LRangeAll(ctx context.Context, queueKey string) ([]string, error)
2425
LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error
26+
ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error)
27+
ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error
2528
}
2629

2730
type redisStore struct {
@@ -252,3 +255,36 @@ func (s *redisStore) LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey
252255

253256
return nil
254257
}
258+
259+
func (s *redisStore) ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error) {
260+
queueKeyWithPrefix := s.keyWithPrefix(queueKey)
261+
262+
count, err := s.client.ZRemRangeByScore(ctx, queueKeyWithPrefix, "-inf", strconv.Itoa(maxScore)).Result()
263+
if err != nil {
264+
return 0, errors.Wrapf(err, "failed to zremrangebyscore. queueKey: %s", queueKeyWithPrefix)
265+
}
266+
267+
return int(count), nil
268+
}
269+
270+
func (s *redisStore) ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error {
271+
pipe := s.client.TxPipeline()
272+
273+
queueKeyWithPrefix := s.keyWithPrefix(queueKey)
274+
pipe.ZRem(ctx, queueKeyWithPrefix, messageIDs)
275+
276+
messageKeysWithPrefix := []string{}
277+
for _, messageKey := range messageKeys {
278+
messageKeyWithPrefix := s.keyWithPrefix(messageKey)
279+
messageKeysWithPrefix = append(messageKeysWithPrefix, messageKeyWithPrefix)
280+
}
281+
282+
pipe.Del(ctx, messageKeysWithPrefix...)
283+
284+
_, err := pipe.Exec(ctx)
285+
if err != nil {
286+
return err
287+
}
288+
289+
return nil
290+
}

pkg/services/redis_store_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,3 +441,87 @@ func (s *RedisStoreSuite) TestLRemRPush() {
441441
s.NoError(err)
442442
s.Equal([]string{`message-4`, "message-1", "message-3"}, results)
443443
}
444+
445+
func (s *RedisStoreSuite) TestZRemRangeBelowScore() {
446+
ctx := context.Background()
447+
prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName)
448+
defer func() {
449+
err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix)
450+
s.NoError(err)
451+
}()
452+
453+
now := time.Date(2023, 05, 5, 8, 9, 24, 0, time.UTC)
454+
455+
queueKey := "q:scheduled"
456+
queueKeyWithPrefix := fmt.Sprintf("%s:%s", prefix, queueKey)
457+
458+
m1ID := "message-1"
459+
m2ID := "message-2"
460+
m3ID := "message-3"
461+
m4ID := "message-4"
462+
463+
_, err := s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Unix()), Member: m1ID}).Result()
464+
s.NoError(err)
465+
466+
_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(5 * time.Minute).Unix()), Member: m2ID}).Result()
467+
s.NoError(err)
468+
469+
_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(-5 * time.Minute).Unix()), Member: m3ID}).Result()
470+
s.NoError(err)
471+
472+
_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(20 * time.Minute).Unix()), Member: m4ID}).Result()
473+
s.NoError(err)
474+
475+
count, err := s.redisStore.ZRemRangeBelowScore(ctx, queueKey, int(now.Unix()))
476+
s.NoError(err)
477+
478+
s.Equal(2, count)
479+
480+
queueResults, err := s.client.ZRange(ctx, queueKeyWithPrefix, 0, -1).Result()
481+
s.NoError(err)
482+
483+
s.Equal([]string{"message-2", "message-4"}, queueResults)
484+
}
485+
486+
func (s *RedisStoreSuite) TestZRemDel() {
487+
ctx := context.Background()
488+
prefix := fmt.Sprintf("inhooks:%s", s.appConf.Redis.InhooksDBName)
489+
defer func() {
490+
err := testsupport.DeleteAllRedisKeys(ctx, s.client, prefix)
491+
s.NoError(err)
492+
}()
493+
494+
now := time.Date(2023, 05, 5, 8, 9, 24, 0, time.UTC)
495+
496+
queueKey := "q:scheduled"
497+
queueKeyWithPrefix := fmt.Sprintf("%s:%s", prefix, queueKey)
498+
499+
m1ID := "message-1"
500+
m2ID := "message-2"
501+
m3ID := "message-3"
502+
m4ID := "message-4"
503+
504+
messageIDs := []string{m1ID, m3ID}
505+
messageKeys := []string{fmt.Sprintf("m:%s", m1ID), fmt.Sprintf("m:%s", m3ID)}
506+
507+
_, err := s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Unix()), Member: m1ID}).Result()
508+
s.NoError(err)
509+
510+
_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(5 * time.Minute).Unix()), Member: m2ID}).Result()
511+
s.NoError(err)
512+
513+
_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(-5 * time.Minute).Unix()), Member: m3ID}).Result()
514+
s.NoError(err)
515+
516+
_, err = s.client.ZAdd(ctx, queueKeyWithPrefix, redis.Z{Score: float64(now.Add(20 * time.Minute).Unix()), Member: m4ID}).Result()
517+
s.NoError(err)
518+
519+
err = s.redisStore.ZRemDel(ctx, queueKey, messageIDs, messageKeys)
520+
s.NoError(err)
521+
522+
queueResults, err := s.client.ZRange(ctx, queueKeyWithPrefix, 0, -1).Result()
523+
s.NoError(err)
524+
525+
s.Equal([]string{"message-2", "message-4"}, queueResults)
526+
527+
}

pkg/supervisor/done.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package supervisor
2+
3+
import (
4+
"time"
5+
6+
"github.com/didil/inhooks/pkg/models"
7+
"go.uber.org/zap"
8+
)
9+
10+
func (s *Supervisor) HandleDoneQueue(f *models.Flow, sink *models.Sink) {
11+
logger := s.logger.With(zap.String("flowID", f.ID), zap.String("sinkID", sink.ID))
12+
for {
13+
if s.appConf.Supervisor.DoneQueueCleanupEnabled {
14+
count, err := s.cleanupSvc.CleanupDoneQueue(s.ctx, f, sink, s.appConf.Supervisor.DoneQueueCleanupDelay)
15+
if err != nil {
16+
logger.Error("failed to cleanup done queue", zap.Error(err))
17+
}
18+
if count > 0 {
19+
logger.Info("done queue cleanup ok. messages removed", zap.Int("messagesCount", count))
20+
}
21+
}
22+
23+
// wait before next check
24+
timer := time.NewTimer(s.appConf.Supervisor.DoneQueueCleanupInterval)
25+
26+
select {
27+
case <-s.ctx.Done():
28+
return
29+
case <-timer.C:
30+
continue
31+
}
32+
}
33+
}

pkg/supervisor/done_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package supervisor
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/didil/inhooks/pkg/models"
9+
"github.com/didil/inhooks/pkg/testsupport"
10+
"github.com/didil/inhooks/pkg/testsupport/mocks"
11+
"github.com/golang/mock/gomock"
12+
"github.com/stretchr/testify/assert"
13+
"go.uber.org/zap"
14+
)
15+
16+
func TestSupervisor_HandleDoneQueue(t *testing.T) {
17+
appConf, err := testsupport.InitAppConfig(context.Background())
18+
assert.NoError(t, err)
19+
20+
appConf.Supervisor.DoneQueueCleanupInterval = 45 * time.Second
21+
appConf.Supervisor.DoneQueueCleanupDelay = 5 * time.Hour
22+
appConf.Supervisor.DoneQueueCleanupEnabled = true
23+
24+
ctrl := gomock.NewController(t)
25+
defer ctrl.Finish()
26+
27+
flowId1 := "flow-1"
28+
sinkID1 := "sink-1"
29+
30+
sink1 := &models.Sink{
31+
ID: sinkID1,
32+
}
33+
34+
flow1 := &models.Flow{
35+
ID: flowId1,
36+
Sinks: []*models.Sink{sink1},
37+
}
38+
39+
cleanupSvc := mocks.NewMockCleanupService(ctrl)
40+
41+
logger, err := zap.NewDevelopment()
42+
assert.NoError(t, err)
43+
44+
s := NewSupervisor(
45+
WithCleanupService(cleanupSvc),
46+
WithAppConfig(appConf),
47+
WithLogger(logger),
48+
)
49+
50+
count := 2
51+
cleanupSvc.EXPECT().
52+
CleanupDoneQueue(gomock.Any(), flow1, sink1, appConf.Supervisor.DoneQueueCleanupDelay).
53+
DoAndReturn(func(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error) {
54+
s.Shutdown()
55+
56+
return count, nil
57+
})
58+
59+
s.HandleDoneQueue(flow1, sink1)
60+
61+
}

0 commit comments

Comments
 (0)