Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

if (argc != 2) {
LOG(ERROR) << "Usage: ./http_client \"www.foo.com\"";
LOG(ERROR) << "Usage: ./http_client \"http(s)://www.foo.com\"";
return -1;
}
char* url = argv[1];
Expand Down
26 changes: 26 additions & 0 deletions example/multi_threaded_echo_c++/cert.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-----BEGIN CERTIFICATE-----
MIIEUTCCAzmgAwIBAgIBADANBgkqhkiG9w0BAQQFADB9MQswCQYDVQQGEwJDTjER
MA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5naGFpMQ4wDAYDVQQKEwVC
YWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQxHDAaBgkqhkiG9w0BCQEW
DXNhdEBiYWlkdS5jb20wHhcNMTUwNzE2MDMxOTUxWhcNMTgwNTA1MDMxOTUxWjB9
MQswCQYDVQQGEwJDTjERMA8GA1UECBMIU2hhbmdoYWkxETAPBgNVBAcTCFNoYW5n
aGFpMQ4wDAYDVQQKEwVCYWlkdTEMMAoGA1UECxMDSU5GMQwwCgYDVQQDEwNTQVQx
HDAaBgkqhkiG9w0BCQEWDXNhdEBiYWlkdS5jb20wggEiMA0GCSqGSIb3DQEBAQUA
A4IBDwAwggEKAoIBAQCqdyAeHY39tqY1RYVbfpqZjZlJDtZb04znxjgQrX+mKmLb
mwvXgJojlfn2Qcgp4NKYFqDFb9tU/Gbb436dRvkHyWOz0RPMspR0TTRU1NIY8wRy
0A1LOCgLHsbRJHqktGjylejALdgsspFWyDY9bEfb4oWsnKGzJqcvIDXrPmMOOY4o
pbA9SufSzwRZN7Yzc5jAedpaF9SK78RQXtvV0+JfCUwBsBWPKevRFFUrN7rQBYjP
cgV/HgDuquPrqnESVSYyfEBKZba6cmNb+xzO3cB1brPTtobSXh+0o/0CtRA+2m63
ODexxCLntgkPm42IYCJLM15xTatcfVX/3LHQ31DrAgMBAAGjgdswgdgwHQYDVR0O
BBYEFGcd7lA//bSAoSC/NbWRx/H+O1zpMIGoBgNVHSMEgaAwgZ2AFGcd7lA//bSA
oSC/NbWRx/H+O1zpoYGBpH8wfTELMAkGA1UEBhMCQ04xETAPBgNVBAgTCFNoYW5n
aGFpMREwDwYDVQQHEwhTaGFuZ2hhaTEOMAwGA1UEChMFQmFpZHUxDDAKBgNVBAsT
A0lORjEMMAoGA1UEAxMDU0FUMRwwGgYJKoZIhvcNAQkBFg1zYXRAYmFpZHUuY29t
ggEAMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEEBQADggEBAKfoCn8SpLk3uQyT
X+oygcRWfTeJtN3D5J69NCMJ7wB+QPfpEBPwiqMgdbp4bRJ98H7x5UQsHT+EDOT/
9OmipomHInFY4W1ew11zNKwuENeRrnZwTcCiVLZsxZsAU41ZeI5Yq+2WdtxnePCR
VL1/NjKOq+WoRdb2nLSNDWgYMkLRVlt32hyzryyrBbmaxUl8BxnPqUiWduMwsZUz
HNpXkoa1xTSd+En1SHYWfMg8BOVuV0I0/fjUUG9AXVqYpuogfbjAvibVNWAmxOfo
fOjCPCGoJC1ET3AxYkgXGwioobz0pK/13k2pV+wu7W4g+6iTfz+hwZbPsUk2a/5I
f6vXFB0=
-----END CERTIFICATE-----
2 changes: 2 additions & 0 deletions example/multi_threaded_echo_c++/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_bool(enable_ssl, false, "Use SSL connection");
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
DEFINE_string(http_content_type, "application/json", "Content type of http request");

