Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/brpc/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Extension {
void List(std::ostream& os, char separator);

private:
friend class butil::GetLeakySingleton<Extension<T> >;
template <typename U> friend U* butil::create_leaky_singleton_obj();
Extension() = default;
butil::CaseIgnoredFlatMap<T*> _instance_map;
butil::Mutex _map_mutex;
Expand Down
171 changes: 114 additions & 57 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,26 @@
#include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/text_format.h>
#include <gflags/gflags.h>
#include <json2pb/pb_to_json.h> // ProtoMessageToJson
#include <json2pb/json_to_pb.h> // JsonToProtoMessage
#include <string>

#include "brpc/policy/http_rpc_protocol.h"
#include "butil/unique_ptr.h" // std::unique_ptr
#include "butil/string_splitter.h" // StringMultiSplitter
#include "butil/string_printf.h"
#include "butil/time.h"
#include "butil/sys_byteorder.h"
#include "json2pb/pb_to_json.h" // ProtoMessageToJson
#include "json2pb/json_to_pb.h" // JsonToProtoMessage
#include "brpc/compress.h"
#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
#include "brpc/controller.h" // Controller
#include "brpc/server.h" // Server
#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD
#include "brpc/controller.h" // Controller
#include "brpc/server.h" // Server
#include "brpc/details/server_private_accessor.h"
#include "brpc/span.h"
#include "brpc/socket.h" // Socket
#include "brpc/rpc_dump.h" // SampledRequest
#include "brpc/http_status_code.h" // HTTP_STATUS_*
#include "brpc/socket.h" // Socket
#include "brpc/rpc_dump.h" // SampledRequest
#include "brpc/http_status_code.h" // HTTP_STATUS_*
#include "brpc/details/controller_private_accessor.h"
#include "brpc/builtin/index_service.h" // IndexService
#include "brpc/builtin/index_service.h" // IndexService
#include "brpc/policy/gzip_compress.h"
#include "brpc/policy/http2_rpc_protocol.h"
#include "brpc/details/usercode_backup_pool.h"
Expand Down Expand Up @@ -203,6 +202,9 @@ HttpContentType ParseContentType(butil::StringPiece ct, bool* is_grpc_ct) {
if (ct.starts_with("json")) {
type = HTTP_CONTENT_JSON;
ct.remove_prefix(4);
} else if (ct.starts_with("proto-json")) {
type = HTTP_CONTENT_PROTO_JSON;
ct.remove_prefix(10);
} else if (ct.starts_with("proto-text")) {
type = HTTP_CONTENT_PROTO_TEXT;
ct.remove_prefix(10);
Expand Down Expand Up @@ -271,6 +273,79 @@ static bool RemoveGrpcPrefix(butil::IOBuf* body, bool* compressed) {
return (message_length + 5 == sz);
}

static bool JsonToProtoMessage(const butil::IOBuf& body,
google::protobuf::Message* message,
Controller* cntl, int error_code) {
butil::IOBufAsZeroCopyInputStream wrapper(body);
json2pb::Json2PbOptions options;
options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
options.array_to_single_repeated = cntl->has_pb_single_repeated_to_array();
std::string error;
bool ok = json2pb::JsonToProtoMessage(&wrapper, message, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to parse http json body as %s: %s",
message->GetDescriptor()->full_name().c_str(),
error.c_str());
}
return ok;
}

static bool ProtoMessageToJson(const google::protobuf::Message& message,
butil::IOBufAsZeroCopyOutputStream* wrapper,
Controller* cntl, int error_code) {
json2pb::Pb2JsonOptions options;
options.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
options.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
options.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
options.single_repeated_to_array = cntl->has_pb_single_repeated_to_array();
options.enum_option = FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME;
std::string error;
bool ok = json2pb::ProtoMessageToJson(message, wrapper, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to convert %s to json: %s",
message.GetDescriptor()->full_name().c_str(),
error.c_str());
}
return ok;
}

static bool ProtoJsonToProtoMessage(const butil::IOBuf& body,
google::protobuf::Message* message,
Controller* cntl, int error_code) {
json2pb::ProtoJson2PbOptions options;
options.ignore_unknown_fields = true;
butil::IOBufAsZeroCopyInputStream wrapper(body);
std::string error;
bool ok = json2pb::ProtoJsonToProtoMessage(&wrapper, message, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to parse http proto-json body as %s: %s",
message->GetDescriptor()->full_name().c_str(),
error.c_str());
}
return ok;
}

static bool ProtoMessageToProtoJson(const google::protobuf::Message& message,
butil::IOBufAsZeroCopyOutputStream* wrapper,
Controller* cntl, int error_code) {
json2pb::Pb2ProtoJsonOptions options;
#if GOOGLE_PROTOBUF_VERSION >= 5026002
options.always_print_fields_with_no_presence = cntl->has_always_print_primitive_fields();
#else
options.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
#endif
options.always_print_enums_as_ints = FLAGS_pb_enum_as_number;
std::string error;
bool ok = json2pb::ProtoMessageToProtoJson(message, wrapper, options, &error);
if (!ok) {
cntl->SetFailed(error_code, "Fail to convert %s to proto-json: %s",
message.GetDescriptor()->full_name().c_str(), error.c_str());
}
return ok;
}

