Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,13 @@ delete(Bucket,Key,Options,Timeout) when is_list(Options) ->
ReqId = mk_reqid(),
riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, Options, Timeout,
Me, ClientId]),
wait_for_reqid(ReqId, Timeout);
RTimeout = recv_timeout(Options),
lager:info("timeouts r ~p t ~p > ~p",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be at debug level and probably have a more useful message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

meant to remove.

[RTimeout, Timeout, RTimeout > Timeout]),
case RTimeout > Timeout of
true -> wait_for_reqid(ReqId, Timeout);
false -> wait_for_reqid(ReqId, RTimeout)
end;
delete(Bucket,Key,RW,Timeout) ->
delete(Bucket,Key,[{rw, RW}], Timeout).

Expand Down Expand Up @@ -295,7 +301,11 @@ delete_vclock(Bucket,Key,VClock,Options,Timeout) when is_list(Options) ->
ReqId = mk_reqid(),
riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, Options, Timeout,
Me, ClientId, VClock]),
wait_for_reqid(ReqId, Timeout);
RTimeout = recv_timeout(Options),
case RTimeout > Timeout of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not:

wait_for_reqid(ReqId, erlang:max(Timeout, RTimeout))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

erlang:stdlib() > evan:brain()

true -> wait_for_reqid(ReqId, Timeout);
false -> wait_for_reqid(ReqId, RTimeout)
end;
delete_vclock(Bucket,Key,VClock,RW,Timeout) ->
delete_vclock(Bucket,Key,VClock,[{rw, RW}],Timeout).

Expand Down
20 changes: 10 additions & 10 deletions src/riak_kv_pb_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@
req_ctx, % context to go along with request (partial results, request ids etc)
client_id = <<0,0,0,0>> }). % emulate legacy API when vnode_vclocks is true

-define(DEFAULT_TIMEOUT, 60000).

%% @doc init/0 callback. Returns the service internal start
%% state.
-spec init() -> any().
Expand Down Expand Up @@ -102,12 +100,14 @@ process(#rpbsetclientidreq{client_id = ClientId}, State) ->

process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
basic_quorum=BQ, if_modified=VClock,
head=Head, deletedvclock=DeletedVClock}, #state{client=C} = State) ->
head=Head, deletedvclock=DeletedVClock,
timeout=Timeout}, #state{client=C} = State) ->
R = decode_quorum(R0),
PR = decode_quorum(PR0),
case C:get(B, K, make_option(deletedvclock, DeletedVClock) ++
make_option(r, R) ++
make_option(pr, PR) ++
make_option(timeout, Timeout) ++
make_option(notfound_ok, NFOk) ++
make_option(basic_quorum, BQ)) of
{ok, O} ->
Expand Down Expand Up @@ -166,7 +166,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC,

process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
w=W0, dw=DW0, pw=PW0, return_body=ReturnBody,
return_head=ReturnHead},
return_head=ReturnHead, timeout=Timeout},
#state{client=C} = State) ->

case K of
Expand Down Expand Up @@ -196,7 +196,8 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
end
end,
case C:put(O, make_option(w, W) ++ make_option(dw, DW) ++
make_option(pw, PW) ++ [{timeout, default_timeout()} | Options]) of
make_option(pw, PW) ++ make_option(timeout, Timeout) ++
Options) of
ok when is_binary(ReturnKey) ->
PutResp = #rpbputresp{key = ReturnKey},
{reply, PutResp, State};
Expand Down Expand Up @@ -226,7 +227,8 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
end;

