Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ ebin/*.beam
ebin/riak_pb.app
include/*_pb.hrl
doc/*
.qc
.eqc-info
current_counterexample.eqc

# Python
riak_pb.egg-info
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ single request. The response message will typically include a boolean

RpbGetServerInfoReq -> RpbGetServerInfoResp
RpbPingReq -> RpbPingResp

RpbGetBucketReq -> RpbErrorResp | RpbGetBucketResp
RpbPutBucketReq -> RpbErrorResp | RpbPutBucketResp

### Riak KV Request/Response messages

Expand All @@ -62,7 +63,6 @@ single request. The response message will typically include a boolean
RpbDelReq -> RpbErrorResp | RpbDelResp
RpbListBucketsReq -> RpbErrorResp | RpbListBucketsResp
RpbListKeysReq -> RpbErrorResp | RpbListKeysResp{1,}
RpbGetBucketReq -> RpbErrorResp | RpbGetBucketResp
RpbMapRedReq -> RpbMapRedResp{1,}
RpbIndexReq -> RpbIndexResp

Expand Down
2 changes: 2 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", "master"}}
]}.

{eunit_opts, [verbose]}.

%% Fixes attempted removal of riak_pb directory by rebar_escripter
{escript_name, "doesnothavescript"}.
77 changes: 76 additions & 1 deletion src/riak.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/

/*
** Revision: 1.2
** Revision: 1.4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why two revisions? Is it to keep step with Riak versions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We didn't change this for 1.3, so yes. There is a 1.3 tag, I believe.

*/

// Java package specifiers
Expand All @@ -47,3 +47,78 @@ message RpbPair {
required bytes key = 1;
optional bytes value = 2;
}


// Get bucket properties request
message RpbGetBucketReq {
required bytes bucket = 1;
}

// Get bucket properties response
message RpbGetBucketResp {
required RpbBucketProps props = 1;
}

// Set bucket properties request
message RpbSetBucketReq {
required bytes bucket = 1;
required RpbBucketProps props = 2;
}

// Set bucket properties response - no message defined, just send
// RpbSetBucketResp

// Module-Function pairs for commit hooks and other bucket properties
// that take functions
message RpbModFun {
required bytes module = 1;
required bytes function = 2;
}

// A commit hook, which may either be a modfun or a JavaScript named
// function
message RpbCommitHook {
optional RpbModFun modfun = 1;
optional bytes name = 2;
}

// Bucket properties
message RpbBucketProps {
// Declared in riak_core_app
optional uint32 n_val = 1;
optional bool allow_mult = 2;
optional bool last_write_wins = 3;
repeated RpbCommitHook precommit = 4;
repeated RpbCommitHook postcommit = 5;
optional RpbModFun chash_keyfun = 6;

// Declared in riak_kv_app
optional RpbModFun linkfun = 7;
optional uint32 old_vclock = 8;
optional uint32 young_vclock = 9;
optional uint32 big_vclock = 10;
optional uint32 small_vclock = 11;
optional uint32 pr = 12;
optional uint32 r = 13;
optional uint32 w = 14;
optional uint32 pw = 15;
optional uint32 dw = 16;
optional uint32 rw = 17;
optional bool basic_quorum = 18;
optional bool notfound_ok = 19;

// Used by riak_kv_multi_backend
optional bytes backend = 20;

// Used by riak_search bucket fixup
optional bool search = 21;

// Used by riak_repl bucket fixup
enum RpbReplMode {
off = 0;
realtime = 1;
fullsync = 2;
both = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bolth?

}
optional RpbReplMode repl = 22;
}
27 changes: 1 addition & 26 deletions src/riak_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
*/

/*
** Revision: 1.2
** Revision: 1.4
*/

// Java package specifiers
Expand Down Expand Up @@ -122,25 +122,6 @@ message RpbListKeysResp {
optional bool done = 2;
}

// Get bucket properties request
message RpbGetBucketReq {
required bytes bucket = 1;
}

// Get bucket properties response
message RpbGetBucketResp {
required RpbBucketProps props = 1;
}

