Skip to content

Commit 0b869cb

Browse files
committed
feat: use cache for workflow queues
Updates the workflows backend to use the write-through cache for workflow and activity queue items. This eliminates a frequent, expensive range query, and lets us shorten the queue polling interval to the default 200ms without overloading etcd. We were previously performing these range queries every 500ms, but the queries themselves could take up to 1.5s - even when the range was empty.
1 parent 31df024 commit 0b869cb

File tree

9 files changed

+174
-35
lines changed

9 files changed

+174
-35
lines changed

Makefile

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ LOG_LEVEL ?= info
77
DEV_IMAGE_REPO ?= ghcr.io/pgedge
88
CONTROL_PLANE_IMAGE_REPO ?= host.docker.internal:5000/control-plane
99
TEST_RERUN_FAILS ?= 0
10+
TEST_DISABLE_CACHE ?= 0
1011
E2E_FIXTURE ?=
1112
E2E_PARALLEL ?= 8
1213
E2E_RUN ?=
@@ -50,6 +51,9 @@ cluster_test_args=-tags=cluster_test -count=1 -timeout=10m \
5051
$(if $(CLUSTER_TEST_IMAGE_TAG),-image-tag $(CLUSTER_TEST_IMAGE_TAG)) \
5152
$(if $(CLUSTER_TEST_DATA_DIR),-data-dir $(CLUSTER_TEST_DATA_DIR))
5253

54+
test_disable_cache=$(if $(filter 1,$(TEST_DISABLE_CACHE)),-count=1)
55+
workflows_backend_skip=-skip=Test_EtcdBackendE2E/AutoExpiration/StartsWorkflowAndRemoves
56+
5357
# Automatically adds junit output named after the rule, e.g.
5458
# 'test-e2e-results.xml' in CI environment.
5559
gotestsum=$(gobin)/gotestsum \
@@ -69,7 +73,9 @@ test:
6973
$(gotestsum) \
7074
--format-hide-empty-pkg \
7175
--rerun-fails=$(TEST_RERUN_FAILS) \
72-
--packages='./...'
76+
--packages='./...' \
77+
-- \
78+
$(test_disable_cache)
7379

7480
# Mark the etcd lifecycle test rule as phony. The name must match the rule
# below; the previous declaration named a non-existent 'test-etcd' target.
.PHONY: test-etcd-lifecycle
7581
test-etcd-lifecycle:
@@ -78,16 +84,25 @@ test-etcd-lifecycle:
7884
--rerun-fails=$(TEST_RERUN_FAILS) \
7985
--packages='./server/internal/etcd/...' \
8086
-- \
87+
$(test_disable_cache) \
8188
-tags=etcd_lifecycle_test
8289

90+
# We skip StartsWorkflowAndRemoves because it contains a race condition that's
91+
# much more prevalent now that we're executing workflows more quickly. This test
92+
# uses the "autoexpire" feature to remove workflows that are older than 1
93+
# millisecond. It starts a workflow, waits for the result, and then waits for
94+
# the workflow to be removed. Occasionally, the workflow gets removed while the
95+
# "waiting for result" step is still polling the workflow status.
8396
.PHONY: test-workflows-backend
8497
test-workflows-backend:
8598
$(gotestsum) \
8699
--format-hide-empty-pkg \
87100
--rerun-fails=$(TEST_RERUN_FAILS) \
88101
--packages='./server/internal/workflows/backend/etcd/...' \
89102
-- \
90-
-tags=workflows_backend_test
103+
$(test_disable_cache) \
104+
-tags=workflows_backend_test \
105+
$(workflows_backend_skip)
91106

92107
.PHONY: test-ci
93108
test-ci:
@@ -97,7 +112,9 @@ test-ci:
97112
--rerun-fails=$(TEST_RERUN_FAILS) \
98113
--packages='./...' \
99114
-- \
100-
-tags=workflows_backend_test,etcd_lifecycle_test
115+
-count=1 \
116+
-tags=workflows_backend_test,etcd_lifecycle_test \
117+
$(workflows_backend_skip)
101118

102119
.PHONY: test-e2e
103120
test-e2e:

