Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
462922b
Community ID or a network flow hash implementation.
mashhurs Oct 9, 2025
cc07ed2
Lint issues fixed.
mashhurs Oct 9, 2025
fc9d72e
Place back accidental change in test file.
mashhurs Oct 9, 2025
0b679d0
Lint issue fixed.
mashhurs Oct 9, 2025
249917d
make generate changes
mashhurs Oct 9, 2025
e85c251
Add more tests to cover for each protocol.
mashhurs Oct 9, 2025
93b2e7d
Split community ID main function into multiple functions for better r…
mashhurs Oct 30, 2025
b2d1766
Add docs
mashhurs Oct 30, 2025
b006aee
Merge branch 'main' into community-id-function
mashhurs Oct 30, 2025
3150604
Formatting.
mashhurs Oct 30, 2025
3599725
Linting the community_id.go file.
mashhurs Oct 30, 2025
8f4f2fb
Merge branch 'main' into community-id-function
edmocosta Oct 31, 2025
45dda0e
Merge branch 'main' into community-id-function
mashhurs Nov 3, 2025
06305da
Push after make generate
mashhurs Nov 3, 2025
c9bb1d0
Fix the lint and make generate agreement.
mashhurs Nov 3, 2025
0641e15
Apply suggestions from code review
mashhurs Nov 18, 2025
b134645
Adjust after code review changes apply.
mashhurs Nov 18, 2025
d00dbf2
Merge branch 'main' into community-id-function
mashhurs Nov 18, 2025
5037d54
Restructuring hash computation with the normalize and compute approach.
mashhurs Nov 18, 2025
3791dd8
Merge branch 'main' into community-id-function
mashhurs Nov 18, 2025
374eb92
Apply suggestions from code review
mashhurs Nov 19, 2025
ac642d5
Apply suggestions from code review
mashhurs Nov 19, 2025
17030a8
Merge branch 'main' into community-id-function
mashhurs Nov 19, 2025
48391b7
Merge branch 'main' into community-id-function
mashhurs Nov 20, 2025
c3f2404
Fix the format.
mashhurs Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Split community ID main function into multiple functions for better r…
…eadability. Validate int conversion cases to make sure we get desired value with desired byte spaces. Make protocol map as a value.
  • Loading branch information
mashhurs committed Oct 30, 2025
commit 93b2e7d07c961ba0e0158b805bb0e17ea4e9630c
198 changes: 114 additions & 84 deletions pkg/ottl/ottlfuncs/func_community_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package ottlfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs"

import (
"bytes"
"context"
//gosec:disable G505 -- SHA1 is intentionally used for generating unique identifiers
"crypto/sha1"
Expand All @@ -16,6 +17,15 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
)

var protocolMap = map[string]uint8{
"ICMP": 1,
"TCP": 6,
"UDP": 17,
"RSVP": 46,
"ICMP6": 58,
"SCTP": 132,
}

type CommunityIDArguments[K any] struct {
SourceIP ottl.StringGetter[K]
SourcePort ottl.IntGetter[K]
Expand Down Expand Up @@ -46,6 +56,15 @@ func createCommunityIDFunction[K any](_ ottl.FunctionContext, oArgs ottl.Argumen
), nil
}

type communityIDParams struct {
srcIPBytes []byte
dstIPBytes []byte
srcPort uint16
dstPort uint16
protocol uint8
seed uint16
}

func communityID[K any](
sourceIP ottl.StringGetter[K],
sourcePort ottl.IntGetter[K],
Expand All @@ -55,126 +74,137 @@ func communityID[K any](
seed ottl.Optional[ottl.IntGetter[K]],
) ottl.ExprFunc[K] {
return func(ctx context.Context, tCtx K) (any, error) {
srcIPValue, err := sourceIP.Get(ctx, tCtx)
params, err := validateAndExtractParams(ctx, tCtx, sourceIP, sourcePort, destinationIP, destinationPort, protocol, seed)
if err != nil {
return nil, fmt.Errorf("failed to get source IP: %w", err)
}

dstIPValue, err := destinationIP.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get destination IP: %w", err)
}

srcPort, err := sourcePort.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get source port: %w", err)
}

dstPort, err := destinationPort.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get destination port: %w", err)
}

protocolValue := uint8(6) // defaults to TCP
if !protocol.IsEmpty() {
protocolStr, err := protocol.Get().Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get protocol: %w", err)
}

resolvedProtocolValue, err := resolveProtocolValue(protocolStr)
if err != nil {
return nil, err
}
protocolValue = resolvedProtocolValue
}

srcIPAddr := net.ParseIP(srcIPValue)
if srcIPAddr == nil {
return nil, fmt.Errorf("invalid source IP: %s", srcIPValue)
}

dstIPAddr := net.ParseIP(dstIPValue)
if dstIPAddr == nil {
return nil, fmt.Errorf("invalid destination IP: %s", dstIPValue)
}

// Get seed value (default: 0) if applied
seedValue := uint16(0)
if !seed.IsEmpty() {
seedInt, err := seed.Get().Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get seed: %w", err)
}
seedValue = uint16(seedInt)
}

// Determine a flow direction and normalize
srcIPBytes := srcIPAddr.To4()
dstIPBytes := dstIPAddr.To4()
srcPortForHash, dstPortForHash := uint16(srcPort), uint16(dstPort)

// IPv4 (4-bytes) or IPv6 (16-bytes) mapped address
if srcIPBytes == nil {
srcIPBytes = srcIPAddr.To16()
}

if dstIPBytes == nil {
dstIPBytes = dstIPAddr.To16()
return nil, err
}

