Skip to content

Commit 817bb85

Browse files
spacewanderjohnlanni
authored andcommitted
golang: allow injecting extra data (envoyproxy#38362)
<!-- !!!ATTENTION!!! If you are fixing *any* crash or *any* potential security issue, *do not* open a pull request in this repo. Please report the issue via emailing envoy-security@googlegroups.com where the issue will be triaged appropriately. Thank you in advance for helping to keep Envoy secure. !!!ATTENTION!!! For an explanation of how to fill out the fields, please see the relevant section in [PULL_REQUESTS.md](https://github.com/envoyproxy/envoy/blob/main/PULL_REQUESTS.md) --> Commit Message: golang: allow injecting extra data Additional Description: This PR adds a feature that allows users to flush the data immediately when processing the data asynchronously. Risk Level: Low Testing: Integration test Docs Changes: Release Notes: Platform Specific Features: [Optional Runtime guard:] [Optional Fixes #Issue] [Optional Fixes commit #PR or SHA] [Optional Deprecated:] [Optional [API Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):] Signed-off-by: spacewander <spacewanderlzx@gmail.com>
1 parent 3954131 commit 817bb85

File tree

15 files changed

+434
-1
lines changed

15 files changed

+434
-1
lines changed

contrib/golang/common/go/api/api.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ typedef enum { // NOLINT(modernize-use-using)
6969
CAPIYield = -6,
7070
CAPIInternalFailure = -7,
7171
CAPISerializationFailure = -8,
72+
CAPIInvalidScene = -9,
7273
} CAPIStatus;
7374

7475
/* These APIs are related to the decode/encode phase, use the pointer of processState. */
@@ -79,6 +80,7 @@ CAPIStatus envoyGoFilterHttpSendLocalReply(void* s, int response_code, void* bod
7980
int details_len);
8081
CAPIStatus envoyGoFilterHttpSendPanicReply(void* s, void* details_data, int details_len);
8182
CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_streaming);
83+
CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_len);
8284

8385
CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
8486
int* value_len);

contrib/golang/common/go/api/capi.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type HttpCAPI interface {
2424
HttpContinue(s unsafe.Pointer, status uint64)
2525
HttpSendLocalReply(s unsafe.Pointer, responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)
2626
HttpAddData(s unsafe.Pointer, data []byte, isStreaming bool)
27+
HttpInjectData(s unsafe.Pointer, data []byte)
2728

2829
// Send a specialized reply that indicates that the filter has failed on the go side. Internally this is used for
2930
// when unhandled panics are detected.

contrib/golang/common/go/api/filter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ type FilterProcessCallbacks interface {
188188
// For example, turn a headers only request into a request with a body, add more body when processing trailers, and so on.
189189
// The second argument isStreaming supplies if this caller streams data or buffers the full body.
190190
AddData(data []byte, isStreaming bool)
191+
// InjectData inject the content of slice data via Envoy StreamXXFilterCallbacks's injectXXDataToFilterChaininjectData.
192+
InjectData(data []byte)
191193
}
192194

193195
type DecoderFilterCallbacks interface {

contrib/golang/filters/http/source/cgo.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_s
151151
});
152152
}
153153

154+
CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_length) {
155+
return envoyGoFilterProcessStateHandlerWrapper(
156+
s, [data, data_length](std::shared_ptr<Filter>& filter, ProcessorState& state) -> CAPIStatus {
157+
auto value = stringViewFromGoPointer(data, data_length);
158+
return filter->injectData(state, value);
159+
});
160+
}
161+
154162
// unsafe API, without copy memory from c to go.
155163
CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
156164
int* value_len) {

contrib/golang/filters/http/source/go/pkg/http/capi_impl.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ func handleCApiStatus(status C.CAPIStatus) {
7777
case C.CAPIFilterIsGone,
7878
C.CAPIFilterIsDestroy,
7979
C.CAPINotInGo,
80-
C.CAPIInvalidPhase:
80+
C.CAPIInvalidPhase,
81+
C.CAPIInvalidScene:
8182
panic(capiStatusToStr(status))
8283
}
8384
}
@@ -92,6 +93,8 @@ func capiStatusToStr(status C.CAPIStatus) string {
9293
return errNotInGo
9394
case C.CAPIInvalidPhase:
9495
return errInvalidPhase
96+
case C.CAPIInvalidScene:
97+
return errInvalidScene
9598
}
9699