// Set bucket properties request
message RpbSetBucketReq {
required bytes bucket = 1;
required RpbBucketProps props = 2;
}


// Set bucket properties response - no message defined, just send RpbSetBucketResp


// Map/Reduce request
message RpbMapRedReq {
Expand Down Expand Up @@ -199,9 +180,3 @@ message RpbLink {
optional bytes key = 2;
optional bytes tag = 3;
}

// Bucket properties
message RpbBucketProps {
optional uint32 n_val = 1;
optional bool allow_mult = 2;
}
202 changes: 201 additions & 1 deletion src/riak_pb_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,33 @@
encode_bool/1, %% riakc_pb:pbify_bool
decode_bool/1, %% riakc_pb:erlify_bool
to_binary/1, %% riakc_pb:binary
to_list/1]). %% riakc_pb:any_to_list
to_list/1, %% riakc_pb:any_to_list
encode_bucket_props/1, %% riakc_pb:pbify_rpbbucketprops
decode_bucket_props/1, %% riakc_pb:erlify_rpbbucketprops
encode_modfun/1,
decode_modfun/2,
encode_commit_hooks/1,
decode_commit_hooks/1
]).

%% @doc Bucket properties that store module/function pairs, e.g.
%% commit hooks, hash functions, link functions, will be in one of
%% these forms. More specifically:
%%
%% chash_keyfun :: {module(), function()}
%% linkfun :: {modfun, module(), function()}
%% precommit, postcommit :: [ {struct, [{binary(), binary()}]} ]
%% @end
-type modfun_property() :: {module(), function()} | {modfun, module(), function()} | {struct, [{binary(), binary()}]}.

%% @doc Fields that can be specified in a commit hook must be
%% binaries. The valid values are <<"mod">>, <<"fun">>, <<"name">>.
%% Note that "mod" and "fun" must be used together, and "name" cannot
%% be used if the other two are present.
-type commit_hook_field() :: binary().

%% @doc Bucket properties that are commit hooks have this format.
-type commit_hook_property() :: [ {struct, [{commit_hook_field(), binary()}]} ].

