Skip to content

Commit b7d9525

Browse files
dyl10sseongpil0948
authored andcommitted
[receiver/azureeventhubreceiver] Update default checkpoint when storage is enabled and storage compatibility (open-telemetry#44461)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description - When storage was enabled for either the legacy or the new implementation, the receiver would start at the beginning of the stream when it should be starting at the end of the stream. - Updated the new SDK checkpoint storage to be compatible with the legacy storage format. This makes sure there are no negative side effects from upgrading. The new SDK is still under a feature flag. <!--Describe what testing was performed and which tests were added.--> #### Testing - Make sure that both with `legacy` and `azeventhub` when storage is enabled and nothing is stored yet, that the default checkpoint is the latest. Tests were added to validate this case. - Verify that after running the current (legacy) version, you can enable the `ezeventhub` feature flag and the checkpoint is correctly resumed.
1 parent 392cbdb commit b7d9525

File tree

5 files changed

+260
-33
lines changed

5 files changed

+260
-33
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/azureeventhub
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Make storage of new azeventhub library backward compatible and fix checkpoint starting at earliest when storage is enabled
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [44461]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/azureeventhubreceiver/eventhubhandler_azeventhub.go

Lines changed: 83 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,41 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry
55

66
import (
77
"context"
8+
"encoding/json"
89
"errors"
10+
"strings"
911
"sync"
1012
"time"
1113

1214
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1315
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
16+
"go.opentelemetry.io/collector/extension/xextension/storage"
1417
"go.uber.org/zap"
1518
)
1619

1720
type checkpointSeqNumber struct {
18-
SeqNumber int64 `json:"seqNumber"`
21+
// Offset only used for backwards compatibility
22+
Offset string `json:"offset"`
23+
SequenceNumber int64 `json:"sequenceNumber"`
24+
}
25+
26+
// UnmarshalJSON is a custom unmarshaller to allow for backward compatibility
27+
// with the sequence number field
28+
func (c *checkpointSeqNumber) UnmarshalJSON(data []byte) error {
29+
// Primary struct shape
30+
type Alias checkpointSeqNumber
31+
var tmp struct {
32+
Alias
33+
SeqNumber *int64 `json:"seqNumber"` // fallback
34+
}
35+
if err := json.Unmarshal(data, &tmp); err != nil {
36+
return err
37+
}
38+
*c = checkpointSeqNumber(tmp.Alias)
39+
if tmp.SeqNumber != nil {
40+
c.SequenceNumber = *tmp.SeqNumber
41+
}
42+
return nil
1943
}
2044

2145
type azPartitionClient interface {
@@ -45,23 +69,25 @@ func newAzeventhubWrapper(h *eventhubHandler) (*hubWrapperAzeventhubImpl, error)
4569
return nil, newHubErr
4670
}
4771

48-
var storage *storageCheckpointPersister[checkpointSeqNumber]
49-
if h.storageClient != nil {
50-
storage = &storageCheckpointPersister[checkpointSeqNumber]{
51-
storageClient: h.storageClient,
52-
defaultValue: checkpointSeqNumber{
53-
SeqNumber: -1,
54-
},
55-
}
56-
}
57-
5872
return &hubWrapperAzeventhubImpl{
5973
hub: azEventHubWrapper{hub},
6074
config: h.config,
61-
storage: storage,
75+
storage: getStorageCheckpointPersister(h.storageClient),
6276
}, nil
6377
}
6478