97100
return "unknown status"
@@ -154,6 +157,13 @@ func (c *httpCApiImpl) HttpAddData(s unsafe.Pointer, data []byte, isStreaming bo
154157
handleCApiStatus(res)
155158
}
156159

160+
func (c *httpCApiImpl) HttpInjectData(s unsafe.Pointer, data []byte) {
161+
state := (*processState)(s)
162+
res := C.envoyGoFilterHttpInjectData(unsafe.Pointer(state.processState),
163+
unsafe.Pointer(unsafe.SliceData(data)), C.int(len(data)))
164+
handleCApiStatus(res)
165+
}
166+
157167
func (c *httpCApiImpl) HttpGetHeader(s unsafe.Pointer, key string) string {
158168
state := (*processState)(s)
159169
var valueData C.uint64_t

contrib/golang/filters/http/source/go/pkg/http/filter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ func (s *processState) AddData(data []byte, isStreaming bool) {
171171
cAPI.HttpAddData(unsafe.Pointer(s), data, isStreaming)
172172
}
173173

174+
func (s *processState) InjectData(data []byte) {
175+
cAPI.HttpInjectData(unsafe.Pointer(s), data)
176+
}
177+
174178
func (r *httpRequest) StreamInfo() api.StreamInfo {
175179
return &r.streamInfo
176180
}

contrib/golang/filters/http/source/go/pkg/http/type.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const (
3232
errFilterDestroyed = "golang filter has been destroyed"
3333
errNotInGo = "not proccessing Go"
3434
errInvalidPhase = "invalid phase, maybe headers/buffer already continued"
35+
errInvalidScene = "invalid scene for this API"
3536
)
3637

3738
// api.HeaderMap

contrib/golang/filters/http/source/golang_filter.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,43 @@ CAPIStatus Filter::addData(ProcessorState& state, absl::string_view data, bool i
568568
return CAPIStatus::CAPIYield;
569569
}
570570

571+
CAPIStatus Filter::injectData(ProcessorState& state, absl::string_view data) {
572+
// lock until this function return since it may running in a Go thread.
573+
Thread::LockGuard lock(mutex_);
574+
if (has_destroyed_) {
575+
ENVOY_LOG(debug, "golang filter has been destroyed");
576+
return CAPIStatus::CAPIFilterIsDestroy;
577+
}
578+
if (!state.isProcessingInGo()) {
579+
ENVOY_LOG(debug, "golang filter is not processing Go");
580+
return CAPIStatus::CAPINotInGo;
581+
}
582+
if (state.filterState() != FilterState::ProcessingData) {
583+
ENVOY_LOG(error, "injectData is not supported when calling without processing data, use "
584+
"`addData` instead.");
585+
return CAPIStatus::CAPIInvalidPhase;
586+
}
587+
588+
if (state.isThreadSafe()) {
589+
ENVOY_LOG(error, "injectData is not supported when calling inside the callback context");
590+
return CAPIStatus::CAPIInvalidScene;
591+
}
592+
593+
auto data_to_write = std::make_shared<Buffer::OwnedImpl>(data);
594+
auto weak_ptr = weak_from_this();
595+
state.getDispatcher().post([this, &state, weak_ptr, data_to_write] {
596+
if (!weak_ptr.expired() && !hasDestroyed()) {
597+
ENVOY_LOG(debug, "golang filter inject data to filter chain, length: {}",
598+
data_to_write->length());
599+
state.injectDataToFilterChain(*data_to_write.get(), false);
600+
} else {
601+
ENVOY_LOG(debug, "golang filter has gone or destroyed in injectData event");
602+
}
603+
});
604+
605+
return CAPIStatus::CAPIOK;
606+
}
607+
571608
CAPIStatus Filter::getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
572609
int* value_len) {
573610
Thread::LockGuard lock(mutex_);

contrib/golang/filters/http/source/golang_filter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ class Filter : public Http::StreamFilter,
272272
CAPIStatus sendPanicReply(ProcessorState& state, absl::string_view details);
273273

274274
CAPIStatus addData(ProcessorState& state, absl::string_view data, bool is_streaming);
275+
CAPIStatus injectData(ProcessorState& state, absl::string_view data);
275276

276277
CAPIStatus getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
277278
int* value_len);