%% @doc Create an iolist of msg code and protocol buffer
%% message. Replaces `riakc_pb:encode/1'.
Expand Down Expand Up @@ -191,3 +217,177 @@ encode_pair({K,V}) ->
-spec decode_pair(#rpbpair{}) -> {string(), string()}.
decode_pair(#rpbpair{key = K, value = V}) ->
{K, V}.


%% @doc Convert an RpbBucketProps message to a property list
-spec decode_bucket_props(PBProps::#rpbbucketprops{} | undefined) -> [proplists:property()].
decode_bucket_props(undefined) ->
[];
decode_bucket_props(#rpbbucketprops{n_val=N,
allow_mult=AM,
last_write_wins=LWW,
precommit=Pre,
postcommit=Post,
chash_keyfun=Chash,
linkfun=Link,
old_vclock=Old,
young_vclock=Young,
big_vclock=Big,
small_vclock=Small,
pr=PR, r=R, w=W, pw=PW,
dw=DW, rw=RW,
basic_quorum=BQ,
notfound_ok=NFOK,
backend=Backend,
search=Search,
repl=Repl

}) ->
%% Extract numerical properties
[ {P,V} || {P,V} <- [ {n_val, N}, {old_vclock, Old}, {young_vclock, Young},
{big_vclock, Big}, {small_vclock, Small} ],
V /= undefined ] ++
%% Extract booleans
[ {BProp, decode_bool(Bool)} ||
{BProp, Bool} <- [{allow_mult, AM}, {last_write_wins, LWW},
{basic_quorum, BQ}, {notfound_ok, NFOK},
{search, Search}],
Bool /= undefined ] ++

%% Extract commit hooks
[ {PrePostProp, decode_commit_hooks(CList)} ||
{PrePostProp, CList} <- [{precommit, Pre}, {postcommit, Post}],
CList /= [] ] ++

%% Extract modfuns
[ {MFProp, decode_modfun(MF, MFProp)} || {MFProp, MF} <- [{chash_keyfun, Chash},
{linkfun, Link}],
MF /= undefined ] ++

%% Extract backend
[ {backend, Backend} || is_binary(Backend) ] ++

%% Extract quora
[ {QProp, riak_pb_kv_codec:decode_quorum(Q)} ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other point of inter-dependance between riak_pb_codec and riak_pb_kv_codec? Seems like the calls should only go one way. I imagine you considered moving riak_pb_kv_codec:decode_quorum/1 here, curious to hear why you decided not to.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the thorny thing about going the simpler route (in terms of message definitions) that I did. In an ideal world, riak_pb_kv_codec would handle its own bucket properties and decoding, and riak_pb_search_codec would do the same, u.s.w. However, that design would really require a whole bunch of machinery to manage who-decodes-what and creating message extensions and things that seemed overkill just for the framing layer. Perhaps there's a design that avoids this explicit call that I'm not seeing, but I just wanted to keep it simple.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I should note that encode/decode_quorum are used in get/put/delete messages in the KV section.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy with that, just thought it worth the question.

{QProp, Q} <- [{pr, PR}, {r, R}, {w, W}, {pw, PW}, {dw, DW}, {rw, RW}],
Q /= undefined ] ++

%% Extract repl prop
[ {repl, Repl} || Repl /= undefined ].


%% @doc Convert a property list to an RpbBucketProps message
-spec encode_bucket_props([proplists:property()]) -> PBProps::#rpbbucketprops{}.
encode_bucket_props(Props) ->
encode_bucket_props(Props, #rpbbucketprops{}).

%% @doc Convert a property list to an RpbBucketProps message
%% @private
-spec encode_bucket_props([proplists:property()], PBPropsIn::#rpbbucketprops{}) -> PBPropsOut::#rpbbucketprops{}.
encode_bucket_props([], Pb) ->
Pb;
encode_bucket_props([{n_val, Nval} | Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{n_val = Nval});
encode_bucket_props([{allow_mult, Flag} | Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{allow_mult = encode_bool(Flag)});
encode_bucket_props([{last_write_wins, LWW}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{last_write_wins = encode_bool(LWW)});
encode_bucket_props([{precommit, Precommit}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{precommit = encode_commit_hooks(Precommit)});
encode_bucket_props([{postcommit, Postcommit}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{postcommit = encode_commit_hooks(Postcommit)});
encode_bucket_props([{chash_keyfun, ModFun}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{chash_keyfun = encode_modfun(ModFun)});
encode_bucket_props([{linkfun, ModFun}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{linkfun = encode_modfun(ModFun)});
encode_bucket_props([{old_vclock, Num}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{old_vclock = Num});
encode_bucket_props([{young_vclock, Num}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{young_vclock = Num});
encode_bucket_props([{big_vclock, Num}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{big_vclock = Num});
encode_bucket_props([{small_vclock, Num}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{small_vclock = Num});
encode_bucket_props([{pr, Q}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{pr = riak_pb_kv_codec:encode_quorum(Q)});
encode_bucket_props([{r, Q}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{r = riak_pb_kv_codec:encode_quorum(Q)});
encode_bucket_props([{w, Q}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{w = riak_pb_kv_codec:encode_quorum(Q)});
encode_bucket_props([{pw, Q}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{pw = riak_pb_kv_codec:encode_quorum(Q)});
encode_bucket_props([{dw, Q}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{dw = riak_pb_kv_codec:encode_quorum(Q)});
encode_bucket_props([{rw, Q}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{rw = riak_pb_kv_codec:encode_quorum(Q)});
encode_bucket_props([{basic_quorum, BQ}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{basic_quorum = encode_bool(BQ)});
encode_bucket_props([{notfound_ok, NFOK}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{notfound_ok = encode_bool(NFOK)});
encode_bucket_props([{backend, B}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{backend = to_binary(B)});
encode_bucket_props([{search, S}|Rest], Pb) ->
encode_bucket_props(Rest, Pb#rpbbucketprops{search = encode_bool(S)});
encode_bucket_props([{repl, Atom}|Rest], Pb) ->
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops, one oversight here is that true and false are valid for the repl property, and are equivalent to both and off respectively.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird. So in this case that should be captured by a comment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure. If we coerce them into both/off, the roundtrip codec will not be equivalent. There's no way in PB enums to say "this value is the same as this other one", I think you get a compilation error.

encode_bucket_props(Rest, Pb#rpbbucketprops{repl = Atom});
encode_bucket_props([_Ignore|Rest], Pb) ->
%% Ignore any properties not explicitly part of the PB message
encode_bucket_props(Rest, Pb).

%% @doc Converts a module-function specification into a RpbModFun message.
-spec encode_modfun(modfun_property()) -> #rpbmodfun{}.
encode_modfun({struct, Props}) ->
{<<"mod">>, Mod} = lists:keyfind(<<"mod">>, 1, Props),
{<<"fun">>, Fun} = lists:keyfind(<<"fun">>, 1, Props),
encode_modfun({Mod, Fun});
encode_modfun({modfun, M, F}) ->
encode_modfun({M, F});
encode_modfun({M, F}) ->
#rpbmodfun{module=to_binary(M), function=to_binary(F)}.

%% @doc Converts an RpbModFun message into the appropriate format for
%% the given property.
-spec decode_modfun(#rpbmodfun{}, atom()) -> modfun_property().
decode_modfun(MF, linkfun) ->
{M,F} = decode_modfun(MF, undefined),
{modfun, M, F};
decode_modfun(#rpbmodfun{module=Mod, function=Fun}, commit_hook) ->
{struct, [{<<"mod">>, Mod}, {<<"fun">>, Fun}]};
decode_modfun(#rpbmodfun{module=Mod, function=Fun}=MF, _Prop) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running the riak-erlang-client I get the following error

https://gist.github.com/russelldb/3fcbf5089cc473184e62

And the code in that gist 'fixes' it (is there another way, yours looks like it should work?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the catch is wrong, it should just be exit:badarg.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, i'm still wrong, i can just make it the _:_ catch.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once more, with reading comprehension, the correct match in this case is error:badarg.

try
{binary_to_existing_atom(Mod, latin1), binary_to_existing_atom(Fun, latin1)}
catch
error:badarg ->
error_logger:warning_msg("Creating new atoms from protobuffs message! ~p", [MF]),
{binary_to_atom(Mod, latin1), binary_to_atom(Fun, latin1)}
end.

%% @doc Converts a list of commit hooks into a list of RpbCommitHook
%% messages.
-spec encode_commit_hooks([commit_hook_property()]) -> [ #rpbcommithook{} ].
encode_commit_hooks(Hooks) ->
[ encode_commit_hook(Hook) || Hook <- Hooks ].

encode_commit_hook({struct, Props}=Hook) ->
FoundProps = [ lists:keymember(Field, 1, Props) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

Field <- [<<"mod">>, <<"fun">>, <<"name">>]],
case FoundProps of
[true, true, _] ->
#rpbcommithook{modfun=encode_modfun(Hook)};
[false, false, true] ->
{<<"name">>, Name} = lists:keyfind(<<"name">>, 1, Props),
#rpbcommithook{name=to_binary(Name)};
_ ->
erlang:error(badarg, [Hook])
end.

%% @doc Converts a list of RpbCommitHook messages into commit hooks.
-spec decode_commit_hooks([ #rpbcommithook{} ]) -> [ commit_hook_property() ].
decode_commit_hooks(Hooks) ->
[ decode_commit_hook(Hook) || Hook <- Hooks,
Hook =/= #rpbcommithook{modfun=undefined, name=undefined} ].

decode_commit_hook(#rpbcommithook{modfun = Modfun}) when Modfun =/= undefined ->
decode_modfun(Modfun, commit_hook);
decode_commit_hook(#rpbcommithook{name = Name}) when Name =/= undefined ->
{struct, [{<<"name">>, Name}]}.
Loading