From 44167e7c2752798041212a37aa58512af09bf60f Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 3 May 2013 23:59:21 -0500 Subject: [PATCH 1/3] Intermediate refactoring: isolate Mod:put() and Mod:get() --- src/riak_kv_vnode.erl | 132 +++++++++++++++++++++++++----------------- 1 file changed, 79 insertions(+), 53 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 0725b850ff..7c685f9c0e 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -480,7 +480,7 @@ handle_command({hashtree_pid, Node}, _, State=#state{hashtrees=HT}) -> {reply, {error, wrong_node}, State} end; handle_command({rehash, Bucket, Key}, _, State=#state{mod=Mod, modstate=ModState}) -> - case do_get_binary({Bucket, Key}, Mod, ModState) of + case do_get_binary(Bucket, Key, Mod, ModState) of {ok, Bin, _UpdModState} -> update_hashtree(Bucket, Key, Bin, State); _ -> @@ -937,7 +937,7 @@ prepare_put(#state{idx=Idx, prunetime=PruneTime}, IndexBackend) -> GetReply = - case Mod:get(Bucket, Key, ModState) of + case do_get_object(Bucket, Key, Mod, ModState) of {error, not_found, _UpdModState} -> ok; % NOTE: bad_crc is NOT an official backend response. It is @@ -947,8 +947,8 @@ prepare_put(#state{idx=Idx, {error, bad_crc, _UpdModState} -> lager:info("Bad CRC detected while reading Partition=~p, Bucket=~p, Key=~p", [Idx, Bucket, Key]), ok; - {ok, GetVal, _UpdModState} -> - {ok, GetVal} + {ok, TheOldObj, _UpdModState} -> + {ok, TheOldObj} end, case GetReply of ok -> @@ -965,8 +965,7 @@ prepare_put(#state{idx=Idx, RObj end, {{true, ObjToStore}, PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}}; - {ok, Val} -> - OldObj = object_from_binary(Bucket, Key, Val), + {ok, OldObj} -> case put_merge(Coord, LWW, OldObj, RObj, VId, StartTime) of {oldobj, OldObj1} -> {{false, OldObj1}, PutArgs}; @@ -1014,32 +1013,29 @@ perform_put({true, Obj}, bkey={Bucket, Key}, reqid=ReqID, index_specs=IndexSpecs}) -> - ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0), - Val = riak_object:to_binary(ObjFmt, Obj), - case Mod:put(Bucket, Key, IndexSpecs, Val, ModState) of - {ok, UpdModState} -> - update_hashtree(Bucket, Key, Val, State), + case encode_and_Mod_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of + {{ok, UpdModState}, EncodedVal} -> + update_hashtree(Bucket, Key, EncodedVal, State), case RB of true -> Reply = {dw, Idx, Obj, ReqID}; false -> Reply = {dw, Idx, ReqID} end; - {error, _Reason, UpdModState} -> + {{error, _Reason, UpdModState}, _EncodedVal} -> Reply = {fail, Idx, ReqID} end, {Reply, State#state{modstate=UpdModState}}. do_reformat({Bucket, Key}=BKey, State=#state{mod=Mod, modstate=ModState}) -> - case Mod:get(Bucket, Key, ModState) of + case do_get_object(Bucket, Key, Mod, ModState) of {error, not_found, _UpdModState} -> Reply = {error, not_found}, UpdState = State; - {ok, ObjBin, _UpdModState} -> + {ok, RObj, _UpdModState} -> %% since it is assumed capabilities have been properly set %% to the desired version, to reformat, all we need to do %% is submit a new write - RObj = riak_object:from_binary(Bucket, Key, ObjBin), PutArgs = #putargs{returnbody=false, bkey=BKey, reqid=undefined, @@ -1118,10 +1114,10 @@ do_get(_Sender, BKey, ReqID, {reply, {r, Retval, Idx, ReqID}, State}. %% @private -do_get_term(BKey, Mod, ModState) -> - case do_get_binary(BKey, Mod, ModState) of - {ok, Bin, _UpdModState} -> - {ok, object_from_binary(BKey, Bin)}; +do_get_term({Bucket, Key}, Mod, ModState) -> + case do_get_object(Bucket, Key, Mod, ModState) of + {ok, Obj, _UpdModState} -> + {ok, Obj}; %% @TODO Eventually it would be good to %% make the use of not_found or notfound %% consistent throughout the code. @@ -1133,8 +1129,31 @@ do_get_term(BKey, Mod, ModState) -> Err end. -do_get_binary({Bucket, Key}, Mod, ModState) -> - Mod:get(Bucket, Key, ModState). +do_get_binary(Bucket, Key, Mod, ModState) -> + case mod_capability_uses_r_object(Mod, ModState, Bucket) of + true -> + Mod:get_object(Bucket, Key, true, ModState); + false -> + Mod:get(Bucket, Key, ModState) + end. + +do_get_object(Bucket, Key, Mod, ModState) -> + case mod_capability_uses_r_object(Mod, ModState, Bucket) of + true -> + Mod:get_object(Bucket, Key, false, ModState); + false -> + case do_get_binary(Bucket, Key, Mod, ModState) of + {ok, ObjBin, _UpdModState} -> + case riak_object:from_binary(Bucket, Key, ObjBin) of + {error, R} -> + throw(R); + RObj -> + {ok, RObj, _UpdModState} + end; + Else -> + Else + end + end. %% @private %% @doc This is a generic function for operations that involve @@ -1289,9 +1308,9 @@ do_get_vclocks(KeyList,_State=#state{mod=Mod,modstate=ModState}) -> [{BKey, do_get_vclock(BKey,Mod,ModState)} || BKey <- KeyList]. %% @private do_get_vclock({Bucket, Key}, Mod, ModState) -> - case Mod:get(Bucket, Key, ModState) of + case do_get_object(Bucket, Key, Mod, ModState) of {error, not_found, _UpdModState} -> vclock:fresh(); - {ok, Val, _UpdModState} -> riak_object:vclock(object_from_binary(Bucket, Key, Val)) + {ok, Obj, _UpdModState} -> riak_object:vclock(Obj) end. %% @private @@ -1303,7 +1322,7 @@ do_diffobj_put({Bucket, Key}, DiffObj, StartTS = os:timestamp(), {ok, Capabilities} = Mod:capabilities(Bucket, ModState), IndexBackend = lists:member(indexes, Capabilities), - case Mod:get(Bucket, Key, ModState) of + case do_get_object(Bucket, Key, Mod, ModState) of {error, not_found, _UpdModState} -> case IndexBackend of true -> @@ -1311,19 +1330,17 @@ do_diffobj_put({Bucket, Key}, DiffObj, false -> IndexSpecs = [] end, - ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0), - Val = riak_object:to_binary(ObjFmt, DiffObj), - Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState), - case Res of - {ok, _UpdModState} -> - update_hashtree(Bucket, Key, Val, StateData), + case encode_and_Mod_put(DiffObj, Mod, Bucket, Key, + IndexSpecs, ModState) of + {{ok, _UpdModState} = InnerRes, EncodedVal} -> + update_hashtree(Bucket, Key, EncodedVal, StateData), update_index_write_stats(IndexBackend, IndexSpecs), - update_vnode_stats(vnode_put, Idx, StartTS); - _ -> nop - end, - Res; - {ok, Val0, _UpdModState} -> - OldObj = object_from_binary(Bucket, Key, Val0), + update_vnode_stats(vnode_put, Idx, StartTS), + InnerRes; + {InnerRes, _Val} -> + InnerRes + end; + {ok, OldObj, _UpdModState} -> %% Merge handoff values with the current - possibly discarding %% if out of date. Ok to set VId/Starttime undefined as %% they are not used for non-coordinating puts. @@ -1338,18 +1355,16 @@ do_diffobj_put({Bucket, Key}, DiffObj, false -> IndexSpecs = [] end, - ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0), - Val = riak_object:to_binary(ObjFmt, AMObj), - Res = Mod:put(Bucket, Key, IndexSpecs, Val, ModState), - case Res of - {ok, _UpdModState} -> - update_hashtree(Bucket, Key, Val, StateData), + case encode_and_Mod_put(AMObj, Mod, Bucket, Key, + IndexSpecs, ModState) of + {{ok, _UpdModState} = InnerRes, EncodedVal} -> + update_hashtree(Bucket, Key, EncodedVal, StateData), update_index_write_stats(IndexBackend, IndexSpecs), - update_vnode_stats(vnode_put, Idx, StartTS); - _ -> - nop - end, - Res + update_vnode_stats(vnode_put, Idx, StartTS), + InnerRes; + {InnerRes, _EncodedVal} -> + InnerRes + end end end. @@ -1526,14 +1541,25 @@ object_info({Bucket, _Key}=BKey) -> Hash = riak_core_util:chash_key(BKey), {Bucket, Hash}. -object_from_binary({B,K}, ValBin) -> - object_from_binary(B, K, ValBin). -object_from_binary(B, K, ValBin) -> - case riak_object:from_binary(B, K, ValBin) of - {error, R} -> throw(R); - Obj -> Obj +-spec encode_and_Mod_put( + Obj::riak_object:object(), Mod::term(), Bucket::riak_object:bucket(), + Key::riak_object:key(), IndexSpecs::list(), ModState::term()) -> + {{ok, UpdModState::term()}, EncodedObj::binary()} | + {{error, Reason::term(), UpdModState::term()}, EncodedObj::binary()}. + +encode_and_Mod_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) -> + case mod_capability_uses_r_object(Mod, ModState, Bucket) of + true -> + Mod:put_object(Bucket, Key, IndexSpecs, Obj, ModState); + false -> + ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0), + EncodedVal = riak_object:to_binary(ObjFmt, Obj), + {Mod:put(Bucket, Key, IndexSpecs, EncodedVal, ModState), EncodedVal} end. +mod_capability_uses_r_object(Mod, ModState, Bucket) -> + {ok, Capabilities} = Mod:capabilities(Bucket, ModState), + lists:member(uses_r_object, Capabilities). -ifdef(TEST). From a23dec87b5296ff2df6e93c3d0df40c4fb1113ef Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 15 May 2013 09:08:25 -0500 Subject: [PATCH 2/3] QuickCheck-related changes for fs2 backend Also, add backend-specific bucket & key name generators and fix type spec typo --- rebar.config | 4 +- src/riak_kv_vnode.erl | 2 +- test/backend_eqc.erl | 97 +++++++++++++++++++++++++++++++++---------- 3 files changed, 80 insertions(+), 23 deletions(-) diff --git a/rebar.config b/rebar.config index 58cfc828b6..a310f81498 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,8 @@ {cover_enabled, true}. {edoc_opts, [{preprocess, true}]}. -{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}]}. +{erl_opts, [warnings_as_errors, + {parse_transform, lager_transform}, + {d, 'TEST_FS2_BACKEND_IN_RIAK_KV'}]}. {eunit_opts, [verbose]}. {erl_first_files, [ diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 7c685f9c0e..c715dbe53e 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1542,7 +1542,7 @@ object_info({Bucket, _Key}=BKey) -> {Bucket, Hash}. -spec encode_and_Mod_put( - Obj::riak_object:object(), Mod::term(), Bucket::riak_object:bucket(), + Obj::riak_object:riak_object(), Mod::term(), Bucket::riak_object:bucket(), Key::riak_object:key(), IndexSpecs::list(), ModState::term()) -> {{ok, UpdModState::term()}, EncodedObj::binary()} | {{error, Reason::term(), UpdModState::term()}, EncodedObj::binary()}. diff --git a/test/backend_eqc.erl b/test/backend_eqc.erl index 6f9099cac4..c2fb838de6 100644 --- a/test/backend_eqc.erl +++ b/test/backend_eqc.erl @@ -34,7 +34,12 @@ test/2, test/3, test/4, - test/5]). + test/5, + property/1, + property/2, + property/3, + property/4, + property/5]). %% eqc_fsm callbacks -export([initial_state/0, @@ -71,21 +76,40 @@ %% ==================================================================== test(Backend) -> - test(Backend, false). + test2(property(Backend, false)). test(Backend, Volatile) -> - test(Backend, Volatile, []). + test2(property(Backend, Volatile, [])). test(Backend, Volatile, Config) -> - test(Backend, Volatile, Config, fun(BeState,_Olds) -> - catch(Backend:stop(BeState)) end). + test2(property(Backend, Volatile, Config, + fun(BeState,_Olds) -> catch(Backend:stop(BeState)) end)). test(Backend, Volatile, Config, Cleanup) -> - test(Backend, Volatile, Config, Cleanup, ?TEST_ITERATIONS). + test2(property(Backend, Volatile, Config, Cleanup, ?TEST_ITERATIONS)). test(Backend, Volatile, Config, Cleanup, NumTests) -> - eqc:quickcheck(eqc:numtests(NumTests, - prop_backend(Backend, Volatile, Config, Cleanup))). + test2(property(Backend, Volatile, Config, Cleanup, NumTests)). + +test2(Prop) -> + eqc:quickcheck(Prop). + +property(Backend) -> + property(Backend, false). + +property(Backend, Volatile) -> + property(Backend, Volatile, []). + +property(Backend, Volatile, Config) -> + property(Backend, Volatile, Config, fun(BeState,_Olds) -> + catch(Backend:stop(BeState)) end). + +property(Backend, Volatile, Config, Cleanup) -> + property(Backend, Volatile, Config, Cleanup, ?TEST_ITERATIONS). + +property(Backend, Volatile, Config, Cleanup, NumTests) -> + eqc:numtests(NumTests, + prop_backend(Backend, Volatile, Config, Cleanup)). %% ==================================================================== %% eqc property @@ -122,10 +146,24 @@ prop_backend(Backend, Volatile, Config, Cleanup) -> %%==================================================================== bucket() -> - elements([<<"b1">>,<<"b2">>,<<"b3">>,<<"b4">>]). + elements([<<"b1">>,<<"b2">>]). + +bucket(#qcst{backend=Backend}) -> + try + Backend:backend_eqc_bucket() + catch error:undef -> + bucket() + end. key() -> - elements([<<"k1">>,<<"k2">>,<<"k3">>,<<"k4">>]). + elements([<<"k1">>,<<"k2">>]). + +key(#qcst{backend=Backend}) -> + try + Backend:backend_eqc_key() + catch error:undef -> + key() + end. val() -> %% The creation of the riak object and the call @@ -140,8 +178,11 @@ val() -> g_opts() -> frequency([{5, [async_fold]}, {2, []}]). -fold_keys_opts() -> - frequency([{5, [async_fold]}, {2, []}, {2, [{index, bucket(), index_query()}]}, {2, [{bucket, bucket()}]}]). +fold_keys_opts(Q) -> + frequency([{5, [async_fold]}, + {2, []}, + {2, [{index, bucket(Q), index_query(Q)}]}, + {2, [{bucket, bucket(Q)}]}]). index_specs() -> ?LET(L, list(index_spec()), lists:usort(L)). @@ -154,9 +195,9 @@ index_spec() -> {remove, int_index(), int_posting()} ]). -index_query() -> +index_query(Q) -> oneof([ - {eq, <<"$bucket">>, bucket()}, %% the bucket() in this query is ignored/transformed + {eq, <<"$bucket">>, bucket(Q)}, %% the bucket() in this query is ignored/transformed range_query(<<"$key">>, key(), key()), {eq, <<"$key">>, key()}, eq_query(), @@ -349,7 +390,15 @@ next_state_data(_From, _To, S, _R, {call, _M, put, [Bucket, Key, IndexSpecs, Val S#qcst{d = orddict:store({Bucket, Key}, Val, S#qcst.d), i = update_indexes(Bucket, Key, IndexSpecs, S#qcst.i)}; next_state_data(_From, _To, S, _R, {call, _M, delete, [Bucket, Key|_]}) -> - S#qcst{d = orddict:erase({Bucket, Key}, S#qcst.d), + D1 = orddict:erase({Bucket, Key}, S#qcst.d), + D2 = try + Backend = S#qcst.backend, + BE_c = S#qcst.c, + Backend:backend_eqc_filter_orddict_on_delete(Bucket, Key, D1, BE_c) + catch error:undef -> + D1 + end, + S#qcst{d = D2, i = remove_indexes(Bucket, Key, S#qcst.i)}; next_state_data(_From, _To, S, _R, {call, ?MODULE, drop, _}) -> @@ -367,13 +416,13 @@ stopped(#qcst{backend=Backend, running(#qcst{backend=Backend, s=State, - i=Indexes}) -> + i=Indexes}=Q) -> [ - {history, {call, Backend, put, [bucket(), key(), index_specs(), val(), State]}}, - {history, {call, Backend, get, [bucket(), key(), State]}}, - {history, {call, ?MODULE, delete, [bucket(), key(), Backend, State, Indexes]}}, + {history, {call, Backend, put, [bucket(Q), key(Q), index_specs(), val(), State]}}, + {history, {call, Backend, get, [bucket(Q), key(Q), State]}}, + {history, {call, ?MODULE, delete, [bucket(Q), key(Q), Backend, State, Indexes]}}, {history, {call, Backend, fold_buckets, [fold_buckets_fun(), get_fold_buffer(), g_opts(), State]}}, - {history, {call, Backend, fold_keys, [fold_keys_fun(), get_fold_buffer(), fold_keys_opts(), State]}}, + {history, {call, Backend, fold_keys, [fold_keys_fun(), get_fold_buffer(), fold_keys_opts(Q), State]}}, {history, {call, Backend, fold_objects, [fold_objects_fun(), get_fold_buffer(), g_opts(), State]}}, {history, {call, Backend, is_empty, [State]}}, {stopped, {call, ?MODULE, drop, [Backend, State]}}, @@ -545,7 +594,13 @@ postcondition(_From, _To, S, {ok, Buffer} -> finish_fold(Buffer, From) end, - R = receive_fold_results([]), + R0 = receive_fold_results([]), + R = try + (S#qcst.backend):backend_eqc_fold_objects_transform(R0) + catch + error:undef -> + R0 + end, lists:sort(Objects) =:= lists:sort(R); postcondition(_From, _To, S,{call, _M, is_empty, [_BeState]}, R) -> R =:= (orddict:size(S#qcst.d) =:= 0); From 15fb639b5865a95cdb142928f7d090140e834085 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 17 May 2013 17:52:11 -0500 Subject: [PATCH 3/3] Address Jordan's review comments --- src/riak_kv_vnode.erl | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index c715dbe53e..387bdceb67 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1013,7 +1013,7 @@ perform_put({true, Obj}, bkey={Bucket, Key}, reqid=ReqID, index_specs=IndexSpecs}) -> - case encode_and_Mod_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of + case encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of {{ok, UpdModState}, EncodedVal} -> update_hashtree(Bucket, Key, EncodedVal, State), case RB of @@ -1130,7 +1130,7 @@ do_get_term({Bucket, Key}, Mod, ModState) -> end. do_get_binary(Bucket, Key, Mod, ModState) -> - case mod_capability_uses_r_object(Mod, ModState, Bucket) of + case uses_r_object(Mod, ModState, Bucket) of true -> Mod:get_object(Bucket, Key, true, ModState); false -> @@ -1138,7 +1138,7 @@ do_get_binary(Bucket, Key, Mod, ModState) -> end. do_get_object(Bucket, Key, Mod, ModState) -> - case mod_capability_uses_r_object(Mod, ModState, Bucket) of + case uses_r_object(Mod, ModState, Bucket) of true -> Mod:get_object(Bucket, Key, false, ModState); false -> @@ -1330,8 +1330,8 @@ do_diffobj_put({Bucket, Key}, DiffObj, false -> IndexSpecs = [] end, - case encode_and_Mod_put(DiffObj, Mod, Bucket, Key, - IndexSpecs, ModState) of + case encode_and_put(DiffObj, Mod, Bucket, Key, + IndexSpecs, ModState) of {{ok, _UpdModState} = InnerRes, EncodedVal} -> update_hashtree(Bucket, Key, EncodedVal, StateData), update_index_write_stats(IndexBackend, IndexSpecs), @@ -1355,8 +1355,8 @@ do_diffobj_put({Bucket, Key}, DiffObj, false -> IndexSpecs = [] end, - case encode_and_Mod_put(AMObj, Mod, Bucket, Key, - IndexSpecs, ModState) of + case encode_and_put(AMObj, Mod, Bucket, Key, + IndexSpecs, ModState) of {{ok, _UpdModState} = InnerRes, EncodedVal} -> update_hashtree(Bucket, Key, EncodedVal, StateData), update_index_write_stats(IndexBackend, IndexSpecs), @@ -1541,14 +1541,14 @@ object_info({Bucket, _Key}=BKey) -> Hash = riak_core_util:chash_key(BKey), {Bucket, Hash}. --spec encode_and_Mod_put( - Obj::riak_object:riak_object(), Mod::term(), Bucket::riak_object:bucket(), - Key::riak_object:key(), IndexSpecs::list(), ModState::term()) -> +-spec encode_and_put( + Obj::riak_object:riak_object(), Mod::term(), Bucket::riak_object:bucket(), + Key::riak_object:key(), IndexSpecs::list(), ModState::term()) -> {{ok, UpdModState::term()}, EncodedObj::binary()} | {{error, Reason::term(), UpdModState::term()}, EncodedObj::binary()}. -encode_and_Mod_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) -> - case mod_capability_uses_r_object(Mod, ModState, Bucket) of +encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) -> + case uses_r_object(Mod, ModState, Bucket) of true -> Mod:put_object(Bucket, Key, IndexSpecs, Obj, ModState); false -> @@ -1557,7 +1557,7 @@ encode_and_Mod_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) -> {Mod:put(Bucket, Key, IndexSpecs, EncodedVal, ModState), EncodedVal} end. -mod_capability_uses_r_object(Mod, ModState, Bucket) -> +uses_r_object(Mod, ModState, Bucket) -> {ok, Capabilities} = Mod:capabilities(Bucket, ModState), lists:member(uses_r_object, Capabilities).