contrib/golang/filters/http/test/golang_integration_test.cc

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,7 @@ name: golang
810810
const std::string METRIC{"metric"};
811811
const std::string ACTION{"action"};
812812
const std::string ADDDATA{"add_data"};
813+
const std::string BUFFERINJECTDATA{"bufferinjectdata"};
813814
};
814815

815816
INSTANTIATE_TEST_SUITE_P(IpVersions, GolangIntegrationTest,
@@ -1350,6 +1351,158 @@ TEST_P(GolangIntegrationTest, AddDataBufferAllDataAndAsync) {
13501351
cleanup();
13511352
}
13521353

1354+
TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedDownstreamRequest) {
1355+
initializeBasicFilter(BUFFERINJECTDATA, "test.com");
1356+
1357+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1358+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1359+
{":path", "/test?bufferingly_decode"},
1360+
{":scheme", "http"},
1361+
{":authority", "test.com"}};
1362+
1363+
auto encoder_decoder = codec_client_->startRequest(request_headers, false);
1364+
Http::RequestEncoder& request_encoder = encoder_decoder.first;
1365+
codec_client_->sendData(request_encoder, "To ", false);
1366+
codec_client_->sendData(request_encoder, "be, ", true);
1367+
1368+
waitForNextUpstreamRequest();
1369+
1370+
auto body = "To be, or not to be, that is the question";
1371+
EXPECT_EQ(body, upstream_request_->body().toString());
1372+
1373+
cleanup();
1374+
}
1375+
1376+
TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedDownstreamRequest) {
1377+
initializeBasicFilter(BUFFERINJECTDATA, "test.com");
1378+
1379+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1380+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1381+
{":path", "/test?nonbufferingly_decode"},
1382+
{":scheme", "http"},
1383+
{":authority", "test.com"}};
1384+
1385+
auto encoder_decoder = codec_client_->startRequest(request_headers, false);
1386+
Http::RequestEncoder& request_encoder = encoder_decoder.first;
1387+
codec_client_->sendData(request_encoder, "To be, ", false);
1388+
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
1389+
Event::Dispatcher::RunType::NonBlock);
1390+
codec_client_->sendData(request_encoder, "that is ", true);
1391+
1392+
waitForNextUpstreamRequest();
1393+
1394+
auto body = "To be, or not to be, that is the question";
1395+
EXPECT_EQ(body, upstream_request_->body().toString());
1396+
1397+
cleanup();
1398+
}
1399+
1400+
TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedUpstreamResponse) {
1401+
initializeBasicFilter(BUFFERINJECTDATA, "test.com");
1402+
1403+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1404+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1405+
{":path", "/test?bufferingly_encode"},
1406+
{":scheme", "http"},
1407+
{":authority", "test.com"}};
1408+
1409+
auto encoder_decoder = codec_client_->startRequest(request_headers, true);
1410+
auto response = std::move(encoder_decoder.second);
1411+
1412+
waitForNextUpstreamRequest();
1413+
1414+
Http::TestResponseHeaderMapImpl response_headers{
1415+
{":status", "200"},
1416+
};
1417+
upstream_request_->encodeHeaders(response_headers, false);
1418+
Buffer::OwnedImpl response_data("To ");
1419+
upstream_request_->encodeData(response_data, false);
1420+
Buffer::OwnedImpl response_data2("be, ");
1421+
upstream_request_->encodeData(response_data2, true);
1422+
1423+
ASSERT_TRUE(response->waitForEndStream());
1424+
1425+
auto body = "To be, or not to be, that is the question";
1426+
EXPECT_EQ(body, response->body());
1427+
1428+
cleanup();
1429+
}
1430+
1431+
TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedUpstreamResponse) {
1432+
initializeBasicFilter(BUFFERINJECTDATA, "test.com");
1433+
1434+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1435+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1436+
{":path", "/test?nonbufferingly_encode"},
1437+
{":scheme", "http"},
1438+
{":authority", "test.com"}};
1439+
1440+
auto encoder_decoder = codec_client_->startRequest(request_headers, true);
1441+
auto response = std::move(encoder_decoder.second);
1442+
1443+
waitForNextUpstreamRequest();
1444+
1445+
Http::TestResponseHeaderMapImpl response_headers{
1446+
{":status", "200"},
1447+
};
1448+
upstream_request_->encodeHeaders(response_headers, false);
1449+
Buffer::OwnedImpl response_data("To be, ");
1450+
upstream_request_->encodeData(response_data, false);
1451+
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
1452+
Event::Dispatcher::RunType::NonBlock);
1453+
Buffer::OwnedImpl response_data2("that is ");
1454+
upstream_request_->encodeData(response_data2, true);
1455+
1456+
ASSERT_TRUE(response->waitForEndStream());
1457+
1458+
auto body = "To be, or not to be, that is the question";
1459+
EXPECT_EQ(body, response->body());
1460+
1461+
cleanup();
1462+
}
1463+
1464+
TEST_P(GolangIntegrationTest, BufferInjectData_WithoutProcessingData) {
1465+
initializeBasicFilter(BUFFERINJECTDATA, "test.com");
1466+
1467+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1468+
Http::TestRequestHeaderMapImpl request_headers{
1469+
{":method", "POST"},
1470+
{":path", "/test?inject_data_when_processing_header"},
1471+
{":scheme", "http"},
1472+
{":authority", "test.com"}};
1473+
1474+
auto encoder_decoder = codec_client_->startRequest(request_headers, true);
1475+
auto response = std::move(encoder_decoder.second);
1476+
1477+
ASSERT_TRUE(response->waitForEndStream());
1478+
1479+
EXPECT_EQ("400", response->headers().getStatusValue());
1480+
1481+
cleanup();
1482+
}
1483+
1484+
TEST_P(GolangIntegrationTest, BufferInjectData_ProcessingDataSynchronously) {
1485+
initializeBasicFilter(BUFFERINJECTDATA, "test.com");
1486+
1487+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1488+
Http::TestRequestHeaderMapImpl request_headers{
1489+
{":method", "POST"},
1490+
{":path", "/test?inject_data_when_processing_data_synchronously"},
1491+
{":scheme", "http"},
1492+
{":authority", "test.com"}};
1493+
1494+
auto encoder_decoder = codec_client_->startRequest(request_headers, false);
1495+
Http::RequestEncoder& request_encoder = encoder_decoder.first;
1496+
codec_client_->sendData(request_encoder, "blahblah", true);
1497+
auto response = std::move(encoder_decoder.second);
1498+
1499+
ASSERT_TRUE(response->waitForEndStream());
1500+
1501+
EXPECT_EQ("400", response->headers().getStatusValue());
1502+
1503+
cleanup();
1504+
}
1505+
13531506
// Buffer exceed limit in decode header phase.
13541507
TEST_P(GolangIntegrationTest, BufferExceedLimit_DecodeHeader) {
13551508
testBufferExceedLimit("/test?databuffer=decode-header");

0 commit comments

Comments
 (0)