process(#rpbdelreq{bucket=B, key=K, vclock=PbVc,
r=R0, w=W0, pr=PR0, pw=PW0, dw=DW0, rw=RW0},
r=R0, w=W0, pr=PR0, pw=PW0, dw=DW0, rw=RW0,
timeout=Timeout},
#state{client=C} = State) ->
W = decode_quorum(W0),
PW = decode_quorum(PW0),
Expand All @@ -240,7 +242,8 @@ process(#rpbdelreq{bucket=B, key=K, vclock=PbVc,
make_option(rw, RW) ++
make_option(pr, PR) ++
make_option(pw, PW) ++
make_option(dw, DW),
make_option(dw, DW) ++
make_option(timeout, Timeout),
Result = case PbVc of
undefined ->
C:delete(B, K, Options);
Expand Down Expand Up @@ -298,6 +301,3 @@ erlify_rpbvc(PbVc) ->
%% Convert a vector clock to protocol buffers
pbify_rpbvc(Vc) ->
zlib:zip(term_to_binary(Vc)).

default_timeout() ->
?DEFAULT_TIMEOUT.
123 changes: 78 additions & 45 deletions src/riak_kv_wm_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,6 @@ init(Props) ->
%% bindings from the dispatch, as well as any vtag
%% query parameter.
service_available(RD, Ctx=#ctx{riak=RiakProps}) ->
Timeout = case wrq:get_req_header(?HEAD_TIMEOUT, RD) of
undefined -> ?DEFAULT_TIMEOUT;
TimeoutStr ->
try
list_to_integer(TimeoutStr)
catch
_:_ ->
lager:error("Bad timeout value ~p, "
"using ~d~n",
[TimeoutStr, ?DEFAULT_TIMEOUT]),
?DEFAULT_TIMEOUT
end
end,

case riak_kv_wm_utils:get_riak_client(RiakProps, riak_kv_wm_utils:get_client_id(RD)) of
{ok, C} ->
{true,
Expand All @@ -197,8 +183,7 @@ service_available(RD, Ctx=#ctx{riak=RiakProps}) ->
undefined -> undefined;
K -> list_to_binary(riak_kv_wm_utils:maybe_decode_uri(RD, K))
end,
vtag=wrq:get_qs_value(?Q_VTAG, RD),
timeout=Timeout
vtag=wrq:get_qs_value(?Q_VTAG, RD)
}};
Error ->
{false,
Expand Down Expand Up @@ -245,29 +230,66 @@ malformed_request(RD, Ctx) when Ctx#ctx.method =:= 'POST'
undefined ->
{true, missing_content_type(RD), Ctx};
_ ->
case malformed_rw_params(RD, Ctx) of
Result={true, _, _} ->
case malformed_timeout_param(RD, Ctx) of
Result={true, _, _} ->
Result;
{false, RWRD, RWCtx} ->
case malformed_link_headers(RWRD, RWCtx) of
Result = {true, _, _} ->
{false, ToRD, ToCtx} ->
case malformed_rw_params(ToRD, ToCtx) of
Result={true, _, _} ->
Result;
{false, RWLH, LHCtx} ->
malformed_index_headers(RWLH, LHCtx)
{false, RWRD, RWCtx} ->
case malformed_link_headers(RWRD, RWCtx) of
Result = {true, _, _} ->
Result;
{false, RWLH, LHCtx} ->
malformed_index_headers(RWLH, LHCtx)
end
end
end
end;
malformed_request(RD, Ctx) ->
case malformed_rw_params(RD, Ctx) of
Result = {true, _, _} ->
malformed_request(RD, Ctx) ->
case malformed_timeout_param(RD, Ctx) of
Result={true, _, _} ->
Result;
{false, ResRD, ResCtx} ->
DocCtx = ensure_doc(ResCtx),
case DocCtx#ctx.doc of
{error, Reason} ->
handle_common_error(Reason, ResRD, DocCtx);
_ ->
{false, ResRD, DocCtx}
{false, ToRD, ToCtx} ->
case malformed_rw_params(ToRD, ToCtx) of
Result = {true, _, _} ->
Result;
{false, ResRD, ResCtx} ->
DocCtx = ensure_doc(ResCtx),
case DocCtx#ctx.doc of
{error, Reason} ->
handle_common_error(Reason, ResRD, DocCtx);
_ ->
{false, ResRD, DocCtx}
end
end
end.

%% @spec malformed_timeout_param(reqdata(), context()) ->
%% {boolean(), reqdata(), context()}
%% @doc Check that the timeout parameter is are a
%% string-encoded integer. Store the integer value
%% in context() if so.
malformed_timeout_param(RD, Ctx) ->
case wrq:get_qs_value("timeout", none, RD) of
none ->
{false, RD, Ctx};
TimeoutStr ->
try
Timeout = list_to_integer(TimeoutStr),
{false, RD, Ctx#ctx{timeout=Timeout}}
catch
_:_ ->
{true,
wrq:append_to_resp_body(io_lib:format("Bad timeout "
"value ~p, "
"using ~p~n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you provide a bad timeout value, it doesn't actually use the default but returns a 400 response.

[TimeoutStr,
?DEFAULT_TIMEOUT]),
wrq:set_resp_header(?HEAD_CTYPE,
"text/plain", RD)),
Ctx}
end
end.

Expand Down Expand Up @@ -611,10 +633,13 @@ accept_doc_body(RD, Ctx=#ctx{bucket=B, key=K, client=C, links=L, index_fields=IF
IndexMD = dict:store(?MD_INDEX, IF, UserMetaMD),
MDDoc = riak_object:update_metadata(VclockDoc, IndexMD),
Doc = riak_object:update_value(MDDoc, riak_kv_wm_utils:accept_value(CType, wrq:req_body(RD))),
Options = case wrq:get_qs_value(?Q_RETURNBODY, RD) of ?Q_TRUE -> [returnbody]; _ -> [] end,
case C:put(Doc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw},
{timeout, Ctx#ctx.timeout} |
Options]) of
Options0 = case wrq:get_qs_value(?Q_RETURNBODY, RD) of ?Q_TRUE -> [returnbody]; _ -> [] end,
Options = case Ctx#ctx.timeout of
undefined -> Options0;
Else -> [{timeout, Else} | Options0]
end,
case C:put(Doc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw} |
Options]) of
{error, Reason} ->
handle_common_error(Reason, RD, Ctx);
ok ->
Expand Down Expand Up @@ -848,19 +873,27 @@ ensure_doc(Ctx=#ctx{doc=undefined, key=undefined}) ->
Ctx#ctx{doc={error, notfound}};
ensure_doc(Ctx=#ctx{doc=undefined, bucket=B, key=K, client=C, r=R,
pr=PR, basic_quorum=Quorum, notfound_ok=NotFoundOK}) ->
Ctx#ctx{doc=C:get(B, K, [deletedvclock, {r, R}, {pr, PR},
{basic_quorum, Quorum},
{notfound_ok, NotFoundOK},
{timeout, Ctx#ctx.timeout}])};
Opts0 = [deletedvclock, {r, R}, {pr, PR},
{basic_quorum, Quorum},
{notfound_ok, NotFoundOK}],
Opts = case Ctx#ctx.timeout of
undefined -> Opts0;
Else -> Opts0 ++ [{timeout, Else}]
end,
Ctx#ctx{doc=C:get(B, K, Opts)};
ensure_doc(Ctx) -> Ctx.

%% @spec delete_resource(reqdata(), context()) -> {true, reqdata(), context()}
%% @doc Delete the document specified.
delete_resource(RD, Ctx=#ctx{bucket=B, key=K, client=C, rw=RW, r=R, w=W,
pr=PR, pw=PW, dw=DW, timeout=Timeout}) ->
Options = lists:filter(fun({_, default}) -> false; (_) -> true end,
[{rw, RW}, {r, R}, {w, W}, {pr, PR}, {pw, PW}, {dw, DW},
{timeout, Timeout}]),
pr=PR, pw=PW, dw=DW}) ->
Options0 = lists:filter(fun({_, default}) -> false; (_) -> true end,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested refactoring:

Opts0 = [{rw, RW}, {r, R}, {w, W}, {pr, PR}, {pw, PW}, {dw, DW}, {timeout, Timeout}],
Options = [ {Opt, Val} || {Opt, Val} <- Opts0, Val /= undefined, Val /= default ]

That pattern might also be useful in these other places where you conditionally include the timeout option, e.g.

Opts0 ++ [ {timeout, Timeout} || Timeout /= undefined ]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's done in three different ways in three different places. Would be happy to refactor it to be uniform, but wasn't sure that this PR was the right place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with adding refactoring all three, or simply making a function to call for the same effect. Cleaning up debt is always good.

Sean Cribbs

On Apr 1, 2013, at 4:29 PM, Evan Vigil-McClanahan notifications@github.com wrote:

In src/riak_kv_wm_object.erl:

ensure_doc(Ctx) -> Ctx.

%% @SPEC delete_resource(reqdata(), context()) -> {true, reqdata(), context()}
%% @doc Delete the document specified.
delete_resource(RD, Ctx=#ctx{bucket=B, key=K, client=C, rw=RW, r=R, w=W,

  •    pr=PR, pw=PW, dw=DW, timeout=Timeout}) ->
    
  • Options = lists:filter(fun({, default}) -> false; () -> true end,
  •    [{rw, RW}, {r, R}, {w, W}, {pr, PR}, {pw, PW}, {dw, DW}, 
    
  •     {timeout, Timeout}]),
    
  •    pr=PR, pw=PW, dw=DW}) ->
    
  • Options0 = lists:filter(fun({, default}) -> false; () -> true end,
    it's done in three different ways in three different places. Would be happy to refactor it to be uniform, but wasn't sure that this PR was the right place.


Reply to this email directly or view it on GitHub.

[{rw, RW}, {r, R}, {w, W}, {pr, PR},
{pw, PW}, {dw, DW}]),
Options = case Ctx#ctx.timeout of
undefined -> Options0;
Else -> Options0 ++ [{timeout, Else}]
end,
Result = case wrq:get_req_header(?HEAD_VCLOCK, RD) of
undefined ->
C:delete(B,K,Options);
Expand Down