Skip to content

Commit af2fac7

Browse files
authored
Apply request body mutations to ext_proc response in BBR (kubernetes-sigs#2551)
- Marshal the potentially mutated request body after plugins run - Set Content-Length header to reflect the new body size - Streaming: pass mutated bytes to addStreamedBodyResponse - Non-streaming: include BodyMutation_Body in the response - Update unit and integration tests to verify Content-Length and body mutation Signed-off-by: asaadbalum <asaad.balum@gmail.com>
1 parent a2f6326 commit af2fac7

File tree

6 files changed

+190
-89
lines changed

6 files changed

+190
-89
lines changed

pkg/bbr/handlers/request.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"strconv"
2324
"time"
2425

2526
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -65,6 +66,13 @@ func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext,
6566
reqCtx.Request.SetHeader(BaseModelHeader, baseModel)
6667
logger.Info("Base model from datastore", "baseModel", baseModel)
6768

69+
// TODO: check and do this only if the request body actually changed.
70+
mutatedBodyBytes, err := json.Marshal(reqCtx.Request.Body)
71+
if err != nil {
72+
return nil, err
73+
}
74+
reqCtx.Request.SetHeader(contentLengthHeader, strconv.Itoa(len(mutatedBodyBytes)))
75+
6876
metrics.RecordSuccessCounter()
6977

7078
if s.streaming {
@@ -81,7 +89,7 @@ func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext,
8189
},
8290
},
8391
})
84-
ret = addStreamedBodyResponse(ret, requestBodyBytes)
92+
ret = addStreamedBodyResponse(ret, mutatedBodyBytes)
8593
return ret, nil
8694
}
8795

@@ -96,6 +104,11 @@ func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext,
96104
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
97105
RemoveHeaders: reqCtx.Request.RemovedHeaders(),
98106
},
107+
BodyMutation: &eppb.BodyMutation{
108+
Mutation: &eppb.BodyMutation_Body{
109+
Body: mutatedBodyBytes,
110+
},
111+
},
99112
},
100113
},
101114
},

pkg/bbr/handlers/request_test.go

