Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
in_opentelemetry: unify signal content-type handling across metrics/l…
…ogs/traces

  - Added shared content-type/payload helpers in plugins/in_opentelemetry/opentelemetry_utils.{h,c}:
      - opentelemetry_is_json_content_type
      - opentelemetry_is_protobuf_content_type
      - opentelemetry_is_grpc_content_type
      - opentelemetry_payload_starts_with_json_object
  - Switched logs/traces/metrics paths to use shared helpers:
      - plugins/in_opentelemetry/opentelemetry_logs.c
      - plugins/in_opentelemetry/opentelemetry_traces.c
      - plugins/in_opentelemetry/opentelemetry_prot.c
  - Removed duplicated local gRPC content-type checker from opentelemetry_prot.c.
  - Improved coherence across signals:
      - unified JSON/protobuf/gRPC detection
      - accepts content-type params (e.g. application/json; charset=utf-8)
      - consistent JSON payload prefix validation for logs/traces.

Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
  • Loading branch information
edsiper committed Feb 12, 2026
commit 088238d202a08a1687f39091109be4b137ab7dac
8 changes: 3 additions & 5 deletions plugins/in_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -641,16 +641,14 @@ int opentelemetry_process_logs(struct flb_opentelemetry *ctx,

/* Detect the type of payload */
if (content_type) {
if (strcasecmp(content_type, "application/json") == 0) {
if (buf[0] != '{') {
if (opentelemetry_is_json_content_type(content_type) == FLB_TRUE) {
if (opentelemetry_payload_starts_with_json_object(buf, size) != FLB_TRUE) {
flb_plg_error(ctx->ins, "Invalid JSON payload");
return -1;
}
is_proto = FLB_FALSE;
}
else if (strcasecmp(content_type, "application/protobuf") == 0 ||
strcasecmp(content_type, "application/grpc") == 0 ||
strcasecmp(content_type, "application/x-protobuf") == 0) {
else if (opentelemetry_is_protobuf_content_type(content_type) == FLB_TRUE) {
is_proto = FLB_TRUE;
}
else {
Expand Down
38 changes: 9 additions & 29 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,6 @@

#define HTTP_CONTENT_JSON 0

static int is_grpc_content_type(const char *content_type)
{
if (content_type == NULL) {
return FLB_FALSE;
}

if (strncasecmp(content_type, "application/grpc", 16) != 0) {
return FLB_FALSE;
}

if (content_type[16] == '\0' ||
content_type[16] == '+' ||
content_type[16] == ';') {
return FLB_TRUE;
}

return FLB_FALSE;
}

static int is_profiles_export_path(const char *path)
{
if (path == NULL) {
Expand Down Expand Up @@ -157,7 +138,7 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co
offset = 0;

if (content_type != NULL &&
strcasecmp(content_type, "application/json") == 0) {
opentelemetry_is_json_content_type(content_type) == FLB_TRUE) {
result = flb_opentelemetry_metrics_json_to_cmt(&decoded_contexts,
request->data.data,
request->data.len);
Expand Down Expand Up @@ -952,14 +933,13 @@ static int process_payload_metrics_ng(struct flb_opentelemetry *ctx,
offset = 0;

/* note: if the content type is gRPC, it was already decoded */
if (strcasecmp(request->content_type, "application/json") == 0) {
if (opentelemetry_is_json_content_type(request->content_type) == FLB_TRUE) {
result = flb_opentelemetry_metrics_json_to_cmt(&decoded_contexts,
payload,
payload_size);
}
else if (is_grpc_content_type(request->content_type) == FLB_TRUE ||
strcasecmp(request->content_type, "application/x-protobuf") == 0 ||
strcasecmp(request->content_type, "application/protobuf") == 0) {
else if (opentelemetry_is_protobuf_content_type(request->content_type) ==
FLB_TRUE) {
result = cmt_decode_opentelemetry_create(&decoded_contexts,
payload,
payload_size,
Expand Down Expand Up @@ -1070,15 +1050,14 @@ static int process_payload_profiles_ng(struct flb_opentelemetry *ctx,
flb_error("[otel] content type missing");
return -1;
}
else if (strcasecmp(request->content_type, "application/json") == 0) {
else if (opentelemetry_is_json_content_type(request->content_type) == FLB_TRUE) {
flb_error("[otel] unsuported profiles encoding type : %s",
request->content_type);

return -1;
}
else if (is_grpc_content_type(request->content_type) == FLB_TRUE ||
strcasecmp(request->content_type, "application/protobuf") == 0 ||
strcasecmp(request->content_type, "application/x-protobuf") == 0) {
else if (opentelemetry_is_protobuf_content_type(request->content_type) ==
FLB_TRUE) {
profiles_context = NULL;
offset = 0;

Expand Down Expand Up @@ -1250,7 +1229,8 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request,
}

/* Check if the payload is gRPC compressed */
if (grpc_request && is_grpc_content_type(request->content_type) == FLB_TRUE) {
if (grpc_request &&
opentelemetry_is_grpc_content_type(request->content_type) == FLB_TRUE) {

next_grpc_message:

Expand Down
9 changes: 4 additions & 5 deletions plugins/in_opentelemetry/opentelemetry_traces.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "opentelemetry.h"
#include "opentelemetry_traces.h"
#include "opentelemetry_utils.h"

int opentelemetry_traces_process_protobuf(struct flb_opentelemetry *ctx,
flb_sds_t tag,
Expand Down Expand Up @@ -157,17 +158,15 @@ int opentelemetry_process_traces(struct flb_opentelemetry *ctx,

/* Detect the type of payload */
if (content_type) {
if (strcasecmp(content_type, "application/json") == 0) {
if (buf[0] != '{') {
if (opentelemetry_is_json_content_type(content_type) == FLB_TRUE) {
if (opentelemetry_payload_starts_with_json_object(buf, size) != FLB_TRUE) {
flb_plg_error(ctx->ins, "Invalid JSON payload");
return -1;
}

is_proto = FLB_FALSE;
}
else if (strcasecmp(content_type, "application/protobuf") == 0 ||
strcasecmp(content_type, "application/grpc") == 0 ||
strcasecmp(content_type, "application/x-protobuf") == 0) {
else if (opentelemetry_is_protobuf_content_type(content_type) == FLB_TRUE) {
is_proto = FLB_TRUE;
}
else {
Expand Down
100 changes: 100 additions & 0 deletions plugins/in_opentelemetry/opentelemetry_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,106 @@ int json_payload_get_wrapped_value(msgpack_object *wrapper,
return 0;
}

static int opentelemetry_content_type_matches(const char *content_type,
const char *expected_content_type)
{
size_t expected_length;
char trailing_character;

if (content_type == NULL || expected_content_type == NULL) {
return FLB_FALSE;
}

expected_length = strlen(expected_content_type);

if (strncasecmp(content_type, expected_content_type, expected_length) != 0) {
return FLB_FALSE;
}

trailing_character = content_type[expected_length];

if (trailing_character == '\0' ||
trailing_character == ';' ||
trailing_character == ' ' ||
trailing_character == '\t') {
return FLB_TRUE;
}

return FLB_FALSE;
}

int opentelemetry_is_grpc_content_type(const char *content_type)
{
if (content_type == NULL) {
return FLB_FALSE;
}

if (strncasecmp(content_type, "application/grpc", 16) != 0) {
return FLB_FALSE;
}

if (content_type[16] == '\0' ||
content_type[16] == '+' ||
content_type[16] == ';' ||
content_type[16] == ' ' ||
content_type[16] == '\t') {
return FLB_TRUE;
}

return FLB_FALSE;
}

int opentelemetry_is_json_content_type(const char *content_type)
{
return opentelemetry_content_type_matches(content_type, "application/json");
}

int opentelemetry_is_protobuf_content_type(const char *content_type)
{
if (opentelemetry_content_type_matches(content_type, "application/protobuf") ==
FLB_TRUE) {
return FLB_TRUE;
}

if (opentelemetry_content_type_matches(content_type,
"application/x-protobuf") == FLB_TRUE) {
return FLB_TRUE;
}

if (opentelemetry_is_grpc_content_type(content_type) == FLB_TRUE) {
return FLB_TRUE;
}

return FLB_FALSE;
}

int opentelemetry_payload_starts_with_json_object(const void *payload,
size_t payload_size)
{
size_t index;
const unsigned char *buffer;

if (payload == NULL || payload_size == 0) {
return FLB_FALSE;
}

buffer = (const unsigned char *) payload;

for (index = 0; index < payload_size; index++) {
if (isspace(buffer[index])) {
continue;
}

if (buffer[index] == '{') {
return FLB_TRUE;
}

return FLB_FALSE;
}

return FLB_FALSE;
}

static int hex_to_int(char ch)
{
if (ch >= '0' && ch <= '9') {
Expand Down
5 changes: 5 additions & 0 deletions plugins/in_opentelemetry/opentelemetry_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ int json_payload_get_wrapped_value(msgpack_object *wrapper,
msgpack_object **value,
int *type);

int opentelemetry_is_grpc_content_type(const char *content_type);
int opentelemetry_is_json_content_type(const char *content_type);
int opentelemetry_is_protobuf_content_type(const char *content_type);
int opentelemetry_payload_starts_with_json_object(const void *payload, size_t payload_size);

int hex_to_id(char *str, int len, unsigned char *out_buf, int out_size);
uint64_t convert_string_number_to_u64(char *str, size_t len);

Expand Down
Loading