Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ int Socket::OnCreated(const SocketOptions& options) {
}
// Must be the last one! Internal fields of this Socket may be accessed
// just after calling ResetFileDescriptor.
if (ResetFileDescriptor(options.fd) != 0) {
if (ResetFileDescriptor(fd) != 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to ResetFileDescriptor";
SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
Expand Down
17 changes: 14 additions & 3 deletions test/brpc_builtin_service_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,8 +841,10 @@ void* dummy_bthread(void*) {


#ifdef BRPC_BTHREAD_TRACER
bool g_bthread_trace_start = false;
bool g_bthread_trace_stop = false;
void* bthread_trace(void*) {
g_bthread_trace_start = true;
while (!g_bthread_trace_stop) {
bthread_usleep(1000 * 100);
}
Expand Down Expand Up @@ -883,9 +885,13 @@ TEST_F(BuiltinServiceTest, bthreads) {
}

#ifdef BRPC_BTHREAD_TRACER
{
bool ok = false;
for (int i = 0; i < 10; ++i) {
bthread_t th;
EXPECT_EQ(0, bthread_start_background(&th, NULL, bthread_trace, NULL));
while (!g_bthread_trace_start) {
bthread_usleep(1000 * 10);
}
ClosureChecker done;
brpc::Controller cntl;
std::string id_string;
Expand All @@ -895,9 +901,14 @@ TEST_F(BuiltinServiceTest, bthreads) {
service.default_method(&cntl, &req, &res, &done);
g_bthread_trace_stop = true;
EXPECT_FALSE(cntl.Failed());
CheckContent(cntl, "stop=0");
CheckContent(cntl, "bthread_trace");
const std::string& content = cntl.response_attachment().to_string();
ok = content.find("stop=0") != std::string::npos &&
content.find("bthread_trace") != std::string::npos;
if (ok) {
break;
}
}
ASSERT_TRUE(ok);
#endif // BRPC_BTHREAD_TRACER
}

Expand Down
76 changes: 24 additions & 52 deletions test/brpc_redis_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -814,10 +814,9 @@ std::unordered_map<std::string, int64_t> int_map;

class RedisServiceImpl : public brpc::RedisService {
public:
RedisServiceImpl()
RedisServiceImpl(std::string password)
: _batch_count(0)
, _user("user1")
, _password("password1") {}
, _password(std::move(password)) {}

brpc::RedisCommandHandlerResult OnBatched(const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output, bool flush_batched) {
Expand Down Expand Up @@ -867,21 +866,19 @@ class RedisServiceImpl : public brpc::RedisService {

std::vector<std::vector<std::string> > _batched_command;
int _batch_count;
std::string _user;
std::string _password;
};


class AuthSession : public brpc::Destroyable {
public:
explicit AuthSession(const std::string& user_name, const std::string& password)
: _user_name(user_name), _password(password) {}
explicit AuthSession(std::string password)
: _password(std::move(password)) {}

void Destroy() override {
delete this;
}

const std::string _user_name;
const std::string _password;
};

Expand All @@ -894,17 +891,16 @@ class AuthCommandHandler : public brpc::RedisCommandHandler {
const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool flush_batched) {
if (args.size() < 2) {
if (args.size() < 1) {
output->SetError("ERR wrong number of arguments for 'AUTH' command");
return brpc::REDIS_CMD_HANDLED;
}
const std::string user(args[1].data(), args[1].size());
const std::string password(args[2].data(), args[2].size());
if (_rs->_user != user || _rs->_password != password) {
const std::string password(args[1].data(), args[1].size());
if (_rs->_password != password) {
output->SetError("ERR invalid username/password");
return brpc::REDIS_CMD_HANDLED;
}
auto auth_session = new AuthSession(user, password);
auto auth_session = new AuthSession(password);
ctx->reset_session(auth_session);
output->SetStatus("OK");
return brpc::REDIS_CMD_HANDLED;
Expand All @@ -929,7 +925,7 @@ class SetCommandHandler : public brpc::RedisCommandHandler {
return brpc::REDIS_CMD_HANDLED;
}
AuthSession* session = static_cast<AuthSession*>(ctx->session);
if (!session || (session->_password != _rs->_password) || (session->_user_name != _rs->_user)) {
if (!session || (session->_password != _rs->_password)) {
output->SetError("ERR no auth");
return brpc::REDIS_CMD_HANDLED;
}
Expand Down Expand Up @@ -971,7 +967,7 @@ class GetCommandHandler : public brpc::RedisCommandHandler {
return brpc::REDIS_CMD_HANDLED;
}
AuthSession* session = static_cast<AuthSession*>(ctx->session);
if (!session || (session->_password != _rs->_password) || (session->_user_name != _rs->_user)) {
if (session->_password != _rs->_password) {
output->SetError("ERR no auth");
return brpc::REDIS_CMD_HANDLED;
}
Expand Down Expand Up @@ -1015,7 +1011,7 @@ class IncrCommandHandler : public brpc::RedisCommandHandler {
return brpc::REDIS_CMD_HANDLED;
}
AuthSession* session = static_cast<AuthSession*>(ctx->session);
if (!session || (session->_password != _rs->_password) || (session->_user_name != _rs->_user)) {
if (session->_password != _rs->_password) {
output->SetError("ERR no auth");
return brpc::REDIS_CMD_HANDLED;
}
Expand All @@ -1036,9 +1032,10 @@ class IncrCommandHandler : public brpc::RedisCommandHandler {
};

TEST_F(RedisTest, server_sanity) {
std::string password = GeneratePassword();
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
GetCommandHandler *gh = new GetCommandHandler(rsimpl);
SetCommandHandler *sh = new SetCommandHandler(rsimpl);
AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
Expand All @@ -1053,21 +1050,13 @@ TEST_F(RedisTest, server_sanity) {

brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.auth = new brpc::policy::RedisAuthenticator(password);
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));

brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("auth user1 password1"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_STREQ("OK", response.reply(0).c_str());
request.Clear();
response.Clear();
cntl.Reset();
ASSERT_TRUE(request.AddCommand("get hello"));
ASSERT_TRUE(request.AddCommand("get hello2"));
ASSERT_TRUE(request.AddCommand("set key1 value1"));
Expand Down Expand Up @@ -1122,13 +1111,6 @@ TEST_F(RedisTest, server_sanity) {

void* incr_thread(void* arg) {
brpc::Channel* c = static_cast<brpc::Channel*>(arg);
// do auth
brpc::RedisRequest auth_req;
brpc::RedisResponse auth_resp;
brpc::Controller auth_cntl;
EXPECT_TRUE(auth_req.AddCommand("auth user1 password1"));
c->CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL);
EXPECT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText();
for (int i = 0; i < 5000; ++i) {
brpc::RedisRequest request;
brpc::RedisResponse response;
Expand All @@ -1137,16 +1119,17 @@ void* incr_thread(void* arg) {
c->CallMethod(NULL, &cntl, &request, &response, NULL);
EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
EXPECT_EQ(1, response.reply_size());
EXPECT_TRUE(response.reply(0).is_integer());
EXPECT_TRUE(response.reply(0).is_integer()) << response.reply(0);
}
return NULL;
}

TEST_F(RedisTest, server_concurrency) {
std::string password = GeneratePassword();
int N = 10;
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
IncrCommandHandler *ih = new IncrCommandHandler(rsimpl);
rsimpl->AddCommandHandler("incr", ih);
Expand All @@ -1158,6 +1141,7 @@ TEST_F(RedisTest, server_concurrency) {
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.connection_type = "pooled";
options.auth = new brpc::policy::RedisAuthenticator(password);
std::vector<bthread_t> bths;
std::vector<brpc::Channel*> channels;
for (int i = 0; i < N; ++i) {
Expand Down Expand Up @@ -1228,9 +1212,10 @@ class MultiCommandHandler : public brpc::RedisCommandHandler {
};

TEST_F(RedisTest, server_command_continue) {
std::string password = GeneratePassword();
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
rsimpl->AddCommandHandler("auth", new AuthCommandHandler(rsimpl));
rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
Expand All @@ -1242,16 +1227,9 @@ TEST_F(RedisTest, server_command_continue) {

brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.auth = new brpc::policy::RedisAuthenticator(password);
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));
// do auth
brpc::RedisRequest auth_req;
brpc::RedisResponse auth_resp;
brpc::Controller auth_cntl;
ASSERT_TRUE(auth_req.AddCommand("auth user1 password1"));
channel.CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL);
ASSERT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText();

{
brpc::RedisRequest request;
brpc::RedisResponse response;
Expand Down Expand Up @@ -1311,9 +1289,10 @@ TEST_F(RedisTest, server_command_continue) {
}

TEST_F(RedisTest, server_handle_pipeline) {
std::string password = GeneratePassword();
brpc::Server server;
brpc::ServerOptions server_options;
RedisServiceImpl* rsimpl = new RedisServiceImpl;
RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
GetCommandHandler* getch = new GetCommandHandler(rsimpl, true);
SetCommandHandler* setch = new SetCommandHandler(rsimpl, true);
AuthCommandHandler* authch = new AuthCommandHandler(rsimpl);
Expand All @@ -1327,20 +1306,13 @@ TEST_F(RedisTest, server_handle_pipeline) {

brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
options.auth = new brpc::policy::RedisAuthenticator(password);
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options));

brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
ASSERT_TRUE(request.AddCommand("auth user1 password1"));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_STREQ("OK", response.reply(0).c_str());
request.Clear();
response.Clear();
cntl.Reset();
ASSERT_TRUE(request.AddCommand("set key1 v1"));
ASSERT_TRUE(request.AddCommand("set key2 v2"));
ASSERT_TRUE(request.AddCommand("set key3 v3"));
Expand Down
Loading