Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/update-azeventhub-stable.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/azureeventhub

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Upgrade `receiver.azureeventhubreceiver.UseAzeventhubs` feature gate to stable.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [45527]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
11 changes: 2 additions & 9 deletions receiver/azureeventhubreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Event Hub, transforms them, and pushes them through the collector pipeline.
A string describing the connection to an Azure event hub. Ignored if `auth` is specified.

### event_hub
This section is required when using `auth`. Only applicable when feature gate `receiver.azureeventhubreceiver.UseAzeventhubs` is enabled.
This section is required when using `auth`.

#### name (Required when using auth)
The name of the Event Hub.
Expand All @@ -42,7 +42,7 @@ The fully qualified namespace (e.g., `namespace.servicebus.windows.net`).

### auth (Optional)
The ID of an authentication extension to use. This can be used to authenticate using Azure Active Directory (AAD) pod identity,
managed identity, or service principal. Only supported when feature gate `receiver.azureeventhubreceiver.UseAzeventhubs` is enabled.
managed identity, or service principal.
When this field is set, `connection` is ignored and `event_hub` section is required.

### group (Optional)
Expand Down Expand Up @@ -93,13 +93,6 @@ these datapoints.

Default: `nil`

> [!NOTE]
> You can opt out of using the [`azeventhubs`](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs) sdk by disabling the feature gate
> `receiver.azureeventhubreceiver.UseAzeventhubs` when you run the OpenTelemetry Collector. See the following page
> for more details: [Feature Gates](https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate#controlling-gates)
>
> The following configuration options can only be used with this feature flag enabled

### max_poll_events (optional)
Specifies the maximum number of events to retrieve in a single poll from the Event Hub.

Expand Down
13 changes: 1 addition & 12 deletions receiver/azureeventhubreceiver/azure_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,28 @@ package azureeventhubreceiver // import "github.com/open-telemetry/opentelemetry
import (
"time"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)

type azureEvent struct {
EventHubEvent *eventhub.Event
AzEventData *azeventhubs.ReceivedEventData
AzEventData *azeventhubs.ReceivedEventData
}

func (a *azureEvent) EnqueueTime() *time.Time {
if a.EventHubEvent != nil {
return a.EventHubEvent.SystemProperties.EnqueuedTime
}
if a.AzEventData != nil {
return a.AzEventData.EnqueuedTime
}
return nil
}

func (a *azureEvent) Properties() map[string]any {
if a.EventHubEvent != nil {
return a.EventHubEvent.Properties
}
if a.AzEventData != nil {
return a.AzEventData.Properties
}
return nil
}

func (a *azureEvent) Data() []byte {
if a.EventHubEvent != nil {
return a.EventHubEvent.Data
}
if a.AzEventData != nil {
return a.AzEventData.Body
}
Expand Down
28 changes: 2 additions & 26 deletions receiver/azureeventhubreceiver/azure_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,12 @@ import (
"testing"
"time"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/stretchr/testify/assert"
)

func TestAzureEvent_EnqueueTime(t *testing.T) {
now := time.Now()
t.Run("EventHubEvent non-nil", func(t *testing.T) {
ev := &eventhub.Event{
SystemProperties: &eventhub.SystemProperties{
EnqueuedTime: &now,
},
}
a := azureEvent{EventHubEvent: ev}
assert.Equal(t, &now, a.EnqueueTime())
})
t.Run("AzEventData non-nil", func(t *testing.T) {
ev := &azeventhubs.ReceivedEventData{
EnqueuedTime: &now,
Expand All @@ -41,13 +31,6 @@ func TestAzureEvent_Properties(t *testing.T) {
"key1": "value1",
"key2": 2,
}
t.Run("EventHubEvent non-nil", func(t *testing.T) {
ev := &eventhub.Event{
Properties: props,
}
a := azureEvent{EventHubEvent: ev}
assert.Equal(t, props, a.Properties())
})
t.Run("AzEventData non-nil", func(t *testing.T) {
ev := &azeventhubs.ReceivedEventData{
EventData: azeventhubs.EventData{
Expand All @@ -57,21 +40,14 @@ func TestAzureEvent_Properties(t *testing.T) {
a := azureEvent{AzEventData: ev}
assert.Equal(t, props, a.Properties())
})
t.Run("Both nil", func(t *testing.T) {
t.Run("nil", func(t *testing.T) {
a := azureEvent{}
assert.Nil(t, a.Properties())
})
}

func TestAzureEvent_Data(t *testing.T) {
data := []byte("Testing azure events")
t.Run("EventHubEvent non-nil", func(t *testing.T) {
ev := &eventhub.Event{
Data: data,
}
a := azureEvent{EventHubEvent: ev}
assert.Equal(t, data, a.Data())
})
t.Run("AzEventData non-nil", func(t *testing.T) {
ev := &azeventhubs.ReceivedEventData{
EventData: azeventhubs.EventData{
Expand All @@ -81,7 +57,7 @@ func TestAzureEvent_Data(t *testing.T) {
a := azureEvent{AzEventData: ev}
assert.Equal(t, data, a.Data())
})
t.Run("Both nil", func(t *testing.T) {
t.Run("nil", func(t *testing.T) {
a := azureEvent{}
assert.Nil(t, a.Data())
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package azureeventhubreceiver
import (
"testing"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
Expand Down Expand Up @@ -56,7 +56,7 @@ var encodedMetrics = `{"records":[
]}`

func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) {
event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}}
event := azureEvent{AzEventData: &azeventhubs.ReceivedEventData{EventData: azeventhubs.EventData{Body: []byte(encodedMetrics)}}}
logger := zap.NewNop()
unmarshaler := newAzureResourceMetricsUnmarshaler(
component.BuildInfo{
Expand All @@ -74,7 +74,7 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalMixedMetrics(t *testing.T) {
}

func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *testing.T) {
event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}}
event := azureEvent{AzEventData: &azeventhubs.ReceivedEventData{EventData: azeventhubs.EventData{Body: []byte(encodedMetrics)}}}
logger := zap.NewNop()
unmarshaler := newAzureResourceMetricsUnmarshaler(
component.BuildInfo{
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestAzureResourceMetricsUnmarshaler_UnmarshalAppMetricsWithAttributes(t *te
}

func TestAzureResourceMetricsUnmarshaler_UnmarshalAggregatedAppMetrics(t *testing.T) {
event := azureEvent{EventHubEvent: &eventhub.Event{Data: []byte(encodedMetrics)}}
event := azureEvent{AzEventData: &azeventhubs.ReceivedEventData{EventData: azeventhubs.EventData{Body: []byte(encodedMetrics)}}}
logger := zap.NewNop()
unmarshaler := newAzureResourceMetricsUnmarshaler(
component.BuildInfo{
Expand Down
25 changes: 4 additions & 21 deletions receiver/azureeventhubreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"

"github.com/Azure/azure-amqp-common-go/v4/conn"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
"go.opentelemetry.io/collector/component"
)
Expand All @@ -21,9 +20,8 @@ const (
)

var (
validFormats = []logFormat{defaultLogFormat, rawLogFormat, azureLogFormat}
errMissingConnection = errors.New("missing connection")
errFeatureGateRequired = fmt.Errorf("poll_rate and max_poll_events can only be used with %s enabled", azEventHubFeatureGateName)
validFormats = []logFormat{defaultLogFormat, rawLogFormat, azureLogFormat}
errMissingConnection = errors.New("missing connection")
)

type Config struct {
Expand Down Expand Up @@ -61,15 +59,7 @@ type TimeFormat struct {

// Validate config
func (config *Config) Validate() error {
if !azEventHubFeatureGate.IsEnabled() &&
(config.PollRate != 0 || config.MaxPollEvents != 0) {
return errFeatureGateRequired
}

if config.Auth != nil {
if !azEventHubFeatureGate.IsEnabled() {
return fmt.Errorf("auth can only be used with %s enabled", azEventHubFeatureGateName)
}
if config.EventHub.Name == "" {
return errors.New("event_hub.name is required when using auth")
}
Expand All @@ -80,15 +70,8 @@ func (config *Config) Validate() error {
if config.Connection == "" {
return errMissingConnection
}

if azEventHubFeatureGate.IsEnabled() {
if _, err := azeventhubs.ParseConnectionString(config.Connection); err != nil {
return err
}
} else {
if _, err := conn.ParsedConnectionFromStr(config.Connection); err != nil {
return err
}
if _, err := azeventhubs.ParseConnectionString(config.Connection); err != nil {
return err
}
}

Expand Down
26 changes: 3 additions & 23 deletions receiver/azureeventhubreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/confmap/xconfmap"
"go.opentelemetry.io/collector/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureeventhubreceiver/internal/metadata"
)
Expand All @@ -28,7 +27,6 @@ func TestLoadConfig(t *testing.T) {
tests := []struct {
id component.ID
expected component.Config
featureGateEnabled bool
expectedErrContains string
}{
{
Expand All @@ -38,8 +36,7 @@ func TestLoadConfig(t *testing.T) {
},
},
{
id: component.NewIDWithName(metadata.Type, "auth"),
featureGateEnabled: true,
id: component.NewIDWithName(metadata.Type, "auth"),
expected: &Config{
EventHub: EventHubConfig{
Name: "hubName",
Expand All @@ -56,51 +53,34 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_connection_string"),
expectedErrContains: "failed parsing connection string",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_connection_string_with_gate"),
featureGateEnabled: true,
expectedErrContains: "failed parsing connection string",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_format"),
expectedErrContains: "invalid format",
},
{
id: component.NewIDWithName(metadata.Type, "offset_with_partition"),
id: component.NewIDWithName(metadata.Type, "offset_without_partition"),
expectedErrContains: "cannot use 'offset' without 'partition'",
},
{
id: component.NewIDWithName(metadata.Type, "offset_without_partition"),
id: component.NewIDWithName(metadata.Type, "offset_with_partition"),
expected: &Config{
Connection: "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName",
Partition: "foo",
Offset: "1234-5566",
},
},
{
id: component.NewIDWithName(metadata.Type, "feature_gate_exclusive_config"),
expectedErrContains: "poll_rate and max_poll_events can only be used with receiver.azureeventhubreceiver.UseAzeventhubs enabled",
},
{
id: component.NewIDWithName(metadata.Type, "auth_missing_event_hub_name"),
featureGateEnabled: true,
expectedErrContains: "event_hub.name is required when using auth",
},
{
id: component.NewIDWithName(metadata.Type, "auth_missing_namespace"),
featureGateEnabled: true,
expectedErrContains: "event_hub.namespace is required when using auth",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
prev := azEventHubFeatureGate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(azEventHubFeatureGateName, tt.featureGateEnabled))
defer func() {
require.NoError(t, featuregate.GlobalRegistry().Set(azEventHubFeatureGateName, prev))
}()

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

Expand Down
16 changes: 4 additions & 12 deletions receiver/azureeventhubreceiver/eventhubhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,11 @@ func (h *eventhubHandler) run(ctx context.Context, host component.Host) error {
}

if h.hub == nil { // set manually for testing.
if azEventHubFeatureGate.IsEnabled() {
newHub, err := newAzeventhubWrapper(h, host)
if err != nil {
return err
}
h.hub = newHub
} else {
newHub, err := newLegacyHubWrapper(h)
if err != nil {
return err
}
h.hub = newHub
newHub, err := newAzeventhubWrapper(h, host)
if err != nil {
return err
}
h.hub = newHub
}

if h.config.Partition != "" {
Expand Down
Loading