From 65d2813c9c962d61611d0c419408df0090b30c37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Fri, 1 Dec 2023 19:53:28 -0800 Subject: [PATCH 1/7] Fix cleaner not discovering deleted users from global dir MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 1 + pkg/storage/tsdb/users_scanner.go | 25 ++++++++++++++++++++++++- pkg/storage/tsdb/users_scanner_test.go | 8 +++++--- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a53ec907731..903d3b90915 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,6 +114,7 @@ * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 * [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 * [BUGFIX] Query Frontend: Fix query string being omitted in query stats log. #5655 +* [BUGFIX] Compactor: Fix cleaner not discovering deleted users from global dir. ## 1.15.3 2023-06-22 diff --git a/pkg/storage/tsdb/users_scanner.go b/pkg/storage/tsdb/users_scanner.go index 54f40bcc9dc..a448b004749 100644 --- a/pkg/storage/tsdb/users_scanner.go +++ b/pkg/storage/tsdb/users_scanner.go @@ -38,6 +38,18 @@ func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) ( // // If sharding is enabled, returned lists contains only the users owned by this instance. func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) { + deletedUsers := make(map[string]struct{}) + + // Discovering the deleted users from the global markers directory. 
+ err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error { + user := strings.TrimSuffix(entry, "") + deletedUsers[user] = struct{}{} + return nil + }) + if err != nil { + return nil, nil, err + } + err = s.bucketClient.Iter(ctx, "", func(entry string) error { users = append(users, strings.TrimSuffix(entry, "/")) return nil @@ -65,12 +77,23 @@ func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion level.Warn(s.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err) } else if deletionMarkExists { users = append(users[:ix], users[ix+1:]...) - markedForDeletion = append(markedForDeletion, userID) + deletedUsers[userID] = struct{}{} continue } ix++ } + for userID := range deletedUsers { + // Check if it's owned by this instance. + owned, err := s.isOwned(userID) + if err != nil { + level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) + } + if owned { + markedForDeletion = append(markedForDeletion, userID) + } + } + return users, markedForDeletion, nil } diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index 72503865aa4..7f0517da895 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "github.com/cortexproject/cortex/pkg/storage/bucket" ) @@ -15,21 +16,22 @@ import ( func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2", "user-3", "user-4"}, nil) + bucketClient.MockIter("__markers__", []string{"user-5", "user-6", "user-7"}, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) 
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil) bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), true, nil) isOwned := func(userID string) (bool, error) { - return userID == "user-1" || userID == "user-3", nil + return userID == "user-1" || userID == "user-3" || userID == "user-7", nil } s := NewUsersScanner(bucketClient, isOwned, log.NewNopLogger()) actual, deleted, err := s.ScanUsers(context.Background()) require.NoError(t, err) assert.Equal(t, []string{"user-1"}, actual) - assert.Equal(t, []string{"user-3"}, deleted) - + slices.Sort(deleted) + assert.Equal(t, []string{"user-3", "user-7"}, deleted) } func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDeletionCheckFailed(t *testing.T) { From aac7eccaf9649ef4dd39d8a1c25ba4b484c52228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Fri, 1 Dec 2023 21:01:10 -0800 Subject: [PATCH 2/7] Fix tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 2 +- pkg/compactor/compactor_test.go | 12 ++++++++++++ pkg/querier/blocks_finder_bucket_scan_test.go | 3 +++ pkg/storage/tsdb/users_scanner_test.go | 1 + tools/thanosconvert/thanosconvert_test.go | 16 ++++++++++++---- 5 files changed, 29 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 903d3b90915..0fb13247205 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,7 +114,7 @@ * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 * [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 * [BUGFIX] Query Frontend: Fix query string being omitted in query stats log. #5655 -* [BUGFIX] Compactor: Fix cleaner not discovering deleted users from global dir. +* [BUGFIX] Compactor: Fix cleaner not discovering deleted users from global dir. 
#5691 ## 1.15.3 2023-06-22 diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 8998fa4fae5..4c41bca6507 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -170,6 +170,7 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) { // No user blocks stored in the bucket. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{userID}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil) @@ -198,6 +199,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { // No user blocks stored in the bucket. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) cfg := prepareConfig() c, _, _, logs, registry := prepare(t, cfg, bucketClient, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -348,6 +350,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket // Fail to iterate over the bucket while discovering users. 
bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("__markers__", nil, errors.New("failed to iterate the bucket")) bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) c, _, _, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) @@ -501,6 +504,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( userID := "test-user" bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{userID}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D/meta.json", userID + "/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath(userID), false, nil) @@ -553,6 +557,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json"}, nil) @@ -598,6 +603,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { // Mock the bucket to contain two users, each one with one block. 
bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) @@ -741,6 +747,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) @@ -866,6 +873,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) @@ -944,6 +952,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) // Mock the bucket to contain two users, each one with one block. 
bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("__markers__", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockGet(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), `{"deletion_time": 1}`, nil) bucketClient.MockUpload(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), nil) @@ -1107,6 +1116,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni // Mock the bucket to contain two users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-2"), false, nil) @@ -1215,6 +1225,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM // Mock the bucket to contain all users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", userIDs, nil) + bucketClient.MockIter("__markers__", []string{}, nil) for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) @@ -1321,6 +1332,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit // Mock the bucket to contain all users, each one with five blocks, 2 sets of overlapping blocks and 1 separate block. 
bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", userIDs, nil) + bucketClient.MockIter("__markers__", []string{}, nil) // Keys with a value greater than 1 will be groups that should be compacted groupHashes := make(map[uint32]int) diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index 21abdee8342..11e3cef206f 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -94,6 +94,7 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) { // Mock the storage to simulate a failure when reading objects. bucket.MockIter("", []string{"user-1"}, nil) + bucket.MockIter("__markers__", []string{""}, nil) bucket.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json"}, nil) bucket.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucket.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) @@ -139,6 +140,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *t // Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket. bucket := &bucket.ClientMock{} bucket.MockIter("", tenantIDs, nil) + bucket.MockIter("__markers__", []string{""}, nil) for _, tenantID := range tenantIDs { bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() { time.Sleep(time.Second) @@ -177,6 +179,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *te // Mock the bucket to introduce a 1s sleep while syncing each block in the bucket. bucket := &bucket.ClientMock{} bucket.MockIter("", []string{"user-1"}, nil) + bucket.MockIter("__markers__", []string{""}, nil) bucket.MockIter("user-1/", blockPaths, nil) bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) { // We return the meta.json doesn't exist, but introduce a 1s delay for each call. 
diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index 7f0517da895..8b9faba5801 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -39,6 +39,7 @@ func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDelet bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", expected, nil) + bucketClient.MockIter("__markers__", []string{}, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) diff --git a/tools/thanosconvert/thanosconvert_test.go b/tools/thanosconvert/thanosconvert_test.go index f59dd9da4f3..169196a105d 100644 --- a/tools/thanosconvert/thanosconvert_test.go +++ b/tools/thanosconvert/thanosconvert_test.go @@ -45,8 +45,10 @@ func TestThanosBlockConverter(t *testing.T) { assertions func(*testing.T, *bucket.ClientMock, Results, error) }{ { - name: "empty bucket is a noop", - bucketData: fakeBucket{}, + name: "empty bucket is a noop", + bucketData: fakeBucket{ + "__markers__": map[string]metadata.Meta{}, + }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { bkt.AssertNotCalled(t, "Get", mock.Anything, mock.Anything) bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -54,8 +56,11 @@ func TestThanosBlockConverter(t *testing.T) { }, }, { - name: "user with no blocks is a noop", - bucketData: fakeBucket{"user1": map[string]metadata.Meta{}}, + name: "user with no blocks is a noop", + bucketData: fakeBucket{ + "user1": map[string]metadata.Meta{}, + "__markers__": map[string]metadata.Meta{}, + }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { bkt.AssertNotCalled(t, "Get", mock.Anything, mock.Anything) bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -80,6 +85,7 @@ func TestThanosBlockConverter(t *testing.T) { "user3": 
map[string]metadata.Meta{ block1: cortexMeta("user3"), }, + "__markers__": map[string]metadata.Meta{}, }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { bkt.AssertNotCalled(t, "Upload", mock.Anything, mock.Anything, mock.Anything) @@ -114,6 +120,7 @@ func TestThanosBlockConverter(t *testing.T) { "user3": map[string]metadata.Meta{ block1: thanosMeta(), }, + "__markers__": map[string]metadata.Meta{}, }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { assert.Len(t, results, 3, "expected users in results") @@ -149,6 +156,7 @@ func TestThanosBlockConverter(t *testing.T) { blockWithUploadFailure: thanosMeta(), blockWithMalformedMeta: thanosMeta(), }, + "__markers__": map[string]metadata.Meta{}, }, assertions: func(t *testing.T, bkt *bucket.ClientMock, results Results, err error) { assert.Len(t, results["user1"].FailedBlocks, 1) From 71f2b7703f5b97a3b1ee84da50e6d61fefa86be1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Sat, 2 Dec 2023 12:15:16 -0800 Subject: [PATCH 3/7] Remove the prefix from entry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/storage/tsdb/users_scanner.go | 4 +++- pkg/storage/tsdb/users_scanner_test.go | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/storage/tsdb/users_scanner.go b/pkg/storage/tsdb/users_scanner.go index a448b004749..0186a9e2a41 100644 --- a/pkg/storage/tsdb/users_scanner.go +++ b/pkg/storage/tsdb/users_scanner.go @@ -42,7 +42,9 @@ func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion // Discovering the deleted users from the global markers directory. 
err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error { - user := strings.TrimSuffix(entry, "") + // entry will be of the form __markers__// + parts := strings.Split(entry, objstore.DirDelim) + user := parts[1] deletedUsers[user] = struct{}{} return nil }) diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index 8b9faba5801..0a80c1146d3 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -15,8 +15,8 @@ import ( func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { bucketClient := &bucket.ClientMock{} - bucketClient.MockIter("", []string{"user-1", "user-2", "user-3", "user-4"}, nil) - bucketClient.MockIter("__markers__", []string{"user-5", "user-6", "user-7"}, nil) + bucketClient.MockIter("", []string{"user-1/", "user-2/", "user-3/", "user-4/"}, nil) + bucketClient.MockIter("__markers__", []string{"__markers__/user-5/", "__markers__/user-6/", "__markers__/user-7/"}, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil) From 815869da41cf0856578cfd1b5842e8643765f5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Sun, 3 Dec 2023 19:34:34 -0800 Subject: [PATCH 4/7] fix tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/compactor/compactor_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 4c41bca6507..3a229813791 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -952,7 +952,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) // Mock the bucket to contain two 
users, each one with one block. bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) - bucketClient.MockIter("__markers__", []string{"user-1"}, nil) + bucketClient.MockIter("__markers__", []string{"__markers__/user-1/"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockGet(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), `{"deletion_time": 1}`, nil) bucketClient.MockUpload(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), nil) @@ -1939,6 +1939,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { // Mock the bucket bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) + bucketClient.MockIter("__markers__", []string{}, nil) ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) From 684f6559264c9370c4b97c72119215d530de994d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Sun, 3 Dec 2023 23:05:35 -0800 Subject: [PATCH 5/7] Fix tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/querier/blocks_finder_bucket_scan_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index 11e3cef206f..b502c8eed20 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -94,7 +94,7 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) { // Mock the storage to simulate a failure when reading objects. 
bucket.MockIter("", []string{"user-1"}, nil) - bucket.MockIter("__markers__", []string{""}, nil) + bucket.MockIter("__markers__", []string{}, nil) bucket.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json"}, nil) bucket.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) bucket.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) @@ -140,7 +140,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *t // Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket. bucket := &bucket.ClientMock{} bucket.MockIter("", tenantIDs, nil) - bucket.MockIter("__markers__", []string{""}, nil) + bucket.MockIter("__markers__", []string{}, nil) for _, tenantID := range tenantIDs { bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() { time.Sleep(time.Second) @@ -179,7 +179,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *te // Mock the bucket to introduce a 1s sleep while syncing each block in the bucket. bucket := &bucket.ClientMock{} bucket.MockIter("", []string{"user-1"}, nil) - bucket.MockIter("__markers__", []string{""}, nil) + bucket.MockIter("__markers__", []string{}, nil) bucket.MockIter("user-1/", blockPaths, nil) bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) { // We return the meta.json doesn't exist, but introduce a 1s delay for each call. 
From 0d20fce40c670d469741b3e66e65d1a3446f9c9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Mon, 4 Dec 2023 22:49:35 -0800 Subject: [PATCH 6/7] Address comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 1 - pkg/storage/tsdb/users_scanner.go | 45 +++++++++----------------- pkg/storage/tsdb/users_scanner_test.go | 4 ++- 3 files changed, 18 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fb13247205..a53ec907731 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,7 +114,6 @@ * [BUGFIX] Fix bug on objstore when configured to use S3 fips endpoints. #5540 * [BUGFIX] Ruler: Fix bug on ruler where a failure to load a single RuleGroup would prevent rulers to sync all RuleGroup. #5563 * [BUGFIX] Query Frontend: Fix query string being omitted in query stats log. #5655 -* [BUGFIX] Compactor: Fix cleaner not discovering deleted users from global dir. #5691 ## 1.15.3 2023-06-22 diff --git a/pkg/storage/tsdb/users_scanner.go b/pkg/storage/tsdb/users_scanner.go index 0186a9e2a41..a983fe40fff 100644 --- a/pkg/storage/tsdb/users_scanner.go +++ b/pkg/storage/tsdb/users_scanner.go @@ -38,63 +38,48 @@ func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) ( // // If sharding is enabled, returned lists contains only the users owned by this instance. func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) { - deletedUsers := make(map[string]struct{}) + discoveredUsers := make(map[string]struct{}) - // Discovering the deleted users from the global markers directory. + // Discovering the users from the global markers directory including potentially active users. 
err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error { // entry will be of the form __markers__// parts := strings.Split(entry, objstore.DirDelim) - user := parts[1] - deletedUsers[user] = struct{}{} + userID := parts[1] + discoveredUsers[userID] = struct{}{} return nil }) if err != nil { return nil, nil, err } + // Discovering other users in the bucket including potentially active users. err = s.bucketClient.Iter(ctx, "", func(entry string) error { - users = append(users, strings.TrimSuffix(entry, "/")) + userID := strings.TrimSuffix(entry, "/") + discoveredUsers[userID] = struct{}{} return nil }) if err != nil { return nil, nil, err } - // Check users for being owned by instance, and split users into non-deleted and deleted. - // We do these checks after listing all users, to improve cacheability of Iter (result is only cached at the end of Iter call). - for ix := 0; ix < len(users); { - userID := users[ix] - - // Check if it's owned by this instance. - owned, err := s.isOwned(userID) - if err != nil { + for userID := range discoveredUsers { + // Filter out users not owned by this instance. + if owned, err := s.isOwned(userID); err != nil { level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) } else if !owned { - users = append(users[:ix], users[ix+1:]...) continue } - deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID) - if err != nil { + // Filter users marked for deletion + if deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID); err != nil { level.Warn(s.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err) } else if deletionMarkExists { - users = append(users[:ix], users[ix+1:]...) - deletedUsers[userID] = struct{}{} + markedForDeletion = append(markedForDeletion, userID) continue } - ix++ - } - - for userID := range deletedUsers { - // Check if it's owned by this instance. 
- owned, err := s.isOwned(userID) - if err != nil { - level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) - } - if owned { - markedForDeletion = append(markedForDeletion, userID) - } + // The remaining are the active users owned by this instance. + users = append(users, userID) } return users, markedForDeletion, nil diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index 0a80c1146d3..18c044d49c9 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -20,7 +20,9 @@ func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil) bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil) - bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), true, nil) + bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), false, nil) + bucketClient.MockExists(GetGlobalDeletionMarkPath("user-7"), false, nil) + bucketClient.MockExists(GetLocalDeletionMarkPath("user-7"), true, nil) isOwned := func(userID string) (bool, error) { return userID == "user-1" || userID == "user-3" || userID == "user-7", nil From b940f5be614657ee85020c192ec8356253a45612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Mon, 4 Dec 2023 23:12:43 -0800 Subject: [PATCH 7/7] Update comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- pkg/storage/tsdb/users_scanner.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/storage/tsdb/users_scanner.go b/pkg/storage/tsdb/users_scanner.go index a983fe40fff..2c0d463d259 100644 --- a/pkg/storage/tsdb/users_scanner.go +++ b/pkg/storage/tsdb/users_scanner.go @@ -38,31 
+38,31 @@ func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) ( // // If sharding is enabled, returned lists contains only the users owned by this instance. func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) { - discoveredUsers := make(map[string]struct{}) + scannedUsers := make(map[string]struct{}) - // Discovering the users from the global markers directory including potentially active users. - err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error { - // entry will be of the form __markers__// - parts := strings.Split(entry, objstore.DirDelim) - userID := parts[1] - discoveredUsers[userID] = struct{}{} + // Scan users in the bucket. + err = s.bucketClient.Iter(ctx, "", func(entry string) error { + userID := strings.TrimSuffix(entry, "/") + scannedUsers[userID] = struct{}{} return nil }) if err != nil { return nil, nil, err } - // Discovering other users in the bucket including potentially active users. - err = s.bucketClient.Iter(ctx, "", func(entry string) error { - userID := strings.TrimSuffix(entry, "/") - discoveredUsers[userID] = struct{}{} + // Scan users from the __markers__ directory. + err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error { + // entry will be of the form __markers__/<user>/ + parts := strings.Split(entry, objstore.DirDelim) + userID := parts[1] + scannedUsers[userID] = struct{}{} return nil }) if err != nil { return nil, nil, err } - for userID := range discoveredUsers { + for userID := range scannedUsers { // Filter out users not owned by this instance. if owned, err := s.isOwned(userID); err != nil { level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)