Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions example/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,14 @@ cc_binary(
"//:brpc",
],
)

cc_binary(
name = "redis_c++_server",
srcs = [
"redis_c++/redis_server.cpp",
],
copts = COPTS,
deps = [
"//:brpc",
],
)
114 changes: 102 additions & 12 deletions example/redis_c++/redis_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,54 @@

DEFINE_int32(port, 6379, "TCP Port of this server");

class AuthSession : public brpc::Destroyable {
public:
explicit AuthSession(const std::string& user_name, const std::string& password)
: _user_name(user_name), _password(password) {}

void Destroy() override {
delete this;
}

const std::string _user_name;
const std::string _password;
};

class RedisServiceImpl : public brpc::RedisService {
public:
bool Set(const std::string& key, const std::string& value) {
RedisServiceImpl() {
_user_password["db1"] = "123456";
_user_password["db2"] = "123456";
_db_map["db1"].resize(kHashSlotNum);
_db_map["db2"].resize(kHashSlotNum);
}

bool Set(const std::string& db_name, const std::string& key, const std::string& value) {
Comment thread
lintanghui marked this conversation as resolved.
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
_mutex[slot].lock();
_map[slot][key] = value;
auto& kv = _db_map[db_name];
kv[slot][key] = value;
_mutex[slot].unlock();
return true;
}

bool Get(const std::string& key, std::string* value) {
bool Auth(const std::string& db_name, const std::string& password) {
if (_user_password.find(db_name) == _user_password.end()) {
return false;
} else {
if (_user_password[db_name] != password) {
return false;
}
}
return true;
}

bool Get(const std::string& db_name, const std::string& key, std::string* value) {
int slot = butil::crc32c::Value(key.c_str(), key.size()) % kHashSlotNum;
_mutex[slot].lock();
auto it = _map[slot].find(key);
if (it == _map[slot].end()) {
auto& kv = _db_map[db_name];
auto it = kv[slot].find(key);
if (it == kv[slot].end()) {
_mutex[slot].unlock();
return false;
}
Expand All @@ -56,7 +89,9 @@ class RedisServiceImpl : public brpc::RedisService {

private:
const static int kHashSlotNum = 32;
std::unordered_map<std::string, std::string> _map[kHashSlotNum];
typedef std::unordered_map<std::string, std::string> KVStore;
std::unordered_map<std::string, std::vector<KVStore>> _db_map;
std::unordered_map<std::string, std::string> _user_password;
butil::Mutex _mutex[kHashSlotNum];
};

Expand All @@ -65,16 +100,27 @@ class GetCommandHandler : public brpc::RedisCommandHandler {
explicit GetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {}

brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool /*flush_batched*/) override {

AuthSession* session = static_cast<AuthSession*>(ctx->get_session());
if (session == nullptr) {
output->FormatError("No auth session");
return brpc::REDIS_CMD_HANDLED;
}
if (session->_user_name.empty()) {
output->FormatError("No user name");
return brpc::REDIS_CMD_HANDLED;
}
if (args.size() != 2ul) {
output->FormatError("Expect 1 arg for 'get', actually %lu", args.size()-1);
return brpc::REDIS_CMD_HANDLED;
}
const std::string key(args[1].data(), args[1].size());
std::string value;
if (_rsimpl->Get(key, &value)) {
if (_rsimpl->Get(session->_user_name, key, &value)) {
output->SetString(value);
} else {
output->SetNullString();
Expand All @@ -91,32 +137,76 @@ class SetCommandHandler : public brpc::RedisCommandHandler {
explicit SetCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {}

brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool /*flush_batched*/) override {
AuthSession* session = static_cast<AuthSession*>(ctx->get_session());
if (session == nullptr) {
output->FormatError("No auth session");
return brpc::REDIS_CMD_HANDLED;
}
if (session->_user_name.empty()) {
output->FormatError("No user name");
return brpc::REDIS_CMD_HANDLED;
}
if (args.size() != 3ul) {
output->FormatError("Expect 2 args for 'set', actually %lu", args.size()-1);
return brpc::REDIS_CMD_HANDLED;
}
const std::string key(args[1].data(), args[1].size());
const std::string value(args[2].data(), args[2].size());
_rsimpl->Set(key, value);
_rsimpl->Set(session->_user_name, key, value);
output->SetStatus("OK");
return brpc::REDIS_CMD_HANDLED;
}

private:
RedisServiceImpl* _rsimpl;
RedisServiceImpl* _rsimpl;
};



class AuthCommandHandler : public brpc::RedisCommandHandler {
public:
explicit AuthCommandHandler(RedisServiceImpl* rsimpl)
: _rsimpl(rsimpl) {}
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool /*flush_batched*/) override {
if (args.size() != 3ul) {
output->FormatError("Expect 2 args for 'auth', actually %lu", args.size()-1);
Comment thread
lintanghui marked this conversation as resolved.
return brpc::REDIS_CMD_HANDLED;
}

const std::string db_name(args[1].data(), args[1].size());
const std::string password(args[2].data(), args[2].size());

if (_rsimpl->Auth(db_name, password)) {
output->SetStatus("OK");
auto auth_session = new AuthSession(db_name, password);
ctx->reset_session(auth_session);
} else {
output->FormatError("Invalid password for database '%s'", db_name.c_str());
}
return brpc::REDIS_CMD_HANDLED;
}

private:
RedisServiceImpl* _rsimpl;
};

int main(int argc, char* argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
RedisServiceImpl *rsimpl = new RedisServiceImpl;
auto get_handler =std::unique_ptr<GetCommandHandler>(new GetCommandHandler(rsimpl));
auto set_handler =std::unique_ptr<SetCommandHandler>( new SetCommandHandler(rsimpl));
auto auth_handler = std::unique_ptr<AuthCommandHandler>(new AuthCommandHandler(rsimpl));
rsimpl->AddCommandHandler("get", get_handler.get());
rsimpl->AddCommandHandler("set", set_handler.get());

rsimpl->AddCommandHandler("auth", auth_handler.get());

brpc::Server server;
brpc::ServerOptions server_options;
server_options.redis_service = rsimpl;
Expand Down
34 changes: 2 additions & 32 deletions src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,6 @@ struct InputResponse : public InputMessageBase {
}
};

// This class is as parsing_context in socket.
class RedisConnContext : public Destroyable {
public:
explicit RedisConnContext(const RedisService* rs)
: redis_service(rs)
, batched_size(0) {}

~RedisConnContext();
// @Destroyable
void Destroy() override;

const RedisService* redis_service;
// If user starts a transaction, transaction_handler indicates the
// handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> transaction_handler;
// >0 if command handler is run in batched mode.
int batched_size;

RedisCommandParser parser;
butil::Arena arena;
};

int ConsumeCommand(RedisConnContext* ctx,
const std::vector<butil::StringPiece>& args,
Expand All @@ -83,7 +62,7 @@ int ConsumeCommand(RedisConnContext* ctx,
RedisReply output(&ctx->arena);
RedisCommandHandlerResult result = REDIS_CMD_HANDLED;
if (ctx->transaction_handler) {
result = ctx->transaction_handler->Run(args, &output, flush_batched);
result = ctx->transaction_handler->Run(ctx, args, &output, flush_batched);
if (result == REDIS_CMD_HANDLED) {
ctx->transaction_handler.reset(NULL);
} else if (result == REDIS_CMD_BATCHED) {
Expand All @@ -97,7 +76,7 @@ int ConsumeCommand(RedisConnContext* ctx,
snprintf(buf, sizeof(buf), "ERR unknown command `%s`", args[0].as_string().c_str());
output.SetError(buf);
} else {
result = ch->Run(args, &output, flush_batched);
result = ch->Run(ctx, args, &output, flush_batched);
if (result == REDIS_CMD_CONTINUE) {
if (ctx->batched_size != 0) {
LOG(ERROR) << "CONTINUE should not be returned in a batched process.";
Expand Down Expand Up @@ -134,15 +113,6 @@ int ConsumeCommand(RedisConnContext* ctx,
return 0;
}

// ========== impl of RedisConnContext ==========

RedisConnContext::~RedisConnContext() { }

void RedisConnContext::Destroy() {
delete this;
}

// ========== impl of RedisConnContext ==========

ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
bool read_eof, const void* arg) {
Expand Down
17 changes: 17 additions & 0 deletions src/brpc/redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,21 @@ RedisCommandHandler* RedisCommandHandler::NewTransactionHandler() {
return NULL;
}

// ========== impl of RedisConnContext ==========
RedisConnContext::~RedisConnContext() { }

void RedisConnContext::Destroy() {
if (session) {
session->Destroy();
}
delete this;
}

void RedisConnContext::reset_session(Destroyable* s){
if (session) {
session->Destroy();
}
session = s;
}

} // namespace brpc
46 changes: 44 additions & 2 deletions src/brpc/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

#include <unordered_map>

#include "brpc/destroyable.h"
#include "brpc/nonreflectable_message.h"
#include "brpc/parse_result.h"
#include "brpc/redis_command.h"
#include "brpc/pb_compat.h"
#include "brpc/redis_reply.h"
#include "butil/arena.h"
Expand Down Expand Up @@ -210,6 +212,39 @@ enum RedisCommandHandlerResult {
REDIS_CMD_BATCHED = 2,
};

class RedisCommandParser;
Comment thread
lintanghui marked this conversation as resolved.

// This class is as parsing_context in socket.
class RedisConnContext : public Destroyable {
public:
explicit RedisConnContext(const RedisService* rs)
: redis_service(rs)
, batched_size(0)
, session(nullptr) {}

~RedisConnContext();
// @Destroyable
void Destroy() override;
void reset_session(Destroyable* s);

Destroyable* get_session() { return session; }

const RedisService* redis_service;
// If user starts a transaction, transaction_handler indicates the
// handler pointer that runs the transaction command.
std::unique_ptr<RedisCommandHandler> transaction_handler;
// >0 if command handler is run in batched mode.
int batched_size;

RedisCommandParser parser;
butil::Arena arena;

private:
// If user is authenticated, session is set.
// Keep auth session info in RedisConnContext to distinguish diffrent users( or diffrent db).
Destroyable* session;
};

// The Command handler for a redis request. User should impletement Run().
class RedisCommandHandler {
public:
Expand All @@ -235,8 +270,15 @@ class RedisCommandHandler {
// it returns REDIS_CMD_HANDLED. Read the comment below.
virtual RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool flush_batched) = 0;

bool flush_batched) {
return REDIS_CMD_HANDLED;
};
virtual RedisCommandHandlerResult Run(RedisConnContext* ctx,
const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool flush_batched) {
return Run(args, output, flush_batched);
}
// The Run() returns CONTINUE for "multi", which makes brpc call this method to
// create a transaction_handler to process following commands until transaction_handler
// returns OK. For example, for command "multi; set k1 v1; set k2 v2; set k3 v3;
Expand Down
1 change: 1 addition & 0 deletions test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ cc_test(
],
)


cc_test(
name = "bvar_test",
srcs = glob(
Expand Down
Loading