Lines changed: 111 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package handlers
1919
import (
2020
"context"
2121
"encoding/json"
22+
"strconv"
2223
"strings"
2324
"testing"
2425

@@ -211,69 +212,95 @@ func TestHandleRequestBody(t *testing.T) {
211212
"model": 1,
212213
"prompt": "Tell me a joke",
213214
},
214-
want: []*extProcPb.ProcessingResponse{
215-
{
216-
Response: &extProcPb.ProcessingResponse_RequestBody{
217-
RequestBody: &extProcPb.BodyResponse{
218-
Response: &extProcPb.CommonResponse{
219-
// Necessary so that the new headers are used in the routing decision.
220-
ClearRouteCache: true,
221-
HeaderMutation: &extProcPb.HeaderMutation{
222-
SetHeaders: []*basepb.HeaderValueOption{
223-
{
224-
Header: &basepb.HeaderValue{
225-
Key: ModelHeader,
226-
RawValue: []byte("1"),
215+
want: func() []*extProcPb.ProcessingResponse {
216+
b, _ := json.Marshal(map[string]any{"model": 1, "prompt": "Tell me a joke"})
217+
return []*extProcPb.ProcessingResponse{
218+
{
219+
Response: &extProcPb.ProcessingResponse_RequestBody{
220+
RequestBody: &extProcPb.BodyResponse{
221+
Response: &extProcPb.CommonResponse{
222+
ClearRouteCache: true,
223+
HeaderMutation: &extProcPb.HeaderMutation{
224+
SetHeaders: []*basepb.HeaderValueOption{
225+
{
226+
Header: &basepb.HeaderValue{
227+
Key: ModelHeader,
228+
RawValue: []byte("1"),
229+
},
227230
},
228-
},
229-
{
230-
Header: &basepb.HeaderValue{
231-
Key: BaseModelHeader,
232-
RawValue: []byte(""),
231+
{
232+
Header: &basepb.HeaderValue{
233+
Key: BaseModelHeader,
234+
RawValue: []byte(""),
235+
},
236+
},
237+
{
238+
Header: &basepb.HeaderValue{
239+
Key: contentLengthHeader,
240+
RawValue: []byte(strconv.Itoa(len(b))),
241+
},
233242
},
234243
},
235244
},
245+
BodyMutation: &extProcPb.BodyMutation{
246+
Mutation: &extProcPb.BodyMutation_Body{
247+
Body: b,
248+
},
249+
},
236250
},
237251
},
238252
},
239253
},
240-
},
241-
},
254+
}
255+
}(),
242256
},
243257
{
244258
name: "success",
245259
body: map[string]any{
246260
"model": "foo",
247261
"prompt": "Tell me a joke",
248262
},
249-
want: []*extProcPb.ProcessingResponse{
250-
{
251-
Response: &extProcPb.ProcessingResponse_RequestBody{
252-
RequestBody: &extProcPb.BodyResponse{
253-
Response: &extProcPb.CommonResponse{
254-
// Necessary so that the new headers are used in the routing decision.
255-
ClearRouteCache: true,
256-
HeaderMutation: &extProcPb.HeaderMutation{
257-
SetHeaders: []*basepb.HeaderValueOption{
258-
{
259-
Header: &basepb.HeaderValue{
260-
Key: ModelHeader,
261-
RawValue: []byte("foo"),
263+
want: func() []*extProcPb.ProcessingResponse {
264+
b, _ := json.Marshal(map[string]any{"model": "foo", "prompt": "Tell me a joke"})
265+
return []*extProcPb.ProcessingResponse{
266+
{
267+
Response: &extProcPb.ProcessingResponse_RequestBody{
268+
RequestBody: &extProcPb.BodyResponse{
269+
Response: &extProcPb.CommonResponse{
270+
ClearRouteCache: true,
271+
HeaderMutation: &extProcPb.HeaderMutation{
272+
SetHeaders: []*basepb.HeaderValueOption{
273+
{
274+
Header: &basepb.HeaderValue{
275+
Key: ModelHeader,
276+
RawValue: []byte("foo"),
277+
},
262278
},
263-
},
264-
{
265-
Header: &basepb.HeaderValue{
266-
Key: BaseModelHeader,
267-
RawValue: []byte(""),
279+
{
280+
Header: &basepb.HeaderValue{
281+
Key: BaseModelHeader,
282+
RawValue: []byte(""),
283+
},
284+
},
285+
{
286+
Header: &basepb.HeaderValue{
287+
Key: contentLengthHeader,
288+
RawValue: []byte(strconv.Itoa(len(b))),
289+
},
268290
},
269291
},
270292
},
293+
BodyMutation: &extProcPb.BodyMutation{
294+
Mutation: &extProcPb.BodyMutation_Body{
295+
Body: b,
296+
},
297+
},
271298
},
272299
},
273300
},
274301
},
275-
},
276-
},
302+
}
303+
}(),
277304
},
278305
{
279306
name: "success-with-streaming",
@@ -282,52 +309,58 @@ func TestHandleRequestBody(t *testing.T) {
282309
"prompt": "Tell me a joke",
283310
},
284311
streaming: true,
285-
want: []*extProcPb.ProcessingResponse{
286-
{
287-
Response: &extProcPb.ProcessingResponse_RequestHeaders{
288-
RequestHeaders: &extProcPb.HeadersResponse{
289-
Response: &extProcPb.CommonResponse{
290-
ClearRouteCache: true,
291-
HeaderMutation: &extProcPb.HeaderMutation{
292-
SetHeaders: []*basepb.HeaderValueOption{
293-
{
294-
Header: &basepb.HeaderValue{
295-
Key: ModelHeader,
296-
RawValue: []byte("foo"),
312+
want: func() []*extProcPb.ProcessingResponse {
313+
b, _ := json.Marshal(map[string]any{"model": "foo", "prompt": "Tell me a joke"})
314+
return []*extProcPb.ProcessingResponse{
315+
{
316+
Response: &extProcPb.ProcessingResponse_RequestHeaders{
317+
RequestHeaders: &extProcPb.HeadersResponse{
318+
Response: &extProcPb.CommonResponse{
319+
ClearRouteCache: true,
320+
HeaderMutation: &extProcPb.HeaderMutation{
321+
SetHeaders: []*basepb.HeaderValueOption{
322+
{
323+
Header: &basepb.HeaderValue{
324+
Key: ModelHeader,
325+
RawValue: []byte("foo"),
326+
},
297327
},
298-
},
299-
{
300-
Header: &basepb.HeaderValue{
301-
Key: BaseModelHeader,
302-
RawValue: []byte(""),
328+
{
329+
Header: &basepb.HeaderValue{
330+
Key: BaseModelHeader,
331+
RawValue: []byte(""),
332+
},
333+
},
334+
{
335+
Header: &basepb.HeaderValue{
336+
Key: contentLengthHeader,
337+
RawValue: []byte(strconv.Itoa(len(b))),
338+
},
303339
},
304340
},
305341
},
306342
},
307343
},
308344
},
309345
},
310-
},
311-
{
312-
Response: &extProcPb.ProcessingResponse_RequestBody{
313-
RequestBody: &extProcPb.BodyResponse{
314-
Response: &extProcPb.CommonResponse{
315-
BodyMutation: &extProcPb.BodyMutation{
316-
Mutation: &extProcPb.BodyMutation_StreamedResponse{
317-
StreamedResponse: &extProcPb.StreamedBodyResponse{
318-
Body: mapToBytes(t, map[string]any{
319-
"model": "foo",
320-
"prompt": "Tell me a joke",
321-
}),
322-
EndOfStream: true,
346+
{
347+
Response: &extProcPb.ProcessingResponse_RequestBody{
348+
RequestBody: &extProcPb.BodyResponse{
349+
Response: &extProcPb.CommonResponse{
350+
BodyMutation: &extProcPb.BodyMutation{
351+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
352+
StreamedResponse: &extProcPb.StreamedBodyResponse{
353+
Body: b,
354+
EndOfStream: true,
355+
},
323356
},
324357
},
325358
},
326359
},
327360
},
328361
},
329-
},
330-
},
362+
}
363+
}(),
331364
},
332365
{
333366
name: "success-with-streaming-large-body",
@@ -365,6 +398,12 @@ func TestHandleRequestBody(t *testing.T) {
365398
RawValue: []byte(""),
366399
},
367400
},
401+
{
402+
Header: &basepb.HeaderValue{
403+
Key: contentLengthHeader,
404+
RawValue: []byte(strconv.Itoa(len(b))),
405+
},
406+
},
368407
},
369408
},
370409
},

pkg/bbr/handlers/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ const (
4242
ModelHeader = "X-Gateway-Model-Name"
4343
BaseModelHeader = "X-Gateway-Base-Model-Name"
4444

45+
contentLengthHeader = "Content-Length"
46+
4547
requestPluginExtensionPoint = "request"
4648
responsePluginExtensionPoint = "response"
4749
)

pkg/bbr/handlers/server_test.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package handlers
1818

1919
import (
2020
"context"
21+
"encoding/json"
22+
"strconv"
2123
"testing"
2224

2325
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -34,6 +36,7 @@ import (
3436
func TestHandleRequestBodyStreaming(t *testing.T) {
3537
ctx := logutil.NewTestLoggerIntoContext(context.Background())
3638

39+
b, _ := json.Marshal(map[string]any{"model": "foo"})
3740
cases := []struct {
3841
desc string
3942
streaming bool
@@ -42,9 +45,7 @@ func TestHandleRequestBodyStreaming(t *testing.T) {
4245
}{
4346
{
4447
desc: "no-streaming",
45-
body: mapToBytes(t, map[string]any{
46-
"model": "foo",
47-
}),
48+
body: b,
4849
want: []*extProcPb.ProcessingResponse{
4950
{
5051
Response: &extProcPb.ProcessingResponse_RequestBody{
@@ -66,6 +67,17 @@ func TestHandleRequestBodyStreaming(t *testing.T) {
6667
RawValue: []byte(""),
6768
},
6869
},
70+
{
71+
Header: &basepb.HeaderValue{
72+
Key: contentLengthHeader,
73+
RawValue: []byte(strconv.Itoa(len(b))),
74+
},
75+
},
76+
},
77+
},
78+
BodyMutation: &extProcPb.BodyMutation{
79+
Mutation: &extProcPb.BodyMutation_Body{
80+
Body: b,
6981
},
7082
},
7183
},
@@ -77,9 +89,7 @@ func TestHandleRequestBodyStreaming(t *testing.T) {
7789
{
7890
desc: "streaming",
7991
streaming: true,
80-
body: mapToBytes(t, map[string]any{
81-
"model": "foo",
82-
}),
92+
body: b,
8393
want: []*extProcPb.ProcessingResponse{
8494
{
8595
Response: &extProcPb.ProcessingResponse_RequestHeaders{
@@ -100,6 +110,12 @@ func TestHandleRequestBodyStreaming(t *testing.T) {
100110
RawValue: []byte(""),
101111
},
102112
},
113+
{
114+
Header: &basepb.HeaderValue{
115+
Key: contentLengthHeader,
116+
RawValue: []byte(strconv.Itoa(len(b))),
117+
},
118+
},
103119
},
104120
},
105121
},
@@ -113,9 +129,7 @@ func TestHandleRequestBodyStreaming(t *testing.T) {
113129
BodyMutation: &extProcPb.BodyMutation{
114130
Mutation: &extProcPb.BodyMutation_StreamedResponse{
115131
StreamedResponse: &extProcPb.StreamedBodyResponse{
116-
Body: mapToBytes(t, map[string]any{
117-
"model": "foo",
118-
}),
132+
Body: b,
119133
EndOfStream: true,
120134
},
121135
},

0 commit comments

Comments
 (0)