79+
func getStorageCheckpointPersister(storageClient storage.Client) *storageCheckpointPersister[checkpointSeqNumber] {
80+
if storageClient == nil {
81+
return nil
82+
}
83+
return &storageCheckpointPersister[checkpointSeqNumber]{
84+
storageClient: storageClient,
85+
defaultValue: checkpointSeqNumber{
86+
SequenceNumber: -1,
87+
},
88+
}
89+
}
90+
6591
type azEventHubWrapper struct {
6692
*azeventhubs.ConsumerClient
6793
}
@@ -121,18 +147,13 @@ func (h *hubWrapperAzeventhubImpl) Receive(ctx context.Context, partitionID stri
121147
if err != nil {
122148
return nil, err
123149
}
124-
startPos := azeventhubs.StartPosition{Latest: to.Ptr(true)}
125-
if applyOffset && h.config.Offset != "" {
126-
startPos = azeventhubs.StartPosition{Offset: &h.config.Offset}
127-
}
128-
if h.storage != nil {
129-
checkpoint, readErr := h.storage.Read(namespace, pProps.EventHubName, h.config.ConsumerGroup, partitionID)
130-
if readErr == nil {
131-
startPos = azeventhubs.StartPosition{
132-
SequenceNumber: &checkpoint.SeqNumber,
133-
}
134-
}
135-
}
150+
startPos := h.getStartPos(
151+
applyOffset,
152+
namespace,
153+
pProps.EventHubName,
154+
getConsumerGroup(h.config),
155+
partitionID,
156+
)
136157
pc, err := h.hub.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
137158
StartPosition: startPos,
138159
})
@@ -185,8 +206,8 @@ func (h *hubWrapperAzeventhubImpl) Receive(ctx context.Context, partitionID stri
185206

186207
if h.storage != nil {
187208
err := h.storage.Write(
188-
namespace, pProps.EventHubName, h.config.ConsumerGroup, partitionID, checkpointSeqNumber{
189-
SeqNumber: lastEvent.SequenceNumber,
209+
namespace, pProps.EventHubName, getConsumerGroup(h.config), partitionID, checkpointSeqNumber{
210+
SequenceNumber: lastEvent.SequenceNumber,
190211
},
191212
)
192213
if err != nil {
@@ -210,13 +231,48 @@ func (h *hubWrapperAzeventhubImpl) Close(ctx context.Context) error {
210231
return errNoConfig
211232
}
212233

234+
func (h *hubWrapperAzeventhubImpl) getStartPos(
235+
applyOffset bool,
236+
namespace string,
237+
eventHubName string,
238+
consumerGroup string,
239+
partitionID string,
240+
) azeventhubs.StartPosition {
241+
startPos := azeventhubs.StartPosition{Latest: to.Ptr(true)}
242+
if applyOffset && h.config.Offset != "" {
243+
startPos = azeventhubs.StartPosition{Offset: &h.config.Offset}
244+
}
245+
if h.storage != nil {
246+
checkpoint, readErr := h.storage.Read(
247+
namespace,
248+
eventHubName,
249+
consumerGroup,
250+
partitionID,
251+
)
252+
// Only apply the checkpoint seq number offset if we have one saved
253+
if readErr == nil && checkpoint.SequenceNumber != -1 && checkpoint.Offset != "@latest" {
254+
startPos = azeventhubs.StartPosition{
255+
SequenceNumber: &checkpoint.SequenceNumber,
256+
}
257+
}
258+
}
259+
260+
return startPos
261+
}
262+
213263
func (h *hubWrapperAzeventhubImpl) namespace() (string, error) {
214264
parsed, err := azeventhubs.ParseConnectionString(h.config.Connection)
215265
if err != nil {
216266
return "", err
217267
}
218268

219-
return parsed.FullyQualifiedNamespace, nil
269+
// Return the first part of the namespace
270+
// Ex: example.servicebus.windows.net => example
271+
n := parsed.FullyQualifiedNamespace
272+
if s := strings.Split(n, "."); len(s) > 0 {
273+
n = s[0]
274+
}
275+
return n, nil
220276
}
221277

222278
type partitionListener struct {

receiver/azureeventhubreceiver/eventhubhandler_azeventhub_test.go

Lines changed: 138 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
1314
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
@@ -205,7 +206,7 @@ func TestHubWrapperAzeventhubImpl_Namespace(t *testing.T) {
205206

206207
namespace, err := mockHubWrapper.namespace()
207208
require.NoError(t, err)
208-
assert.Equal(t, "test.servicebus.windows.net", namespace)
209+
assert.Equal(t, "test", namespace)
209210

210211
mockHubWrapper = &hubWrapperAzeventhubImpl{
211212
hub: &mockAzeventHub{},
@@ -255,3 +256,139 @@ func TestGetConsumerGroup(t *testing.T) {
255256
})
256257
}
257258
}
259+
260+
func TestStartPos(t *testing.T) {
261+
testCases := []struct {
262+
enableStorage bool
263+
applyOffset bool
264+
offset string
265+
namespace string
266+
eventHubName string
267+
consumerGroup string
268+
partitionID string
269+
270+
storageClient *mockStorageClient
271+
272+
expectedSeqNumber *int64
273+
expectedLatest *bool
274+
expectedOffset *string
275+
}{
276+
{
277+
enableStorage: false,
278+
applyOffset: true,
279+
offset: "10",
280+
namespace: "test",
281+
eventHubName: "name",
282+
consumerGroup: "cg",
283+
partitionID: "0",
284+
285+
expectedOffset: to.Ptr("10"),
286+
},
287+
{
288+
enableStorage: false,
289+
applyOffset: false,
290+
offset: "10",
291+
namespace: "test",
292+
eventHubName: "name",
293+
consumerGroup: "cg",
294+
partitionID: "0",
295+
296+
expectedLatest: to.Ptr(true),
297+
},
298+
{
299+
enableStorage: false,
300+
applyOffset: false,
301+
offset: "",
302+
namespace: "test",
303+
eventHubName: "name",
304+
consumerGroup: "cg",
305+
partitionID: "0",
306+
307+
expectedLatest: to.Ptr(true),
308+
},
309+
{
310+
enableStorage: true,
311+
applyOffset: false,
312+
offset: "",
313+
namespace: "test",
314+
eventHubName: "name",
315+
consumerGroup: "cg",
316+
partitionID: "0",
317+
318+
expectedLatest: to.Ptr(true),
319+
},
320+
{
321+
enableStorage: true,
322+
applyOffset: true,
323+
offset: "10",
324+
namespace: "test",
325+
eventHubName: "name",
326+
consumerGroup: "cg",
327+
partitionID: "0",
328+
329+
expectedOffset: to.Ptr("10"),
330+
},
331+
{
332+
enableStorage: true,
333+
applyOffset: false,
334+
offset: "",
335+
namespace: "test",
336+
eventHubName: "name",
337+
consumerGroup: "cg",
338+
partitionID: "0",
339+
storageClient: &mockStorageClient{
340+
storage: map[string][]byte{
341+
"test/name/cg/0": []byte(`{"sequenceNumber": 100}`),
342+
},
343+
},
344+
345+
expectedSeqNumber: to.Ptr(int64(100)),
346+
},
347+
{
348+
enableStorage: true,
349+
applyOffset: false,
350+
offset: "",
351+
namespace: "test",
352+
eventHubName: "name",
353+
consumerGroup: "cg",
354+
partitionID: "0",
355+
storageClient: &mockStorageClient{
356+
storage: map[string][]byte{
357+
"test/name/cg/0": []byte(`{"seqNumber": 200}`),
358+
},
359+
},
360+
361+
expectedSeqNumber: to.Ptr(int64(200)),
362+
},
363+
}
364+
365+
for _, test := range testCases {
366+
h := hubWrapperAzeventhubImpl{}
367+
h.config = &Config{
368+
Offset: test.offset,
369+
}
370+
if test.enableStorage {
371+
s := &mockStorageClient{}
372+
if test.storageClient != nil {
373+
s = test.storageClient
374+
}
375+
h.storage = getStorageCheckpointPersister(s)
376+
}
377+
startPos := h.getStartPos(
378+
test.applyOffset,
379+
test.namespace,
380+
test.eventHubName,
381+
test.consumerGroup,
382+
test.partitionID,
383+
)
384+
if test.expectedSeqNumber != nil {
385+
require.Equal(t, *test.expectedSeqNumber, *startPos.SequenceNumber)
386+
}
387+
if test.expectedLatest != nil {
388+
require.Equal(t, *test.expectedLatest, *startPos.Latest)
389+
}
390+
if test.expectedOffset != nil {
391+
require.Equal(t, *test.expectedOffset, *startPos.Offset)
392+
}
393+
}
394+
}

receiver/azureeventhubreceiver/eventhubhandler_legacy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func newLegacyHubWrapper(h *eventhubHandler) (*hubWrapperLegacyImpl, error) {
2424
eventhub.HubWithOffsetPersistence(
2525
&storageCheckpointPersister[persist.Checkpoint]{
2626
storageClient: h.storageClient,
27-
defaultValue: persist.NewCheckpointFromStartOfStream(),
27+
defaultValue: persist.NewCheckpointFromEndOfStream(),
2828
},
2929
),
3030
)

receiver/azureeventhubreceiver/eventhubhandler_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,17 +145,24 @@ func TestShouldInitializeStorageClient(t *testing.T) {
145145
}
146146
}
147147

148-
type mockStorageClient struct{}
148+
type mockStorageClient struct {
149+
storage map[string][]byte
150+
}
149151

150-
func (*mockStorageClient) Get(_ context.Context, _ string) ([]byte, error) {
152+
func (m *mockStorageClient) Get(_ context.Context, key string) ([]byte, error) {
153+
if len(m.storage[key]) > 0 {
154+
return m.storage[key], nil
155+
}
151156
return nil, nil
152157
}
153158

154-
func (*mockStorageClient) Set(_ context.Context, _ string, _ []byte) error {
159+
func (m *mockStorageClient) Set(_ context.Context, key string, val []byte) error {
160+
m.storage[key] = val
155161
return nil
156162
}
157163

158-
func (*mockStorageClient) Delete(_ context.Context, _ string) error {
164+
func (m *mockStorageClient) Delete(_ context.Context, key string) error {
165+
m.storage[key] = []byte{}
159166
return nil
160167
}
161168

0 commit comments

Comments
 (0)