// Determine a flow direction for normalization
// If source IP is greater than destination IP, or if IPs are equal and source port is greater than destination port,
// then swap source and destination to normalize the flow direction
srcIPBytes, dstIPBytes := params.srcIPBytes, params.dstIPBytes
srcPort, dstPort := params.srcPort, params.dstPort

shouldSwap := false
if len(srcIPBytes) != len(dstIPBytes) {
shouldSwap = len(srcIPBytes) > len(dstIPBytes)
} else if cmp := compareBytes(srcIPBytes, dstIPBytes); cmp > 0 {
} else if cmp := bytes.Compare(srcIPBytes, dstIPBytes); cmp > 0 {
shouldSwap = true
} else if cmp == 0 && srcPort > dstPort {
shouldSwap = true
}
if shouldSwap {
srcIPBytes, dstIPBytes = dstIPBytes, srcIPBytes
srcPortForHash, dstPortForHash = dstPortForHash, srcPortForHash
srcPort, dstPort = dstPort, srcPort
}

// Build the flow, format: <seed:2><protocol:1><src_ip><dst_ip><src_port:2><dst_port:2>
return flow(srcIPBytes, srcPortForHash, dstIPBytes, dstPortForHash, protocolValue, seedValue), nil
return flow(srcIPBytes, srcPort, dstIPBytes, dstPort, params.protocol, params.seed), nil
}
}

func resolveProtocolValue(protoStr string) (uint8, error) {
protoMap := map[string]uint8{"ICMP": 1, "TCP": 6, "UDP": 17, "RSVP": 46, "ICMP6": 58, "SCTP": 132}
protocolValue := protoMap[protoStr]
func validateAndExtractParams[K any](
ctx context.Context,
tCtx K,
sourceIP ottl.StringGetter[K],
sourcePort ottl.IntGetter[K],
destinationIP ottl.StringGetter[K],
destinationPort ottl.IntGetter[K],
protocol ottl.Optional[ottl.StringGetter[K]],
seed ottl.Optional[ottl.IntGetter[K]],
) (*communityIDParams, error) {
srcIPValue, err := sourceIP.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get source IP: %w", err)
}

dstIPValue, err := destinationIP.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get destination IP: %w", err)
}

if protocolValue == 0 {
return 0, fmt.Errorf("unsupported protocol: %s", protoStr)
srcPort, err := sourcePort.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get source port: %w", err)
}
if srcPort < 1 || srcPort > 65535 {
return nil, fmt.Errorf("source port must be between 1 and 65535, got %d", srcPort)
}

dstPort, err := destinationPort.Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get destination port: %w", err)
}
if dstPort < 1 || dstPort > 65535 {
return nil, fmt.Errorf("destination port must be between 1 and 65535, got %d", dstPort)
}
return protocolValue, nil
}

func compareBytes(a, b []byte) int {
for i := range a {
if a[i] < b[i] {
return -1
protocolValue := protocolMap["TCP"] // defaults to TCP
if !protocol.IsEmpty() {
protocolStr, err := protocol.Get().Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get protocol: %w", err)
}
if a[i] > b[i] {
return 1

protocolValue = protocolMap[protocolStr]
if protocolValue == 0 {
return nil, fmt.Errorf("unsupported protocol: %s", protocolStr)
}
}
return 0

srcIPAddr := net.ParseIP(srcIPValue)
if srcIPAddr == nil {
return nil, fmt.Errorf("invalid source IP: %s", srcIPValue)
}

dstIPAddr := net.ParseIP(dstIPValue)
if dstIPAddr == nil {
return nil, fmt.Errorf("invalid destination IP: %s", dstIPValue)
}

// Get seed value (default: 0) if applied
seedValue := uint16(0)
if !seed.IsEmpty() {
seedInt, err := seed.Get().Get(ctx, tCtx)
if err != nil {
return nil, fmt.Errorf("failed to get seed: %w", err)
}
if seedInt < 0 || seedInt > 65535 {
return nil, fmt.Errorf("seed must be between 0 and 65535, got %d", seedInt)
}
seedValue = uint16(seedInt)
}

srcIPBytes := srcIPAddr.To4()
dstIPBytes := dstIPAddr.To4()

if srcIPBytes == nil {
srcIPBytes = srcIPAddr.To16()
}

if dstIPBytes == nil {
dstIPBytes = dstIPAddr.To16()
}

return &communityIDParams{
srcIPBytes: srcIPBytes,
dstIPBytes: dstIPBytes,
srcPort: uint16(srcPort),
dstPort: uint16(dstPort),
protocol: protocolValue,
seed: seedValue,
}, nil
}

func flow(srcIPBytes net.IP, srcPortForHash uint16, dstIPBytes net.IP, dstPortForHash uint16, protolValue uint8, seedValue uint16) string {
func flow(srcIPBytes net.IP, srcPortForHash uint16, dstIPBytes net.IP, dstPortForHash uint16, protoValue uint8, seedValue uint16) string {
// Add seed (2 bytes, network order)
flowTuple := make([]byte, 2)
binary.BigEndian.PutUint16(flowTuple, seedValue)

// Add source, destination IPs and 1-byte protocol
flowTuple = append(flowTuple, srcIPBytes...)
flowTuple = append(flowTuple, dstIPBytes...)
flowTuple = append(flowTuple, protolValue, 0)
flowTuple = append(flowTuple, protoValue, 0)

// Add source and destination ports (2 bytes each, network order)
srcPortBytes := make([]byte, 2)
Expand Down