Skip to content

Commit 0ff1d13

Browse files
authored
[bbr] Skip body re-serialization and mutation when unchanged (kubernetes-sigs#2556)
Add body mutation tracking to InferenceMessage via SetBody, SetBodyField, RemoveBodyField, and BodyMutated so the request and response handlers skip json.Marshal and Content-Length update when no plugin modified the body. SetBody replaces the entire body map for plugins that transform the full payload (e.g., API translation). SetBodyField and RemoveBodyField handle granular field-level mutations. For unary mode, omit BodyMutation from the ext_proc response when unchanged. For streaming mode, forward original bytes instead of re-marshaling. Add integration tests that verify body mutation tracking over real gRPC for both unary and streaming paths. Signed-off-by: Asaad Balum <asaad.balum@gmail.com>
1 parent 84aa88d commit 0ff1d13

File tree

11 files changed

+711
-186
lines changed

11 files changed

+711
-186
lines changed

pkg/bbr/framework/types.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type InferenceMessage struct {
3737
// mutations
3838
mutatedHeaders map[string]string
3939
removedHeaders sets.Set[string]
40+
bodyMutated bool
4041
}
4142

4243
func (r *InferenceMessage) SetHeader(key string, value string) {
@@ -63,6 +64,27 @@ func (r *InferenceMessage) RemovedHeaders() []string {
6364
return r.removedHeaders.UnsortedList()
6465
}
6566

67+
func (r *InferenceMessage) SetBody(body map[string]any) {
68+
r.Body = body
69+
r.bodyMutated = true
70+
}
71+
72+
func (r *InferenceMessage) SetBodyField(key string, value any) {
73+
r.Body[key] = value
74+
r.bodyMutated = true
75+
}
76+
77+
func (r *InferenceMessage) RemoveBodyField(key string) {
78+
if _, ok := r.Body[key]; ok {
79+
delete(r.Body, key)
80+
r.bodyMutated = true
81+
}
82+
}
83+
84+
func (r *InferenceMessage) BodyMutated() bool {
85+
return r.bodyMutated
86+
}
87+
6688
type InferenceRequest struct {
6789
InferenceMessage
6890
}

pkg/bbr/framework/types_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
Copyright 2026 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package framework
18+
19+
import (
20+
"testing"
21+
)
22+
23+
func TestSetBodyField(t *testing.T) {
24+
msg := newInferenceMessage()
25+
if msg.BodyMutated() {
26+
t.Error("new message should not be marked as body-mutated")
27+
}
28+
29+
msg.SetBodyField("key", "value")
30+
31+
if !msg.BodyMutated() {
32+
t.Error("expected BodyMutated() to return true after SetBodyField")
33+
}
34+
if got, ok := msg.Body["key"]; !ok || got != "value" {
35+
t.Errorf("Body[\"key\"] = %v, %v; want \"value\", true", got, ok)
36+
}
37+
}
38+
39+
func TestSetBodyField_Overwrite(t *testing.T) {
40+
msg := newInferenceMessage()
41+
msg.Body["existing"] = "old"
42+
43+
msg.SetBodyField("existing", "new")
44+
45+
if !msg.BodyMutated() {
46+
t.Error("expected BodyMutated() to return true after overwriting a field")
47+
}
48+
if got := msg.Body["existing"]; got != "new" {
49+
t.Errorf("Body[\"existing\"] = %v; want \"new\"", got)
50+
}
51+
}
52+
53+
func TestRemoveBodyField(t *testing.T) {
54+
msg := newInferenceMessage()
55+
msg.Body["key"] = "value"
56+
57+
msg.RemoveBodyField("key")
58+
59+
if !msg.BodyMutated() {
60+
t.Error("expected BodyMutated() to return true after RemoveBodyField")
61+
}
62+
if _, ok := msg.Body["key"]; ok {
63+
t.Error("expected key to be removed from Body")
64+
}
65+
}
66+
67+
func TestRemoveBodyField_NonExistent(t *testing.T) {
68+
msg := newInferenceMessage()
69+
70+
msg.RemoveBodyField("missing")
71+
72+
if msg.BodyMutated() {
73+
t.Error("removing a non-existent field should not mark body as mutated")
74+
}
75+
}
76+
77+
func TestSetBody(t *testing.T) {
78+
msg := newInferenceMessage()
79+
80+
msg.SetBody(map[string]any{"model": "llama", "prompt": "hello"})
81+
82+
if !msg.BodyMutated() {
83+
t.Error("expected BodyMutated() to return true after SetBody")
84+
}
85+
if got, ok := msg.Body["model"]; !ok || got != "llama" {
86+
t.Errorf("Body[\"model\"] = %v, %v; want \"llama\", true", got, ok)
87+
}
88+
if got, ok := msg.Body["prompt"]; !ok || got != "hello" {
89+
t.Errorf("Body[\"prompt\"] = %v, %v; want \"hello\", true", got, ok)
90+
}
91+
}
92+
93+
func TestBodyMutated_FalseByDefault(t *testing.T) {
94+
req := NewInferenceRequest()
95+
if req.BodyMutated() {
96+
t.Error("new InferenceRequest should not be marked as body-mutated")
97+
}
98+
99+
resp := NewInferenceResponse()
100+
if resp.BodyMutated() {
101+
t.Error("new InferenceResponse should not be marked as body-mutated")
102+
}
103+
}

pkg/bbr/handlers/request.go

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,16 @@ func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext,
6666
reqCtx.Request.SetHeader(BaseModelHeader, baseModel)
6767
logger.Info("Base model from datastore", "baseModel", baseModel)
6868

69-
// TODO: check and do this only if the request body actually changed.
70-
mutatedBodyBytes, err := json.Marshal(reqCtx.Request.Body)
71-
if err != nil {
72-
return nil, err
69+
bodyMutated := reqCtx.Request.BodyMutated()
70+
var mutatedBodyBytes []byte
71+
if bodyMutated {
72+
var err error
73+
mutatedBodyBytes, err = json.Marshal(reqCtx.Request.Body)
74+
if err != nil {
75+
return nil, err
76+
}
77+
reqCtx.Request.SetHeader(contentLengthHeader, strconv.Itoa(len(mutatedBodyBytes)))
7378
}
74-
reqCtx.Request.SetHeader(contentLengthHeader, strconv.Itoa(len(mutatedBodyBytes)))
7579

7680
metrics.RecordSuccessCounter()
7781

@@ -89,27 +93,35 @@ func (s *Server) HandleRequestBody(ctx context.Context, reqCtx *RequestContext,
8993
},
9094
},
9195
})
92-
ret = addStreamedBodyResponse(ret, mutatedBodyBytes)
96+
if bodyMutated {
97+
ret = addStreamedBodyResponse(ret, mutatedBodyBytes)
98+
} else {
99+
ret = addStreamedBodyResponse(ret, requestBodyBytes)
100+
}
93101
return ret, nil
94102
}
95103