server/internal/app/app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ func (a *App) runInitialized(parentCtx context.Context) error {
203203
if err := worker.Start(a.serviceCtx); err != nil {
204204
return handleError(fmt.Errorf("failed to start worker: %w", err))
205205
}
206+
a.addErrorProducer(parentCtx, worker)
206207

207208
if err := a.api.ServePostInit(a.serviceCtx); err != nil {
208209
return handleError(fmt.Errorf("failed to serve post-init API: %w", err))

server/internal/workflows/backend/etcd/activity_queue_item/store.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package activity_queue_item
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/cschleiden/go-workflows/backend/history"
@@ -23,9 +24,12 @@ func (v *Value) UpdateLastLocked() {
2324
v.LastLocked = &now
2425
}
2526

27+
// Store is a storage implementation for activity queue items. StartCache must
28+
// be called before the store can be used.
2629
type Store struct {
2730
client *clientv3.Client
2831
root string
32+
cache storage.Cache[*Value]
2933
}
3034

3135
func NewStore(client *clientv3.Client, root string) *Store {
@@ -47,36 +51,53 @@ func (s *Store) Key(queue, instanceID, eventID string) string {
4751
return storage.Key(s.QueuePrefix(queue), instanceID, eventID)
4852
}
4953

54+
func (s *Store) StartCache(ctx context.Context) error {
55+
if s.cache != nil {
56+
return nil
57+
}
58+
s.cache = storage.NewCache(s.client, s.AllQueuesPrefix(), func(item *Value) string {
59+
return s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
60+
})
61+
return s.cache.Start(ctx)
62+
}
63+
64+
func (s *Store) StopCache() {
65+
if s.cache != nil {
66+
s.cache.Stop()
67+
}
68+
}
69+
70+
func (s *Store) PropagateErrors(ctx context.Context, ch chan error) {
71+
s.cache.PropagateErrors(ctx, ch)
72+
}
73+
5074
func (s *Store) GetAll() storage.GetMultipleOp[*Value] {
51-
return storage.NewGetPrefixOp[*Value](s.client, s.AllQueuesPrefix())
75+
return s.cache.GetPrefix(s.AllQueuesPrefix())
5276
}
5377

5478
func (s *Store) GetByKey(queue, instanceID, eventID string) storage.GetOp[*Value] {
5579
key := s.Key(queue, instanceID, eventID)
56-
return storage.NewGetOp[*Value](s.client, key)
80+
return s.cache.Get(key)
5781
}
5882

5983
func (s *Store) GetByQueue(queue string) storage.GetMultipleOp[*Value] {
6084
prefix := s.QueuePrefix(queue)
61-
return storage.NewGetPrefixOp[*Value](s.client, prefix)
85+
return s.cache.GetPrefix(prefix)
6286
}
6387

6488
func (s *Store) Create(item *Value) storage.PutOp[*Value] {
65-
key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
66-
return storage.NewCreateOp(s.client, key, item)
89+
return s.cache.Create(item)
6790
}
6891

6992
func (s *Store) Update(item *Value) storage.PutOp[*Value] {
70-
key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
71-
return storage.NewUpdateOp(s.client, key, item)
93+
return s.cache.Update(item)
7294
}
7395

7496
func (s *Store) DeleteByKey(queue, instanceID, eventID string) storage.DeleteOp {
7597
key := s.Key(queue, instanceID, eventID)
76-
return storage.NewDeleteKeyOp(s.client, key)
98+
return s.cache.DeleteByKey(key)
7799
}
78100

79101
func (s *Store) DeleteItem(item *Value) storage.DeleteValueOp[*Value] {
80-
key := s.Key(item.Queue, item.WorkflowInstanceID, item.Event.ID)
81-
return storage.NewDeleteValueOp(s.client, key, item)
102+
return s.cache.DeleteValue(item)
82103
}

server/internal/workflows/backend/etcd/etcd.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"slices"
9+
"sync"
910
"time"
1011

1112
"github.com/cschleiden/go-workflows/backend"
@@ -34,6 +35,8 @@ type Backend struct {
3435
options *backend.Options
3536
workerID string
3637
workerInstanceID string
38+
workflowMu sync.Mutex
39+
activityMu sync.Mutex
3740
}
3841

3942
func NewBackend(store *Store, options *backend.Options, workerID string) *Backend {
@@ -45,6 +48,14 @@ func NewBackend(store *Store, options *backend.Options, workerID string) *Backen
4548
}
4649
}
4750

51+
func (b *Backend) StartCaches(ctx context.Context) error {
52+
return b.store.StartCaches(ctx)
53+
}
54+
55+
func (b *Backend) StopCaches() {
56+
b.store.StopCaches()
57+
}
58+
4859
func (b *Backend) CreateWorkflowInstance(ctx context.Context, instance *workflow.Instance, event *history.Event) error {
4960
// Check for existing active instance execution
5061
instances, err := b.store.WorkflowInstance.
@@ -244,6 +255,11 @@ func (b *Backend) PrepareActivityQueues(ctx context.Context, queues []workflow.Q
244255
}
245256

246257
func (b *Backend) GetWorkflowTask(ctx context.Context, queues []workflow.Queue) (*backend.WorkflowTask, error) {
258+
// This lock reduces unnecessary contention from concurrent calls within the
259+
// same worker.
260+
b.workflowMu.Lock()
261+
defer b.workflowMu.Unlock()
262+
247263
for _, queue := range queues {
248264
items, err := b.store.WorkflowQueueItem.
249265
GetByQueue(string(queue)).
@@ -578,6 +594,11 @@ func (b *Backend) CompleteWorkflowTask(
578594
}
579595

580596
func (b *Backend) GetActivityTask(ctx context.Context, queues []workflow.Queue) (*backend.ActivityTask, error) {
597+
// This lock reduces unnecessary contention from concurrent calls within the
598+
// same worker.
599+
b.activityMu.Lock()
600+
defer b.activityMu.Unlock()
601+
581602
for _, queue := range queues {
582603
items, err := b.store.ActivityQueueItem.
583604
GetByQueue(string(queue)).
@@ -795,6 +816,10 @@ func (b *Backend) FeatureSupported(feature backend.Feature) bool {
795816
return false
796817
}
797818

819+
func (b *Backend) Error() <-chan error {
820+
return b.store.Error()
821+
}
822+
798823
func sortPendingEvents(events []*pending_event.Value) {
799824
slices.SortStableFunc(events, func(a *pending_event.Value, b *pending_event.Value) int {
800825
if a.CreatedAt > 0 && b.CreatedAt > 0 {

server/internal/workflows/backend/etcd/etcd_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/google/uuid"
12+
"github.com/stretchr/testify/require"
1213

1314
"github.com/cschleiden/go-workflows/backend"
1415
"github.com/cschleiden/go-workflows/backend/history"
@@ -23,7 +24,10 @@ func Test_EtcdBackend(t *testing.T) {
2324

2425
test.BackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
2526
opts := backend.ApplyOptions(options...)
26-
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
27+
backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
28+
require.NoError(t, backend.StartCaches(t.Context()))
29+
t.Cleanup(backend.StopCaches)
30+
return backend
2731
}, nil)
2832
}
2933

@@ -33,7 +37,10 @@ func Test_EtcdBackendE2E(t *testing.T) {
3337

3438
test.EndToEndBackendTest(t, func(options ...backend.BackendOption) test.TestBackend {
3539
opts := backend.ApplyOptions(options...)
36-
return NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
40+
backend := NewBackend(NewStore(client, uuid.NewString()), opts, uuid.NewString())
41+
require.NoError(t, backend.StartCaches(t.Context()))
42+
t.Cleanup(backend.StopCaches)
43+
return backend
3744
}, nil)
3845
}
3946

server/internal/workflows/backend/etcd/store.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package etcd
22

33
import (
4+
"context"
5+
"fmt"
6+
7+
clientv3 "go.etcd.io/etcd/client/v3"
8+
49
"github.com/pgEdge/control-plane/server/internal/storage"
510
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_lock"
611
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/activity_queue_item"
@@ -10,11 +15,11 @@ import (
1015
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_lock"
1116
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_instance_sticky"
1217
"github.com/pgEdge/control-plane/server/internal/workflows/backend/etcd/workflow_queue_item"
13-
clientv3 "go.etcd.io/etcd/client/v3"
1418
)
1519

1620
type Store struct {
1721
client *clientv3.Client
22+
errCh chan error
1823
ActivityLock *activity_lock.Store
1924
ActivityQueueItem *activity_queue_item.Store
2025
HistoryEvent *history_event.Store
@@ -28,6 +33,7 @@ type Store struct {
2833
func NewStore(client *clientv3.Client, root string) *Store {
2934
return &Store{
3035
client: client,
36+
errCh: make(chan error, 1),
3137
ActivityLock: activity_lock.NewStore(client, root),
3238
ActivityQueueItem: activity_queue_item.NewStore(client, root),
3339
HistoryEvent: history_event.NewStore(client, root),
@@ -42,3 +48,25 @@ func NewStore(client *clientv3.Client, root string) *Store {
4248
// Txn builds a storage transaction over the given operations using the
// store's etcd client.
func (s *Store) Txn(ops ...storage.TxnOperation) storage.Txn {
	return storage.NewTxn(s.client, ops...)
}
51+
52+
func (s *Store) StartCaches(ctx context.Context) error {
53+
if err := s.WorkflowQueueItem.StartCache(ctx); err != nil {
54+
return fmt.Errorf("failed to start workflow queue item cache: %w", err)
55+
}
56+
if err := s.ActivityQueueItem.StartCache(ctx); err != nil {
57+
return fmt.Errorf("failed to start activity queue item cache: %w", err)
58+
}
59+
s.WorkflowQueueItem.PropagateErrors(ctx, s.errCh)
60+
s.ActivityQueueItem.PropagateErrors(ctx, s.errCh)
61+
62+
return nil
63+
}
64+
65+
func (s *Store) StopCaches() {
66+
s.WorkflowQueueItem.StopCache()
67+
s.ActivityQueueItem.StopCache()
68+
}
69+
70+
func (s *Store) Error() <-chan error {
71+
return s.errCh
72+
}

0 commit comments

Comments
 (0)