Expand Down Expand Up @@ -94,6 +95,7 @@ int main(int argc, char* argv[]) {

// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.ssl_options.enable = FLAGS_enable_ssl;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
Expand Down
27 changes: 27 additions & 0 deletions example/multi_threaded_echo_c++/key.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAqncgHh2N/bamNUWFW36amY2ZSQ7WW9OM58Y4EK1/pipi25sL
14CaI5X59kHIKeDSmBagxW/bVPxm2+N+nUb5B8ljs9ETzLKUdE00VNTSGPMEctAN
SzgoCx7G0SR6pLRo8pXowC3YLLKRVsg2PWxH2+KFrJyhsyanLyA16z5jDjmOKKWw
PUrn0s8EWTe2M3OYwHnaWhfUiu/EUF7b1dPiXwlMAbAVjynr0RRVKze60AWIz3IF
fx4A7qrj66pxElUmMnxASmW2unJjW/sczt3AdW6z07aG0l4ftKP9ArUQPtputzg3
scQi57YJD5uNiGAiSzNecU2rXH1V/9yx0N9Q6wIDAQABAoIBADN3khflnnhKzDXr
To9IU08nRG+dbjT9U16rJ0RJze+SfpSFZHblWiSCZJzoUZHrUkofEt1pn1QyfK/J
KPI9enTSZirlZk/4XwAaS0GNm/1yahZsIIdkZhqtaSO+GtVdrw4HGuXjMZCVPXJx
MocrCSsnYmqyQ9P+SJ3e4Mis5mVllwDiUVlnTIamSSt16qkPdamLSJrxvI4LirQK
9MZWNLoDFpRU1MJxQ/QzrEC3ONTq4j++AfbGzYTmDDtLeM8OSH5o72YXZ2JkaA4c
xCzHFT+NaJYxF7esn/ctzGg50LYl8IF2UQtzOkX2l3l/OktIB1w+jGV6ONb1EWx5
4zkkzNkCgYEA2EXj7GMsyNE3OYdMw8zrqQKUMON2CNnD+mBseGlr22/bhXtzpqK8
uNel8WF1ezOnVvNsU8pml/W/mKUu6KQt5JfaDzen3OKjzTABVlbJxwFhPvwAeaIA
q/tmSKyqiCgOMbR7Cq4UEwGf2A9/RII4JEC0/aipRU5srF65OYPUOJcCgYEAycco
DFVG6jUw9w68t/X4f7NT4IYP96hSAqLUPuVz2fWwXKLWEX8JiMI+Ue3PbMz6mPcs
4vMu364u4R3IuzrrI+PRK9iTa/pahBP6eF6ZpbY1ObI8CVLTrqUS9p22rr9lBm8V
EZA9hwcHLYt+PWzaKcsFpbP4+AeY7nBBbL9CAM0CgYAzuJsmeB1ItUgIuQOxu7sM
AzLfcjZTLYkBwreOIGAL7XdJN9nTmw2ZAvGLhWwsF5FIaRSaAUiBxOKaJb7PIhxb
k7kxdHTvjT/xHS7ksAK3VewkvO18KTMR7iBq9ugdgb7LQkc+qZzhYr0QVbxw7Ndy
TAs8sm4wxe2VV13ilFVXZwKBgDfU6ZnwBr1Llo7l/wYQA4CiSDU6IzTt2DNuhrgY
mWPX/cLEM+OHeUXkKYZV/S0n0rd8vWjWzUOLWOFlcmOMPAAkS36MYM5h6aXeOVIR
KwaVUkjyrnYN+xC6EHM41JGp1/RdzECd3sh8A1pw3K92bS9fQ+LD18IZqBFh8lh6
23KJAoGAe48SwAsaGvqRO61Taww/Wf+YpGc9lnVbCvNFGScYaycPMqaRBUBmz/U3
QQgpQY8T7JIECbA8sf78SlAZ9x93r0UQ70RekV3WzKAQHfHK8nqTjd3T0+i4aySO
yQpYYCgE24zYO6rQgwrhzI0S4rWe7izDDlg0RmLtQh7Xw+rlkAQ=
-----END RSA PRIVATE KEY-----
2 changes: 2 additions & 0 deletions example/multi_threaded_echo_c++/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ int main(int argc, char* argv[]) {

// Start the server.
brpc::ServerOptions options;
options.ssl_options.default_cert.certificate = "cert.pem";
options.ssl_options.default_cert.private_key = "key.pem";
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;
options.internal_port = FLAGS_internal_port;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/acceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef BRPC_ACCEPTOR_H
#define BRPC_ACCEPTOR_H

#include "bthread/bthread.h" // bthread_t
#include "butil/synchronization/condition_variable.h"
#include "butil/containers/flat_map.h"
#include "brpc/input_messenger.h"
Expand Down
25 changes: 20 additions & 5 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
#include <inttypes.h>
#include <google/protobuf/descriptor.h>
#include <gflags/gflags.h>
#include "butil/time.h" // milliseconds_from_now
#include "butil/time.h" // milliseconds_from_now
#include "butil/logging.h"
#include "bthread/unstable.h" // bthread_timer_add
#include "brpc/socket_map.h" // SocketMapInsert
#include "brpc/socket_map.h" // SocketMapInsert
#include "brpc/compress.h"
#include "brpc/global.h"
#include "brpc/span.h"
#include "brpc/details/load_balancer_with_naming.h"
#include "brpc/controller.h"
#include "brpc/channel.h"
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/policy/esp_authenticator.h"


Expand Down Expand Up @@ -62,7 +62,9 @@ Channel::Channel(ProfilerLinker)

Channel::~Channel() {
if (_server_id != (SocketId)-1) {
SocketMapRemove(_server_address);
SocketMapRemove(SocketMapKey(_server_address,
_options.ssl_options,
_options.auth));
}
}

Expand Down Expand Up @@ -121,6 +123,15 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
if (_options.auth == NULL) {
_options.auth = policy::global_esp_authenticator();
}
} else if (_options.protocol == brpc::PROTOCOL_HTTP) {
if (_raw_server_address.compare(0, 5, "https") == 0) {
_options.ssl_options.enable = true;
if (_options.ssl_options.sni_name.empty()) {
int port;
ParseHostAndPortFromURL(_raw_server_address.c_str(),
&_options.ssl_options.sni_name, &port);
}
}
}

return 0;
Expand Down Expand Up @@ -152,6 +163,7 @@ int Channel::Init(const char* server_addr_and_port,
return -1;
}
}
_raw_server_address.assign(server_addr_and_port);
return Init(point, options);
}