void ProcessHttpResponse(InputMessageBase* msg) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
Expand Down Expand Up @@ -435,8 +510,8 @@ void ProcessHttpResponse(InputMessageBase* msg) {
if (grpc_compressed) {
encoding = res_header->GetHeader(common->GRPC_ENCODING);
if (encoding == NULL) {
cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding'"
" in compressed gRPC response");
cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding' "
"in compressed gRPC response");
break;
}
}
Expand All @@ -455,23 +530,24 @@ void ProcessHttpResponse(InputMessageBase* msg) {
}
if (content_type == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse content");
cntl->SetFailed(ERESPONSE, "Fail to parse content as %s",
cntl->response()->GetDescriptor()->full_name().c_str());
break;
}
} else if (content_type == HTTP_CONTENT_PROTO_TEXT) {
if (!ParsePbTextFromIOBuf(cntl->response(), res_body)) {
cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content");
cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content as %s",
cntl->response()->GetDescriptor()->full_name().c_str());
break;
}
} else if (content_type == HTTP_CONTENT_JSON) {
// message body is json
butil::IOBufAsZeroCopyInputStream wrapper(res_body);
std::string err;
json2pb::Json2PbOptions options;
options.base64_to_bytes = cntl->has_pb_bytes_to_base64();
options.array_to_single_repeated = cntl->has_pb_single_repeated_to_array();
if (!json2pb::JsonToProtoMessage(&wrapper, cntl->response(), options, &err)) {
cntl->SetFailed(ERESPONSE, "Fail to parse content, %s", err.c_str());
// Message body is json.
if (!JsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
break;
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
// Message body is json.
if (!ProtoJsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) {
break;
}
} else {
Expand Down Expand Up @@ -530,8 +606,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
}
} else {
bool is_grpc_ct = false;
content_type = ParseContentType(hreq.content_type(),
&is_grpc_ct);
content_type = ParseContentType(hreq.content_type(), &is_grpc_ct);
is_grpc = (is_http2 && is_grpc_ct);
}

Expand All @@ -549,21 +624,15 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
return cntl->SetFailed(EREQUEST, "Fail to print %s as proto-text",
pbreq->GetTypeName().c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
if (!ProtoMessageToProtoJson(*pbreq, &wrapper, cntl, EREQUEST)) {
cntl->request_attachment().clear();
return;
}
} else if (content_type == HTTP_CONTENT_JSON) {
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
opt.single_repeated_to_array = cntl->has_pb_single_repeated_to_array();

opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
if (!json2pb::ProtoMessageToJson(*pbreq, &wrapper, opt, &err)) {
if (!ProtoMessageToJson(*pbreq, &wrapper, cntl, EREQUEST)) {
cntl->request_attachment().clear();
return cntl->SetFailed(
EREQUEST, "Fail to convert request to json, %s", err.c_str());
return;
}
} else {
return cntl->SetFailed(
Expand Down Expand Up @@ -819,19 +888,10 @@ HttpResponseSender::~HttpResponseSender() {
if (!google::protobuf::TextFormat::Print(*res, &wrapper)) {
cntl->SetFailed(ERESPONSE, "Fail to print %s as proto-text", res->GetTypeName().c_str());
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
ProtoMessageToProtoJson(*res, &wrapper, cntl, ERESPONSE);
} else {
std::string err;
json2pb::Pb2JsonOptions opt;
opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64();
opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array();
opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields();
opt.single_repeated_to_array = cntl->has_pb_single_repeated_to_array();
opt.enum_option = (FLAGS_pb_enum_as_number
? json2pb::OUTPUT_ENUM_BY_NUMBER
: json2pb::OUTPUT_ENUM_BY_NAME);
if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) {
cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str());
}
ProtoMessageToJson(*res, &wrapper, cntl, ERESPONSE);
}
}

Expand Down Expand Up @@ -1600,17 +1660,14 @@ void ProcessHttpRequest(InputMessageBase *msg) {
req->GetDescriptor()->full_name().c_str());
return;
}
} else if (content_type == HTTP_CONTENT_PROTO_JSON) {
if (!ProtoJsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
return;
}
} else {
butil::IOBufAsZeroCopyInputStream wrapper(req_body);
std::string err;
json2pb::Json2PbOptions options;
options.base64_to_bytes = mp->params.pb_bytes_to_base64;
options.array_to_single_repeated = mp->params.pb_single_repeated_to_array;
cntl->set_pb_bytes_to_base64(mp->params.pb_bytes_to_base64);
cntl->set_pb_single_repeated_to_array(mp->params.pb_single_repeated_to_array);
if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
req->GetDescriptor()->full_name().c_str(), err.c_str());
if (!JsonToProtoMessage(req_body, req, cntl, EREQUEST)) {
return;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/brpc/policy/http_rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ enum HttpContentType {
HTTP_CONTENT_JSON = 1,
HTTP_CONTENT_PROTO = 2,
HTTP_CONTENT_PROTO_TEXT = 3,
HTTP_CONTENT_PROTO_JSON = 4,
};

// Parse from the textual content type. One type may have more than one literals.
Expand Down
5 changes: 2 additions & 3 deletions src/bthread/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTable {
class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
public:
KeyTableList() :
_head(NULL), _tail(NULL), _length(0) {
}
_head(NULL), _tail(NULL), _length(0) {}

~KeyTableList() {
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
Expand Down Expand Up @@ -305,7 +304,7 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
return count;
}

inline uint32_t get_length() {
inline uint32_t get_length() const {
return _length;
}

Expand Down
2 changes: 1 addition & 1 deletion src/butil/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ bool InitializeLogFileHandle() {
#elif defined(OS_POSIX)
log_file = fopen(log_file_name->c_str(), "a");
if (log_file == NULL) {
fprintf(stderr, "Fail to fopen %s", log_file_name->c_str());
fprintf(stderr, "Fail to fopen %s: %s", log_file_name->c_str(), berror());
return false;
}
#endif
Expand Down
10 changes: 8 additions & 2 deletions src/butil/memory/singleton_on_pthread_once.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@

namespace butil {

template <typename T> class GetLeakySingleton {
template <typename T>
T* create_leaky_singleton_obj() {
return new T();
}

template <typename T>
class GetLeakySingleton {
public:
static butil::subtle::AtomicWord g_leaky_singleton_untyped;
static pthread_once_t g_create_leaky_singleton_once;
Expand All @@ -39,7 +45,7 @@ pthread_once_t GetLeakySingleton<T>::g_create_leaky_singleton_once = PTHREAD_ONC

template <typename T>
void GetLeakySingleton<T>::create_leaky_singleton() {
T* obj = new T;
T* obj = create_leaky_singleton_obj<T>();
butil::subtle::Release_Store(
&g_leaky_singleton_untyped,
reinterpret_cast<butil::subtle::AtomicWord>(obj));
Expand Down
48 changes: 41 additions & 7 deletions src/json2pb/json_to_pb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
#include "butil/strings/string_number_conversions.h"
#include "butil/third_party/rapidjson/error/error.h"
#include "butil/third_party/rapidjson/rapidjson.h"
#include "json_to_pb.h"
#include "zero_copy_stream_reader.h" // ZeroCopyStreamReader
#include "encode_decode.h"
#include "json2pb/json_to_pb.h"
#include "json2pb/zero_copy_stream_reader.h" // ZeroCopyStreamReader
#include "json2pb/encode_decode.h"
#include "json2pb/protobuf_map.h"
#include "json2pb/rapidjson.h"
#include "json2pb/protobuf_type_resolver.h"
#include "butil/base64.h"
#include "butil/string_printf.h"
#include "protobuf_map.h"
#include "rapidjson.h"

#include "butil/iobuf.h"

#ifdef __GNUC__
// Ignore -Wnonnull for `(::google::protobuf::Message*)nullptr' of J2PERROR by design.
Expand Down Expand Up @@ -712,6 +712,40 @@ bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream *stream,
std::string* error) {
return JsonToProtoMessage(stream, message, Json2PbOptions(), error, nullptr);
}

bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json,
google::protobuf::Message* message,
const ProtoJson2PbOptions& options,
std::string* error) {
TypeResolverUniqueptr type_resolver = GetTypeResolver(*message);
butil::IOBuf buf;
butil::IOBufAsZeroCopyOutputStream output_stream(&buf);
std::string type_url = GetTypeUrl(*message);
auto st = google::protobuf::util::JsonToBinaryStream(
type_resolver.get(), type_url, json, &output_stream, options);

butil::IOBufAsZeroCopyInputStream input_stream(buf);
google::protobuf::io::CodedInputStream decoder(&input_stream);
if (!st.ok()) {
if (NULL != error) {
*error = st.ToString();
}
return false;
}

bool ok = message->ParseFromCodedStream(&decoder);
if (!ok && NULL != error) {
*error = "Fail to ParseFromCodedStream";
}
return ok;
}

bool ProtoJsonToProtoMessage(const std::string& json, google::protobuf::Message* message,
const ProtoJson2PbOptions& options, std::string* error) {
google::protobuf::io::ArrayInputStream input_stream(json.data(), json.size());
return ProtoJsonToProtoMessage(&input_stream, message, options, error);
}

} //namespace json2pb

#undef J2PERROR
Expand Down
Loading