Skip to content

Commit b42e37b

Browse files
committed
Improve Node and Go header validation
1 parent c1731c7 commit b42e37b

File tree

13 files changed

+620
-348
lines changed

13 files changed

+620
-348
lines changed

bdd/go/tests/tcp_test/messages_steps.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ import (
2828
"github.com/onsi/gomega"
2929
)
3030

31-
func createDefaultMessageHeaders() map[iggcon.HeaderKey]iggcon.HeaderValue {
32-
return map[iggcon.HeaderKey]iggcon.HeaderValue{
33-
{Value: createRandomString(4)}: {Kind: iggcon.String, Value: []byte(createRandomString(8))},
34-
{Value: createRandomString(8)}: {Kind: iggcon.Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}},
31+
func createDefaultMessageHeaders() []iggcon.HeaderEntry {
32+
return []iggcon.HeaderEntry{
33+
{Key: iggcon.HeaderKey{Kind: iggcon.String, Value: []byte(createRandomString(4))}, Value: iggcon.HeaderValue{Kind: iggcon.String, Value: []byte(createRandomString(8))}},
34+
{Key: iggcon.HeaderKey{Kind: iggcon.String, Value: []byte(createRandomString(8))}, Value: iggcon.HeaderValue{Kind: iggcon.Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}}},
3535
}
3636
}
3737

foreign/go/binary_serialization/send_messages_request_serializer_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ func TestSerialize_SendMessagesRequest(t *testing.T) {
6666
}
6767
}
6868

69-
func createDefaultMessageHeaders() map[iggcon.HeaderKey]iggcon.HeaderValue {
70-
return map[iggcon.HeaderKey]iggcon.HeaderValue{
71-
{Kind: iggcon.String, Value: "HeaderKey1"}: {Kind: iggcon.String, Value: []byte("Value 1")},
72-
{Kind: iggcon.String, Value: "HeaderKey2"}: {Kind: iggcon.Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}},
69+
func createDefaultMessageHeaders() []iggcon.HeaderEntry {
70+
return []iggcon.HeaderEntry{
71+
{Key: iggcon.HeaderKey{Kind: iggcon.String, Value: []byte("HeaderKey1")}, Value: iggcon.HeaderValue{Kind: iggcon.String, Value: []byte("Value 1")}},
72+
{Key: iggcon.HeaderKey{Kind: iggcon.String, Value: []byte("HeaderKey2")}, Value: iggcon.HeaderValue{Kind: iggcon.Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}}},
7373
}
7474
}
7575

foreign/go/contracts/messages.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func WithID(id [16]byte) IggyMessageOpt {
118118
}
119119
}
120120