104+
// Necessary so that the new headers are used in the routing decision.
105+
response := &eppb.CommonResponse{
106+
ClearRouteCache: true,
107+
HeaderMutation: &eppb.HeaderMutation{
108+
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
109+
RemoveHeaders: reqCtx.Request.RemovedHeaders(),
110+
},
111+
}
112+
if bodyMutated {
113+
response.BodyMutation = &eppb.BodyMutation{
114+
Mutation: &eppb.BodyMutation_Body{
115+
Body: mutatedBodyBytes,
116+
},
117+
}
118+
}
119+
96120
return []*eppb.ProcessingResponse{
97121
{
98122
Response: &eppb.ProcessingResponse_RequestBody{
99123
RequestBody: &eppb.BodyResponse{
100-
Response: &eppb.CommonResponse{
101-
// Necessary so that the new headers are used in the routing decision.
102-
ClearRouteCache: true,
103-
HeaderMutation: &eppb.HeaderMutation{
104-
SetHeaders: envoy.GenerateHeadersMutation(reqCtx.Request.MutatedHeaders()),
105-
RemoveHeaders: reqCtx.Request.RemovedHeaders(),
106-
},
107-
BodyMutation: &eppb.BodyMutation{
108-
Mutation: &eppb.BodyMutation_Body{
109-
Body: mutatedBodyBytes,
110-
},
111-
},
112-
},
124+
Response: response,
113125
},
114126
},
115127
},

0 commit comments

Comments
 (0)