Skip to content

Commit d62ea9b

Browse files
authored
Port retain operator (#279)
* Port retain operator * Force test * Add License
1 parent 732753b commit d62ea9b

File tree

5 files changed

+724
-0
lines changed

5 files changed

+724
-0
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88
### Added
99
- Added doublestar support
1010

11+
## [0.13.22] - Unreleased
12+
### Added
13+
- Added retain operator
14+
1115
## [0.13.21] - 2021-05-07
1216

1317
### Changed

cmd/stanza/init_common.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
_ "github.com/observiq/stanza/operator/builtin/transformer/ratelimit"
3030
_ "github.com/observiq/stanza/operator/builtin/transformer/recombine"
3131
_ "github.com/observiq/stanza/operator/builtin/transformer/restructure"
32+
_ "github.com/observiq/stanza/operator/builtin/transformer/retain"
3233
_ "github.com/observiq/stanza/operator/builtin/transformer/router"
3334

3435
_ "github.com/observiq/stanza/operator/builtin/output/drop"

docs/operators/retain.md

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
## `Retain` operator
2+
3+
The `retain` operator keeps the specified list of fields, and removes the rest.
4+
5+
### Configuration Fields
6+
7+
| Field | Default | Description |
8+
| --- | --- | --- |
9+
| `id` | `retain` | A unique identifier for the operator |
10+
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
11+
| `fields` | required | A list of [fields](/docs/types/field.md) to be kept. |
12+
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
13+
| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. |
14+
<hr>
15+
<b>NOTE:</b> If no fields in a group (labels, resource, or record) are specified, that entire group will be retained.
16+
<hr>
17+
Example usage:
18+
<hr>
19+
Retain fields in the record
20+
21+
```yaml
22+
- type: retain
23+
fields:
24+
- key1
25+
- key2
26+
```
27+
28+
<table>
29+
<tr><td> Input Entry </td> <td> Output Entry </td></tr>
30+
<tr>
31+
<td>
32+
33+
```json
34+
{
35+
"resource": { },
36+
"labels": { },
37+
"record": {
38+
"key1": "val1",
39+
"key2": "val2",
40+
"key3": "val3",
41+
"key4": "val4"
42+
}
43+
}
44+
```
45+
46+
</td>
47+
<td>
48+
49+
```json
50+
{
51+
"resource": { },
52+
"labels": { },
53+
"record": {
54+
"key1": "val1",
55+
"key2": "val2"
56+
}
57+
}
58+
```
59+
60+
</td>
61+
</tr>
62+
</table>
63+
64+
<hr>
65+
Retain an object in the record
66+
67+
```yaml
68+
- type: retain
69+
fields:
70+
- object
71+
```
72+
73+
<table>
74+
<tr><td> Input entry </td> <td> Output entry </td></tr>
75+
<tr>
76+
<td>
77+
78+
```json
79+
{
80+
"resource": { },
81+
"labels": { },
82+
"record": {
83+
"key1": "val1",
84+
"object": {
85+
"nestedkey": "val2",
86+
}
87+
}
88+
}
89+
```
90+
91+
</td>
92+
<td>
93+
94+
```json
95+
{
96+
"resource": { },
97+
"labels": { },
98+
"record": {
99+
"object": {
100+
"nestedkey": "val2",
101+
}
102+
}
103+
}
104+
```
105+
106+
</td>
107+
</tr>
108+
</table>
109+
110+
<hr>
111+
Retain fields from resource
112+
113+
```yaml
114+
- type: retain
115+
fields:
116+
- $resource.key1
117+
- $resource.key2
118+
```
119+
120+
<table>
121+
<tr><td> Input entry </td> <td> Output entry </td></tr>
122+
<tr>
123+
<td>
124+
125+
```json
126+
{
127+
"resource": {
128+
"key1": "val1",
129+
"key2": "val2",
130+
"key3": "val3"
131+
},
132+
"labels": { },
133+
"record": {
134+
"key1": "val1",
135+
}
136+
}
137+
}
138+
```
139+
140+
</td>
141+
<td>
142+
143+
```json
144+
{
145+
"resource": {
146+
"key1": "val1",
147+
"key2": "val2",
148+
},
149+
"labels": { },
150+
"record": {
151+
"key1": "val1",
152+
}
153+
}
154+
```
155+
156+
</td>
157+
</tr>
158+
</table>
159+
160+
<hr>
161+
Retain fields from labels
162+
163+
```yaml
164+
- type: retain
165+
fields:
166+
- $labels.key1
167+
- $labels.key2
168+
```
169+
170+
<table>
171+
<tr><td> Input entry </td> <td> Output entry </td></tr>
172+
<tr>
173+
<td>
174+
175+
```json
176+
{
177+
"resource": { },
178+
"labels": {
179+
"key1": "val1",
180+
"key2": "val2",
181+
"key3": "val3"
182+
},
183+
"record": {
184+
"key1": "val1",
185+
}
186+
}
187+
```
188+
189+
</td>
190+
<td>
191+
192+
```json
193+
{
194+
"resource": { },
195+
"labels": {
196+
"key1": "val1",
197+
"key2": "val2",
198+
},
199+
"record": {
200+
"key1": "val1",
201+
}
202+
}
203+
```
204+
205+
</td>
206+
</tr>
207+
</table>
208+
209+
<hr>
210+
Retain fields from all sources
211+
212+
```yaml
213+
- type: retain
214+
fields:
215+
- $resource.key1
216+
- $labels.key3
217+
- key5
218+
```
219+
220+
<table>
221+
<tr><td> Input entry </td> <td> Output entry </td></tr>
222+
<tr>
223+
<td>
224+
225+
```json
226+
{
227+
"resource": {
228+
"key1": "val1",
229+
"key2": "val2"
230+
},
231+
"labels": {
232+
"key3": "val3",
233+
"key4": "val4"
234+
},
235+
"record": {
236+
"key5": "val5",
237+
"key6": "val6",
238+
}
239+
}
240+
```
241+
242+
</td>
243+
<td>
244+
245+
```json
246+
{
247+
"resource": {
248+
"key1": "val1",
249+
},
250+
"labels": {
251+
"key3": "val3",
252+
},
253+
"record": {
254+
"key5": "val5",
255+
}
256+
}
257+
```
258+
259+
</td>
260+
</tr>
261+
</table>
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package retain
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"strings"
21+
22+
"github.com/observiq/stanza/entry"
23+
"github.com/observiq/stanza/operator"
24+
"github.com/observiq/stanza/operator/helper"
25+
)
26+
27+
func init() {
28+
operator.Register("retain", func() operator.Builder { return NewRetainOperatorConfig("") })
29+
}
30+
31+
// NewRetainOperatorConfig creates a new retain operator config with default values
32+
func NewRetainOperatorConfig(operatorID string) *RetainOperatorConfig {
33+
return &RetainOperatorConfig{
34+
TransformerConfig: helper.NewTransformerConfig(operatorID, "retain"),
35+
}
36+
}
37+
38+
// RetainOperatorConfig is the configuration of a retain operator
39+
type RetainOperatorConfig struct {
40+
helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
41+
Fields []entry.Field `mapstructure:"fields" json:"fields" yaml:"fields"`
42+
}
43+
44+
// Build will build a retain operator from the supplied configuration
45+
func (c RetainOperatorConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
46+
transformerOperator, err := c.TransformerConfig.Build(context)
47+
if err != nil {
48+
return nil, err
49+
}
50+
if c.Fields == nil || len(c.Fields) == 0 {
51+
return nil, fmt.Errorf("retain: 'fields' is empty")
52+
}
53+
54+
retainOp := &RetainOperator{
55+
TransformerOperator: transformerOperator,
56+
Fields: c.Fields,
57+
}
58+
59+
for _, field := range c.Fields {
60+
typeCheck := field.String()
61+
if strings.HasPrefix(typeCheck, "$resource") {
62+
retainOp.AllResourceFields = true
63+
continue
64+
}
65+
if strings.HasPrefix(typeCheck, "$labels") {
66+
retainOp.AllAttributeFields = true
67+
continue
68+
}
69+
retainOp.AllBodyFields = true
70+
}
71+
return []operator.Operator{retainOp}, nil
72+
}
73+
74+
// RetainOperator keeps the given fields and deletes the rest.
75+
type RetainOperator struct {
76+
helper.TransformerOperator
77+
Fields []entry.Field
78+
AllBodyFields bool
79+
AllAttributeFields bool
80+
AllResourceFields bool
81+
}
82+
83+
// Process will process an entry with a retain transformation.
84+
func (p *RetainOperator) Process(ctx context.Context, entry *entry.Entry) error {
85+
return p.ProcessWith(ctx, entry, p.Transform)
86+
}
87+
88+
// Transform will apply the retain operation to an entry
89+
func (p *RetainOperator) Transform(e *entry.Entry) error {
90+
newEntry := entry.New()
91+
newEntry.Timestamp = e.Timestamp
92+
93+
if !p.AllResourceFields {
94+
newEntry.Resource = e.Resource
95+
}
96+
if !p.AllAttributeFields {
97+
newEntry.Labels = e.Labels
98+
}
99+
if !p.AllBodyFields {
100+
newEntry.Record = e.Record
101+
}
102+
103+
for _, field := range p.Fields {
104+
val, ok := e.Get(field)
105+
if !ok {
106+
continue
107+
}
108+
err := newEntry.Set(field, val)
109+
if err != nil {
110+
return err
111+
}
112+
}
113+
114+
*e = *newEntry
115+
return nil
116+
}

0 commit comments

Comments
 (0)