Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions internal/runtime/internal/controller/foreach_targets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// controller_test package helps avoid a circular dependency when testing using discovery.Target capsules.
package controller_test

import (
"context"
"os"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/discovery"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/internal/controller"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/parser"
"github.com/grafana/alloy/syntax/vm"
)

func TestForeachCollectionTargetsUsesId(t *testing.T) {
config := `foreach "default" {
collection = targets
var = "each"
id = "selected_id"
template {
}
}`
moduleController := &moduleControllerStub{}
foreachConfigNode := controller.NewForeachConfigNode(getBlockFromConfig(t, config), getComponentGlobals(t, moduleController), nil)
vars := map[string]any{
"targets": []discovery.Target{
discovery.NewTargetFromMap(map[string]string{
"__address__": "192.0.2.10",
"selected_id": "8201",
"instance": "192.0.2.10",
}),
discovery.NewTargetFromMap(map[string]string{
"__address__": "198.51.100.24",
"selected_id": "8202",
"instance": "198.51.100.24",
}),
},
}
require.NoError(t, foreachConfigNode.Evaluate(vm.NewScope(vars)))
require.ElementsMatch(t, []string{"foreach_8201_1", "foreach_8202_1"}, moduleController.customComponents)
}

func getBlockFromConfig(t *testing.T, config string) *ast.BlockStmt {
file, err := parser.ParseFile("", []byte(config))
require.NoError(t, err)
return file.Body[0].(*ast.BlockStmt)
}

func getComponentGlobals(t *testing.T, moduleController controller.ModuleController) controller.ComponentGlobals {
l, _ := logging.New(os.Stderr, logging.DefaultOptions)
return controller.ComponentGlobals{
Logger: l,
TraceProvider: noop.NewTracerProvider(),
DataPath: t.TempDir(),
MinStability: featuregate.StabilityGenerallyAvailable,
OnBlockNodeUpdate: func(cn controller.BlockNode) { /* no-op */ },
Registerer: prometheus.NewRegistry(),
NewModuleController: func(opts controller.ModuleControllerOpts) controller.ModuleController {
return moduleController
},
}
}

type moduleControllerStub struct {
customComponents []string
}

func (m *moduleControllerStub) NewModule(id string, export component.ExportFunc) (component.Module, error) {
return nil, nil
}

func (m *moduleControllerStub) ModuleIDs() []string {
return nil
}

func (m *moduleControllerStub) NewCustomComponent(id string, export component.ExportFunc) (controller.CustomComponent, error) {
m.customComponents = append(m.customComponents, id)
return &customComponentStub{}, nil
}

type customComponentStub struct{}

func (c *customComponentStub) LoadBody(body ast.Body, args map[string]any, customComponentRegistry *controller.CustomComponentRegistry) error {
return nil
}

func (c *customComponentStub) Run(ctx context.Context) error {
return nil
}
68 changes: 61 additions & 7 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/alloy/internal/nodeconf/foreach"
"github.com/grafana/alloy/internal/runner"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
)
Expand Down Expand Up @@ -200,13 +201,9 @@ func (fn *ForeachConfigNode) evaluate(scope *vm.Scope) error {

// Extract Id from collection if exists
if args.Id != "" {
if m, ok := args.Collection[i].(map[string]any); ok {
if val, exists := m[args.Id]; exists {
// Use the field's value for fingerprinting
id = val
} else {
level.Warn(fn.logger).Log("msg", "specified id not found in collection item", "id", args.Id)
}
if val, ok := collectionItemID(args.Collection[i], args.Id, fn.logger); ok {
// Use the field's value for fingerprinting
id = val
}
}

Expand Down Expand Up @@ -444,6 +441,63 @@ func objectFingerprint(id any, hashId bool) string {
}
}

func collectionItemID(item any, key string, logger log.Logger) (any, bool) {
switch value := item.(type) {
case map[string]any:
// Inline object literals with simple values.
// Example: collection = [{name = "one", port = "8080"}, {name = "two", port = "8081"}]
val, ok := value[key]
if !ok {
logMissingCollectionID(logger, key)
return nil, false
}
return val, true
case map[string]string:
// Plain Go maps - used to be common, but are now replaced by Target capsules for performance.
// We keep it for maximum compatibility in case it's needed in the future.
val, ok := value[key]
if !ok {
logMissingCollectionID(logger, key)
return nil, false
}
return val, true
case map[string]syntax.Value:
// Inline object literals with expressions or computed values.
// Example: collection = [{name = "one", url = "http://" + hostname}]
val, ok := value[key]
if !ok {
logMissingCollectionID(logger, key)
return nil, false
}
return val.Interface(), true
case syntax.ConvertibleIntoCapsule:
// Capsules from component exports, such as discovery.Target.
// Example: collection = discovery.kubernetes.pods.targets
return collectionItemIDFromCapsule(value, key, logger)
default:
level.Debug(logger).Log("msg", "unsupported collection item type encountered in foreach", "item", fmt.Sprintf("%#v", item))
return nil, false
}
}

func collectionItemIDFromCapsule(value syntax.ConvertibleIntoCapsule, key string, logger log.Logger) (any, bool) {
var obj map[string]syntax.Value
if err := value.ConvertInto(&obj); err == nil {
val, ok := obj[key]
if ok {
return val.Interface(), true
}
logMissingCollectionID(logger, key)
return nil, false
}

return nil, false
}

func logMissingCollectionID(logger log.Logger, key string) {
level.Warn(logger).Log("msg", "specified id not found in collection item", "id", key)
}

func replaceNonAlphaNumeric(s string) string {
var builder strings.Builder
for _, r := range s {
Expand Down
Loading
Loading