121-
func WithUserHeaders(userHeaders map[HeaderKey]HeaderValue) IggyMessageOpt {
121+
func WithUserHeaders(userHeaders []HeaderEntry) IggyMessageOpt {
122122
return func(m *IggyMessage) {
123123
userHeaderBytes := GetHeadersBytes(userHeaders)
124124
m.UserHeaders = userHeaderBytes

foreign/go/contracts/user_headers.go

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,32 @@ type HeaderValue struct {
2929

3030
type HeaderKey struct {
3131
Kind HeaderKind
32-
Value string
32+
Value []byte
33+
}
34+
35+
type HeaderEntry struct {
36+
Key HeaderKey
37+
Value HeaderValue
3338
}
3439

3540
func NewHeaderKeyString(val string) (HeaderKey, error) {
3641
if len(val) == 0 || len(val) > 255 {
3742
return HeaderKey{}, errors.New("value has incorrect size, must be between 1 and 255")
3843
}
39-
return HeaderKey{Kind: String, Value: val}, nil
44+
return HeaderKey{Kind: String, Value: []byte(val)}, nil
45+
}
46+
47+
func NewHeaderKeyRaw(val []byte) (HeaderKey, error) {
48+
if len(val) == 0 || len(val) > 255 {
49+
return HeaderKey{}, errors.New("value has incorrect size, must be between 1 and 255")
50+
}
51+
return HeaderKey{Kind: Raw, Value: val}, nil
52+
}
53+
54+
func NewHeaderKeyInt32(val int32) HeaderKey {
55+
buf := make([]byte, 4)
56+
binary.LittleEndian.PutUint32(buf, uint32(val))
57+
return HeaderKey{Kind: Int32, Value: buf}
4058
}
4159

4260
type HeaderKind int
@@ -59,34 +77,50 @@ const (
5977
Double HeaderKind = 15
6078
)
6179

62-
func GetHeadersBytes(headers map[HeaderKey]HeaderValue) []byte {
80+
func (k HeaderKind) ExpectedSize() int {
81+
switch k {
82+
case Bool, Int8, Uint8:
83+
return 1
84+
case Int16, Uint16:
85+
return 2
86+
case Int32, Uint32, Float:
87+
return 4
88+
case Int64, Uint64, Double:
89+
return 8
90+
case Int128, Uint128:
91+
return 16
92+
default:
93+
return -1
94+
}
95+
}
96+
97+
func GetHeadersBytes(headers []HeaderEntry) []byte {
6398
headersLength := 0
64-
for key, header := range headers {
65-
headersLength += 1 + 4 + len([]byte(key.Value)) + 1 + 4 + len(header.Value)
99+
for _, entry := range headers {
100+
headersLength += 1 + 4 + len(entry.Key.Value) + 1 + 4 + len(entry.Value.Value)
66101
}
67102
headersBytes := make([]byte, headersLength)
68103
position := 0
69-
for key, value := range headers {
70-
headerBytes := getBytesFromHeader(key, value)
104+
for _, entry := range headers {
105+
headerBytes := getBytesFromHeader(entry.Key, entry.Value)
71106
copy(headersBytes[position:position+len(headerBytes)], headerBytes)
72107
position += len(headerBytes)
73108
}
74109
return headersBytes
75110
}
76111

77112
func getBytesFromHeader(key HeaderKey, value HeaderValue) []byte {
78-
keyBytes := []byte(key.Value)
79-
headerBytesLength := 1 + 4 + len(keyBytes) + 1 + 4 + len(value.Value)
113+
headerBytesLength := 1 + 4 + len(key.Value) + 1 + 4 + len(value.Value)
80114
headerBytes := make([]byte, headerBytesLength)
81115
pos := 0
82116

83117
headerBytes[pos] = byte(key.Kind)
84118
pos++
85119

86-
binary.LittleEndian.PutUint32(headerBytes[pos:pos+4], uint32(len(keyBytes)))
120+
binary.LittleEndian.PutUint32(headerBytes[pos:pos+4], uint32(len(key.Value)))
87121
pos += 4
88-
copy(headerBytes[pos:pos+len(keyBytes)], keyBytes)
89-
pos += len(keyBytes)
122+
copy(headerBytes[pos:pos+len(key.Value)], key.Value)
123+
pos += len(key.Value)
90124

91125
headerBytes[pos] = byte(value.Kind)
92126
pos++
@@ -98,8 +132,8 @@ func getBytesFromHeader(key HeaderKey, value HeaderValue) []byte {
98132
return headerBytes
99133
}
100134

101-
func DeserializeHeaders(userHeadersBytes []byte) (map[HeaderKey]HeaderValue, error) {
102-
headers := make(map[HeaderKey]HeaderValue)
135+
func DeserializeHeaders(userHeadersBytes []byte) ([]HeaderEntry, error) {
136+
var headers []HeaderEntry
103137
position := 0
104138

105139
for position < len(userHeadersBytes) {
@@ -123,7 +157,12 @@ func DeserializeHeaders(userHeadersBytes []byte) (map[HeaderKey]HeaderValue, err
123157
return nil, errors.New("invalid header key")
124158
}
125159

126-
keyValue := string(userHeadersBytes[position : position+int(keyLength)])
160+
if expected := keyKind.ExpectedSize(); expected != -1 && int(keyLength) != expected {
161+
return nil, errors.New("invalid header key size for kind")
162+
}
163+
164+
keyValue := make([]byte, keyLength)
165+
copy(keyValue, userHeadersBytes[position:position+int(keyLength)])
127166
position += int(keyLength)
128167

129168
valueKind, err := deserializeHeaderKind(userHeadersBytes, position)
@@ -147,13 +186,18 @@ func DeserializeHeaders(userHeadersBytes []byte) (map[HeaderKey]HeaderValue, err
147186
return nil, errors.New("invalid header value")
148187
}
149188

150-
value := userHeadersBytes[position : position+int(valueLength)]
189+
if expected := valueKind.ExpectedSize(); expected != -1 && int(valueLength) != expected {
190+
return nil, errors.New("invalid header value size for kind")
191+
}
192+
193+
valueBytes := make([]byte, valueLength)
194+
copy(valueBytes, userHeadersBytes[position:position+int(valueLength)])
151195
position += int(valueLength)
152196

153-
headers[HeaderKey{Kind: keyKind, Value: keyValue}] = HeaderValue{
154-
Kind: valueKind,
155-
Value: value,
156-
}
197+
headers = append(headers, HeaderEntry{
198+
Key: HeaderKey{Kind: keyKind, Value: keyValue},
199+
Value: HeaderValue{Kind: valueKind, Value: valueBytes},
200+
})
157201
}
158202

159203
return headers, nil

foreign/node/src/examples/stream-file-to-topic.ts

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,84 +17,107 @@
1717
* under the License.
1818
*/
1919

20-
21-
import { open } from 'node:fs/promises';
22-
import { resolve } from 'node:path';
23-
import { pipeline } from 'node:stream/promises';
24-
import { Writable, type TransformCallback } from 'node:stream';
25-
import { HeaderValue } from '../index.js';
26-
import { getClient } from './utils.js';
20+
import { open } from "node:fs/promises";
21+
import { resolve } from "node:path";
22+
import { pipeline } from "node:stream/promises";
23+
import { Writable, type TransformCallback } from "node:stream";
24+
import { HeaderValue, HeaderKeyFactory } from "../index.js";
25+
import { getClient } from "./utils.js";
2726

2827
export const fileToTopic = async (
2928
filepath: string,
3029
streamName: string,
3130
topicName: string,
32-
highWaterMark = 512 * 1024
31+
highWaterMark = 512 * 1024,
3332
) => {
3433
const cli = getClient();
3534
await cli.stream.ensure(streamName);
3635
await cli.topic.ensure(streamName, topicName);
3736
const fd = await open(filepath);
38-
const fname = filepath.split('/').pop() || filepath;
37+
const fname = filepath.split("/").pop() || filepath;
3938
const st = await fd.stat();
40-
console.log('FILE/STAT', fname, '~', (st.size / (1024 * 1024)).toFixed(2), 'mb', st);
39+
console.log(
40+
"FILE/STAT",
41+
fname,
42+
"~",
43+
(st.size / (1024 * 1024)).toFixed(2),
44+
"mb",
45+
st,
46+
);
4147
const dStart = Date.now();
4248

4349
try {
4450
let idx = 0;
4551

4652
await pipeline(
47-
fd.createReadStream({highWaterMark}),
53+
fd.createReadStream({ highWaterMark }),
4854

4955
new Writable({
50-
async write(chunks: Buffer, encoding: BufferEncoding, cb: TransformCallback) {
51-
const messages = [{
52-
headers: {
53-
fileindex: HeaderValue.Uint32(idx++),
54-
filename: HeaderValue.String(fname)
56+
async write(
57+
chunks: Buffer,
58+
encoding: BufferEncoding,
59+
cb: TransformCallback,
60+
) {
61+
const messages = [
62+
{
63+
headers: [
64+
{
65+
key: HeaderKeyFactory.String("fileindex"),
66+
value: HeaderValue.Uint32(idx++),
67+
},
68+
{
69+
key: HeaderKeyFactory.String("filename"),
70+
value: HeaderValue.String(fname),
71+
},
72+
],
73+
payload: chunks,
5574
},
56-
payload: chunks
57-
}];
75+
];
5876

5977
try {
60-
await cli.message.send({ streamId: streamName, topicId: topicName, messages });
78+
await cli.message.send({
79+
streamId: streamName,
80+
topicId: topicName,
81+
messages,
82+
});
6183
cb();
6284
} catch (err) {
63-
console.log('WRITE ERR', err, chunks);
85+
console.log("WRITE ERR", err, chunks);
6486
cb(err as Error);
6587
}
66-
}
67-
})
88+
},
89+
}),
6890
);
6991

70-
console.log(`Finished ! took ${Date.now() - dStart}ms`,);
92+
console.log(`Finished ! took ${Date.now() - dStart}ms`);
7193
} catch (err) {
72-
console.error('Pipeline failed !', err);
94+
console.error("Pipeline failed !", err);
7395
throw err;
7496
}
7597
};
7698

77-
7899
const argz = process.argv.slice(2);
79100
const [rPath, streamIdStr, topicIdStr] = argz;
80101

81-
if (argz.length < 3 || ['-h', '--help', '?'].includes(argz[0])) {
82-
console.log(`Usage: node stream-file-to-topic.js filePath streamName topicName`)
83-
console.log('got', argz);
84-
console.log('note: this script only accept numerical stream/topic id');
102+
if (argz.length < 3 || ["-h", "--help", "?"].includes(argz[0])) {
103+
console.log(
104+
`Usage: node stream-file-to-topic.js filePath streamName topicName`,
105+
);
106+
console.log("got", argz);
107+
console.log("note: this script only accept numerical stream/topic id");
85108
process.exit(1);
86109
}
87110

88111
const filepath = resolve(rPath);
89112
const streamName = streamIdStr;
90113
const topicName = topicIdStr;
91114

92-
console.log('running with params:', { filepath, streamName, topicName })
115+
console.log("running with params:", { filepath, streamName, topicName });
93116

94117
try {
95118
await fileToTopic(filepath, streamName, topicName, 512 * 1024);
96119
// eslint-disable-next-line @typescript-eslint/no-unused-vars
97-
} catch(err) {
120+
} catch (err) {
98121
process.exit(1);
99122
}
100123

foreign/node/src/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
* under the License.
1818
*/
1919

20-
2120
export {
2221
type Id,
2322
PollingStrategy,
2423
Consumer,
2524
Partitioning,
2625
HeaderValue,
27-
} from './wire/index.js';
26+
HeaderKeyFactory,
27+
} from "./wire/index.js";
2828

29-
export * from './client/index.js';
30-
export * from './stream/index.js';
29+
export * from "./client/index.js";
30+
export * from "./stream/index.js";

0 commit comments

Comments
 (0)