Skip to content

Commit 75a0092

Browse files
authored
feat: Message transform with javascript (#55)
* json lua transform wip * message transformation * fix unchecked returns
1 parent e9cffb8 commit 75a0092

26 files changed

+892
-14
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ HOST=localhost
33
PORT=3000
44
REDIS_URL=redis://localhost:6379
55
REDIS_INHOOKS_DB_NAME=development
6+
TRANSFORM_JAVASCRIPT_TIMEOUT=1s

.env.test.example

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ APP_ENV=test
22
HOST=localhost
33
PORT=3001
44
REDIS_URL=redis://localhost:6379
5-
REDIS_INHOOKS_DB_NAME=test
5+
REDIS_INHOOKS_DB_NAME=test

README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Inhooks listens to HTTP webhooks and saves the messages to Redis. A processing m
2626
- Supports delayed processing
2727
- Supports retries on failure with configurable number of attempts, interval and constant or exponential backoff
2828
- Supports different HTTP payloads types: JSON, x-www-form-urlencoded, multipart/form-data
29+
- Supports message transformation using JavaScript ECMAScript 5.1
2930
- ... more features planned
3031

3132
## Usage
@@ -79,6 +80,82 @@ flows:
7980
currentSecretEnvVar: VERIFICATION_FLOW_1_CURRENT_SECRET # the name of the environment variable containing the verification secret
8081
previousSecretEnvVar: VERIFICATION_FLOW_1_PREVIOUS_SECRET # optional env var that allows rotating secrets without service interruption
8182
```
83+
84+
### Message transformation
85+
86+
#### Transform definition
87+
88+
Message transformation allows you to modify the payload and headers of messages before they are sent to the sinks (destinations). This can be useful for tasks such as adding or removing fields, changing the format of the data, or adding custom headers.
89+
90+
Currently, only JavaScript transformations are supported. The JavaScript function should be named `transform` and should take two parameters: `bodyStr` (the message body as a string) and `headers` (the message headers as a JSON object). The function should return an array with two elements: the transformed payload as a string and the transformed headers as a JSON object.
91+
The `headers` fields has the following format:
92+
```
93+
{
94+
"header-name": ["value1", "value2"]
95+
}
96+
```
97+
98+
Only JavaScript ECMAScript 5.1 is supported at the moment. We use the [goja](https://github.com/dop251/goja) library to execute the JavaScript code. You can read about the limitations on goja's documentation pages.
99+
100+
Here is an example configuration:
101+
```yaml
102+
flows:
103+
- id: flow-1
104+
source:
105+
id: source-1
106+
slug: source-1-slug
107+
type: http
108+
sinks:
109+
- id: sink-1
110+
type: http
111+
url: https://example.com/target
112+
transform:
113+
id: js-transform-1
114+
transform_definitions:
115+
- id: js-transform-1
116+
type: javascript
117+
script: |
118+
function transform(bodyStr, headers) {
119+
const body = JSON.parse(bodyStr);
120+
121+
// add a header
122+
headers["X-INHOOKS-TRANSFORMED"] = ["1"];
123+
// capitalize the message if present
124+
if (body.msg) {
125+
body.msg = body.msg.toUpperCase();
126+
}
127+
// delete a key from the body
128+
delete body.my_dummy_key;
129+
130+
return [JSON.stringify(body), headers];
131+
}
132+
```
133+
134+
135+
#### Testing transform scripts
136+
137+
You can use the `/api/v1/transform` endpoint to test your transform scripts before adding them to your flow configuration. This endpoint allows you to simulate the transformation process and see the results immediately.
138+
139+
To use this endpoint, send a POST request with a JSON payload containing the following fields:
140+
- `body`: The message body as a string
141+
- `headers`: The message headers as a JSON object
142+
- `transformDefinition`: An object containing the `type` and `script` of your transformation
143+
144+
Here's an example of how to use the `/api/v1/transform` endpoint:
145+
```shell
146+
curl -X POST http://localhost:3000/api/v1/transform \
147+
-H "Content-Type: application/json" \
148+
-d '{
149+
"body": "{\"msg\": \"hello world\", \"my_dummy_key\": \"value\"}",
150+
"headers": {"Content-Type": ["application/json"]},
151+
"transformDefinition": {
152+
"type": "javascript",
153+
"script": "function transform(bodyStr, headers) { const body = JSON.parse(bodyStr); headers[\"X-INHOOKS-TRANSFORMED\"] = [\"1\"]; if (body.msg) { body.msg = body.msg.toUpperCase(); } delete body.my_dummy_key; return [JSON.stringify(body), headers]; }"
154+
}
155+
}'
156+
```
157+
158+
82159
### Prometheus metrics
83160
Inhooks exposes Prometheus metrics at the `/api/v1/metrics` endpoint.
84161

cmd/api/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"flag"
56
"fmt"
67
"log"
78
"net/http"
@@ -26,6 +27,15 @@ var (
2627
func main() {
2728
versionpkg.SetVersion(version)
2829

30+
// handle version command
31+
isVersionCmd := flag.Bool("version", false, "print the version")
32+
flag.Parse()
33+
if *isVersionCmd {
34+
fmt.Println(version)
35+
os.Exit(0)
36+
}
37+
38+
// start server
2939
err := lib.LoadEnv()
3040
if err != nil {
3141
log.Fatalf("failed to load env: %v", err)
@@ -41,6 +51,8 @@ func main() {
4151
log.Fatalf("failed to initialize logger: %v", err)
4252
}
4353

54+
logger.Info("starting Inhooks", zap.String("version", version))
55+
4456
inhooksConfigSvc := services.NewInhooksConfigService(logger, appConf)
4557
logger.Info("loading inhooks config", zap.String("inhooksConfigFile", appConf.InhooksConfigFile))
4658

@@ -65,13 +77,15 @@ func main() {
6577
messageEnqueuer := services.NewMessageEnqueuer(redisStore, timeSvc)
6678
messageFetcher := services.NewMessageFetcher(redisStore, timeSvc)
6779
messageVerifier := services.NewMessageVerifier()
80+
messageTransformer := services.NewMessageTransformer(&appConf.Transform)
6881

6982
app := handlers.NewApp(
7083
handlers.WithLogger(logger),
7184
handlers.WithInhooksConfigService(inhooksConfigSvc),
7285
handlers.WithMessageBuilder(messageBuilder),
7386
handlers.WithMessageEnqueuer(messageEnqueuer),
7487
handlers.WithMessageVerifier(messageVerifier),
88+
handlers.WithMessageTransformer(messageTransformer),
7589
)
7690

7791
r := server.NewRouter(app)
@@ -119,6 +133,7 @@ func main() {
119133
supervisor.WithSchedulerService(schedulerSvc),
120134
supervisor.WithProcessingRecoveryService(processingRecoverySvc),
121135
supervisor.WithCleanupService(cleanupSvc),
136+
supervisor.WithMessageTransformer(messageTransformer),
122137
)
123138

124139
wg.Add(1)

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ require (
4949
github.com/davecgh/go-spew v1.1.1 // indirect
5050
github.com/denis-tingaikin/go-header v0.4.3 // indirect
5151
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
52+
github.com/dlclark/regexp2 v1.7.0 // indirect
53+
github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2 // indirect
5254
github.com/dustin/go-humanize v1.0.0 // indirect
5355
github.com/esimonov/ifshort v1.0.4 // indirect
5456
github.com/ettle/strcase v0.1.1 // indirect
@@ -58,6 +60,7 @@ require (
5860
github.com/fsnotify/fsnotify v1.5.4 // indirect
5961
github.com/fzipp/gocyclo v0.6.0 // indirect
6062
github.com/go-critic/go-critic v0.7.0 // indirect
63+
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
6164
github.com/go-toolsmith/astcast v1.1.0 // indirect
6265
github.com/go-toolsmith/astcopy v1.1.0 // indirect
6366
github.com/go-toolsmith/astequal v1.1.0 // indirect
@@ -79,6 +82,7 @@ require (
7982
github.com/golangci/revgrep v0.0.0-20220804021717-745bb2f7c2e6 // indirect
8083
github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
8184
github.com/google/go-cmp v0.6.0 // indirect
85+
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
8286
github.com/gordonklaus/ineffassign v0.0.0-20230107090616-13ace0543b28 // indirect
8387
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
8488
github.com/gostaticanalysis/comment v1.4.2 // indirect

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
122122
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
123123
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
124124
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
125+
github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo=
126+
github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8=
127+
github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2 h1:4Ew88p5s9dwIk5/woUyqI9BD89NgZoUNH4/rM/h2UDg=
128+
github.com/dop251/goja v0.0.0-20240627195025-eb1f15ee67d2/go.mod h1:o31y53rb/qiIAONF7w3FHJZRqqP3fzHUr1HqanthByw=
125129
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
126130
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
127131
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -159,6 +163,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
159163
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
160164
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
161165
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
166+
github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU=
167+
github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
162168
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
163169
github.com/go-toolsmith/astcast v1.1.0 h1:+JN9xZV1A+Re+95pgnMgDboWNVnIMMQXwfBwLRPgSC8=
164170
github.com/go-toolsmith/astcast v1.1.0/go.mod h1:qdcuFWeGGS2xX5bLM/c3U9lewg7+Zu4mr+xPwZIB4ZU=
@@ -264,6 +270,8 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf
264270
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
265271
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
266272
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
273+
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
274+
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
267275
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
268276
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
269277
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=

pkg/lib/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type AppConfig struct {
1515
Supervisor SupervisorConfig
1616
HTTPClient HTTPClientConfig
1717
Sink SinkConfig
18+
Transform TransformConfig
1819
}
1920

2021
type ServerConfig struct {
@@ -57,6 +58,10 @@ type SinkConfig struct {
5758
DefaultRetryExpMultiplier float64 `env:"SINK_DEFAULT_RETRY_EXP_MULTIPLIER,default=1"`
5859
}
5960

61+
type TransformConfig struct {
62+
JavascriptTimeout time.Duration `env:"TRANSFORM_JAVASCRIPT_TIMEOUT,default=1s"`
63+
}
64+
6065
func InitAppConfig(ctx context.Context) (*AppConfig, error) {
6166
appConf := &AppConfig{}
6267
err := envconfig.Process(ctx, appConf)

pkg/models/inhooks_config.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import (
1010
)
1111

1212
type InhooksConfig struct {
13-
Flows []*Flow `yaml:"flows"`
13+
Flows []*Flow `yaml:"flows"`
14+
TransformDefinitions []*TransformDefinition `yaml:"transform_definitions"`
1415
}
1516

1617
var idRegex = regexp.MustCompile(`^[a-zA-Z0-9\-]{1,255}$`)
@@ -27,6 +28,28 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error {
2728

2829
flowIDs := map[string]bool{}
2930
sourceSlugs := map[string]bool{}
31+
transformIDs := map[string]bool{}
32+
33+
if c.TransformDefinitions != nil {
34+
for i, transform := range c.TransformDefinitions {
35+
if !slices.Contains(TransformTypes, transform.Type) {
36+
return fmt.Errorf("invalid transform type: %s. allowed: %v", transform.Type, TransformTypes)
37+
}
38+
39+
if !idRegex.MatchString(transform.ID) {
40+
return idValidationErr(fmt.Sprintf("transforms[%d].id", i))
41+
}
42+
43+
if transformIDs[transform.ID] {
44+
return fmt.Errorf("transform ids must be unique. duplicate transform id: %s", transform.ID)
45+
}
46+
transformIDs[transform.ID] = true
47+
48+
if transform.Script == "" {
49+
return fmt.Errorf("transform script cannot be empty")
50+
}
51+
}
52+
}
3053

3154
for i, f := range c.Flows {
3255
if !idRegex.MatchString(f.ID) {
@@ -119,6 +142,13 @@ func ValidateInhooksConfig(appConf *lib.AppConfig, c *InhooksConfig) error {
119142
return fmt.Errorf("invalid url scheme: %s", sink.URL)
120143
}
121144
}
145+
146+
// validate transform
147+
if sink.Transform != nil {
148+
if !transformIDs[sink.Transform.ID] {
149+
return fmt.Errorf("transform id not found: %s", sink.Transform.ID)
150+
}
151+
}
122152
}
123153
}
124154

0 commit comments

Comments
 (0)