Expand All @@ -174,6 +186,7 @@ int Channel::Init(const char* server_addr, int port,
return -1;
}
}
_raw_server_address.assign(server_addr);
return Init(point, options);
}

Expand All @@ -189,7 +202,9 @@ int Channel::Init(butil::EndPoint server_addr_and_port,
return -1;
}
_server_address = server_addr_and_port;
if (SocketMapInsert(server_addr_and_port, &_server_id) != 0) {
if (SocketMapInsert(SocketMapKey(server_addr_and_port,
_options.ssl_options,
_options.auth), &_server_id) != 0) {
LOG(ERROR) << "Fail to insert into SocketMap";
return -1;
}
Expand Down
5 changes: 5 additions & 0 deletions src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <ostream> // std::ostream
#include "bthread/errno.h" // Redefine errno
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
#include "brpc/ssl_option.h" // ChannelSSLOptions
#include "brpc/channel_base.h" // ChannelBase
#include "brpc/adaptive_protocol_type.h" // AdaptiveProtocolType
#include "brpc/adaptive_connection_type.h" // AdaptiveConnectionType
Expand Down Expand Up @@ -87,6 +88,9 @@ struct ChannelOptions {
// Print a log when above situation happens.
// Default: true.
bool log_succeed_without_server;

// SSL related options. Refer to `ChannelSSLOptions' for details
ChannelSSLOptions ssl_options;

// Turn on authentication for this channel if `auth' is not NULL.
// Note `auth' will not be deleted by channel and must remain valid when
Expand Down Expand Up @@ -185,6 +189,7 @@ friend class SelectiveChannel;

int InitChannelOptions(const ChannelOptions* options);

std::string _raw_server_address;
butil::EndPoint _server_address;
SocketId _server_id;
Protocol::SerializeRequest _serialize_request;
Expand Down
6 changes: 6 additions & 0 deletions src/brpc/controller.cpp
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ BAIDU_REGISTER_ERRNO(brpc::ERTMPPUBLISHABLE, "RtmpRetryingClientStream is publis
BAIDU_REGISTER_ERRNO(brpc::ERTMPCREATESTREAM, "createStream was rejected by the RTMP server");
BAIDU_REGISTER_ERRNO(brpc::EEOF, "Got EOF");
BAIDU_REGISTER_ERRNO(brpc::EUNUSED, "The socket was not needed");
BAIDU_REGISTER_ERRNO(brpc::ESSL, "SSL related operation failed");

BAIDU_REGISTER_ERRNO(brpc::EINTERNAL, "General internal error");
BAIDU_REGISTER_ERRNO(brpc::ERESPONSE, "Bad response");
Expand Down Expand Up @@ -1369,6 +1370,11 @@ bool Controller::is_ssl() const {
return s ? (s->ssl_state() == SSL_CONNECTED) : false;
}

x509_st* Controller::get_peer_certificate() const {
Socket* s = _current_call.sending_sock.get();
return s ? s->GetPeerCertificate() : NULL;
}

#if defined(OS_MACOSX)
typedef sig_t SignalHandler;
#else
Expand Down
13 changes: 10 additions & 3 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
#define EAUTH ERPCAUTH
#endif

extern "C" {
struct x509_st;
}

namespace brpc {
class Span;
class Server;
Expand Down Expand Up @@ -306,6 +310,12 @@ friend void policy::ProcessMongoRequest(InputMessageBase*);
// Returns the authenticated result. NULL if there is no authentication
const AuthContext* auth_context() const { return _auth_context; }

// Whether the underlying channel is using SSL
bool is_ssl() const;

// Get the peer certificate, which can be printed by ostream
x509_st* get_peer_certificate() const;

// Mutable header of http response.
HttpHeader& http_response() {
if (_http_response == NULL) {
Expand Down Expand Up @@ -380,9 +390,6 @@ friend void policy::ProcessMongoRequest(InputMessageBase*);
// Protocol of the request sent by client or received by server.
ProtocolType request_protocol() const { return _request_protocol; }

// Whether the underlying channel is using SSL
bool is_ssl() const;

// Resets the Controller to its initial state so that it may be reused in
// a new call. Must NOT be called while an RPC is in progress.
void Reset() { InternalReset(false); }
Expand Down
13 changes: 9 additions & 4 deletions src/brpc/details/naming_service_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ NamingServiceThread::Actions::~Actions() {
// Remove all sockets from SocketMap
for (std::vector<ServerNode>::const_iterator it = _last_servers.begin();
it != _last_servers.end(); ++it) {
SocketMapRemove(it->addr);
SocketMapRemove(SocketMapKey(it->addr));
}
EndWait(0);
}
Expand Down Expand Up @@ -107,15 +107,18 @@ void NamingServiceThread::Actions::ResetServers(
for (size_t i = 0; i < _added.size(); ++i) {
ServerNodeWithId tagged_id;
tagged_id.node = _added[i];
CHECK_EQ(SocketMapInsert(_added[i].addr, &tagged_id.id), 0);
// TODO: For each unique SocketMapKey (i.e. SSL settings), insert a new
// Socket. SocketMapKey may be passed through AddWatcher. Make sure
// to pick those Sockets with the right settings during OnAddedServers
CHECK_EQ(SocketMapInsert(SocketMapKey(_added[i].addr), &tagged_id.id), 0);
_added_sockets.push_back(tagged_id);
}

_removed_sockets.clear();
for (size_t i = 0; i < _removed.size(); ++i) {
ServerNodeWithId tagged_id;
tagged_id.node = _removed[i];
CHECK_EQ(0, SocketMapFind(_removed[i].addr, &tagged_id.id));
CHECK_EQ(0, SocketMapFind(SocketMapKey(_removed[i].addr), &tagged_id.id));
_removed_sockets.push_back(tagged_id);
}

Expand Down Expand Up @@ -164,7 +167,9 @@ void NamingServiceThread::Actions::ResetServers(
}

for (size_t i = 0; i < _removed.size(); ++i) {
SocketMapRemove(_removed[i].addr);
// TODO: Remove all Sockets that have the same address in SocketMapKey.peer
// We may need another data structure to avoid linear cost
SocketMapRemove(SocketMapKey(_removed[i].addr));
}

if (!_removed.empty() || !_added.empty()) {
Expand Down
Loading