Skip to content

Commit 3750a36

Browse files
committed
Merge pull request #521 from basho/pevm-client-specified-timeouts
Changes needed to expose FSM timeouts to clients
2 parents 0a84af1 + 8609e3b commit 3750a36

File tree

3 files changed

+85
-61
lines changed

3 files changed

+85
-61
lines changed

src/riak_client.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ delete(Bucket,Key,Options,Timeout) when is_list(Options) ->
250250
ReqId = mk_reqid(),
251251
riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, Options, Timeout,
252252
Me, ClientId]),
253-
wait_for_reqid(ReqId, Timeout);
253+
RTimeout = recv_timeout(Options),
254+
wait_for_reqid(ReqId, erlang:min(Timeout, RTimeout));
254255
delete(Bucket,Key,RW,Timeout) ->
255256
delete(Bucket,Key,[{rw, RW}], Timeout).
256257

@@ -295,7 +296,8 @@ delete_vclock(Bucket,Key,VClock,Options,Timeout) when is_list(Options) ->
295296
ReqId = mk_reqid(),
296297
riak_kv_delete_sup:start_delete(Node, [ReqId, Bucket, Key, Options, Timeout,
297298
Me, ClientId, VClock]),
298-
wait_for_reqid(ReqId, Timeout);
299+
RTimeout = recv_timeout(Options),
300+
wait_for_reqid(ReqId, erlang:min(Timeout, RTimeout));
299301
delete_vclock(Bucket,Key,VClock,RW,Timeout) ->
300302
delete_vclock(Bucket,Key,VClock,[{rw, RW}],Timeout).
301303

src/riak_kv_pb_object.erl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@
6565
req_ctx, % context to go along with request (partial results, request ids etc)
6666
client_id = <<0,0,0,0>> }). % emulate legacy API when vnode_vclocks is true
6767

68-
-define(DEFAULT_TIMEOUT, 60000).
69-
7068
%% @doc init/0 callback. Returns the service internal start
7169
%% state.
7270
-spec init() -> any().
@@ -102,12 +100,14 @@ process(#rpbsetclientidreq{client_id = ClientId}, State) ->
102100

103101
process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
104102
basic_quorum=BQ, if_modified=VClock,
105-
head=Head, deletedvclock=DeletedVClock}, #state{client=C} = State) ->
103+
head=Head, deletedvclock=DeletedVClock,
104+
timeout=Timeout}, #state{client=C} = State) ->
106105
R = decode_quorum(R0),
107106
PR = decode_quorum(PR0),
108107
case C:get(B, K, make_option(deletedvclock, DeletedVClock) ++
109108
make_option(r, R) ++
110109
make_option(pr, PR) ++
110+
make_option(timeout, Timeout) ++
111111
make_option(notfound_ok, NFOk) ++
112112
make_option(basic_quorum, BQ)) of
113113
{ok, O} ->
@@ -166,7 +166,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC,
166166

167167
process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
168168
w=W0, dw=DW0, pw=PW0, return_body=ReturnBody,
169-
return_head=ReturnHead},
169+
return_head=ReturnHead, timeout=Timeout},
170170
#state{client=C} = State) ->
171171

172172
case K of
@@ -196,7 +196,8 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
196196
end
197197
end,
198198
case C:put(O, make_option(w, W) ++ make_option(dw, DW) ++
199-
make_option(pw, PW) ++ [{timeout, default_timeout()} | Options]) of
199+
make_option(pw, PW) ++ make_option(timeout, Timeout) ++
200+
Options) of
200201
ok when is_binary(ReturnKey) ->
201202
PutResp = #rpbputresp{key = ReturnKey},
202203
{reply, PutResp, State};
@@ -226,7 +227,8 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
226227
end;
227228

228229
process(#rpbdelreq{bucket=B, key=K, vclock=PbVc,
229-
r=R0, w=W0, pr=PR0, pw=PW0, dw=DW0, rw=RW0},
230+
r=R0, w=W0, pr=PR0, pw=PW0, dw=DW0, rw=RW0,
231+
timeout=Timeout},
230232
#state{client=C} = State) ->
231233
W = decode_quorum(W0),
232234
PW = decode_quorum(PW0),
@@ -240,7 +242,8 @@ process(#rpbdelreq{bucket=B, key=K, vclock=PbVc,
240242
make_option(rw, RW) ++
241243
make_option(pr, PR) ++
242244
make_option(pw, PW) ++
243-
make_option(dw, DW),
245+
make_option(dw, DW) ++
246+
make_option(timeout, Timeout),
244247
Result = case PbVc of
245248
undefined ->
246249
C:delete(B, K, Options);
@@ -298,6 +301,3 @@ erlify_rpbvc(PbVc) ->
298301
%% Convert a vector clock to protocol buffers
299302
pbify_rpbvc(Vc) ->
300303
zlib:zip(term_to_binary(Vc)).
301-
302-
default_timeout() ->
303-
?DEFAULT_TIMEOUT.

src/riak_kv_wm_object.erl

Lines changed: 71 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -168,20 +168,6 @@ init(Props) ->
168168
%% bindings from the dispatch, as well as any vtag
169169
%% query parameter.
170170
service_available(RD, Ctx=#ctx{riak=RiakProps}) ->
171-
Timeout = case wrq:get_req_header(?HEAD_TIMEOUT, RD) of
172-
undefined -> ?DEFAULT_TIMEOUT;
173-
TimeoutStr ->
174-
try
175-
list_to_integer(TimeoutStr)
176-
catch
177-
_:_ ->
178-
lager:error("Bad timeout value ~p, "
179-
"using ~d~n",
180-
[TimeoutStr, ?DEFAULT_TIMEOUT]),
181-
?DEFAULT_TIMEOUT
182-
end
183-
end,
184-
185171
case riak_kv_wm_utils:get_riak_client(RiakProps, riak_kv_wm_utils:get_client_id(RD)) of
186172
{ok, C} ->
187173
{true,
@@ -197,8 +183,7 @@ service_available(RD, Ctx=#ctx{riak=RiakProps}) ->
197183
undefined -> undefined;
198184
K -> list_to_binary(riak_kv_wm_utils:maybe_decode_uri(RD, K))
199185
end,
200-
vtag=wrq:get_qs_value(?Q_VTAG, RD),
201-
timeout=Timeout
186+
vtag=wrq:get_qs_value(?Q_VTAG, RD)
202187
}};
203188
Error ->
204189
{false,
@@ -245,29 +230,64 @@ malformed_request(RD, Ctx) when Ctx#ctx.method =:= 'POST'
245230
undefined ->
246231
{true, missing_content_type(RD), Ctx};
247232
_ ->
248-
case malformed_rw_params(RD, Ctx) of
249-
Result={true, _, _} ->
233+
case malformed_timeout_param(RD, Ctx) of
234+
Result={true, _, _} ->
250235
Result;
251-
{false, RWRD, RWCtx} ->
252-
case malformed_link_headers(RWRD, RWCtx) of
253-
Result = {true, _, _} ->
236+
{false, ToRD, ToCtx} ->
237+
case malformed_rw_params(ToRD, ToCtx) of
238+
Result={true, _, _} ->
254239
Result;
255-
{false, RWLH, LHCtx} ->
256-
malformed_index_headers(RWLH, LHCtx)
240+
{false, RWRD, RWCtx} ->
241+
case malformed_link_headers(RWRD, RWCtx) of
242+
Result = {true, _, _} ->
243+
Result;
244+
{false, RWLH, LHCtx} ->
245+
malformed_index_headers(RWLH, LHCtx)
246+
end
257247
end
258248
end
259249
end;
260-
malformed_request(RD, Ctx) ->
261-
case malformed_rw_params(RD, Ctx) of
262-
Result = {true, _, _} ->
250+
malformed_request(RD, Ctx) ->
251+
case malformed_timeout_param(RD, Ctx) of
252+
Result={true, _, _} ->
263253
Result;
264-
{false, ResRD, ResCtx} ->
265-
DocCtx = ensure_doc(ResCtx),
266-
case DocCtx#ctx.doc of
267-
{error, Reason} ->
268-
handle_common_error(Reason, ResRD, DocCtx);
269-
_ ->
270-
{false, ResRD, DocCtx}
254+
{false, ToRD, ToCtx} ->
255+
case malformed_rw_params(ToRD, ToCtx) of
256+
Result = {true, _, _} ->
257+
Result;
258+
{false, ResRD, ResCtx} ->
259+
DocCtx = ensure_doc(ResCtx),
260+
case DocCtx#ctx.doc of
261+
{error, Reason} ->
262+
handle_common_error(Reason, ResRD, DocCtx);
263+
_ ->
264+
{false, ResRD, DocCtx}
265+
end
266+
end
267+
end.
268+
269+
%% @spec malformed_timeout_param(reqdata(), context()) ->
270+
%% {boolean(), reqdata(), context()}
271+
%% @doc Check that the timeout parameter is are a
272+
%% string-encoded integer. Store the integer value
273+
%% in context() if so.
274+
malformed_timeout_param(RD, Ctx) ->
275+
case wrq:get_qs_value("timeout", none, RD) of
276+
none ->
277+
{false, RD, Ctx};
278+
TimeoutStr ->
279+
try
280+
Timeout = list_to_integer(TimeoutStr),
281+
{false, RD, Ctx#ctx{timeout=Timeout}}
282+
catch
283+
_:_ ->
284+
{true,
285+
wrq:append_to_resp_body(io_lib:format("Bad timeout "
286+
"value ~p~n",
287+
[TimeoutStr]),
288+
wrq:set_resp_header(?HEAD_CTYPE,
289+
"text/plain", RD)),
290+
Ctx}
271291
end
272292
end.
273293

@@ -611,10 +631,9 @@ accept_doc_body(RD, Ctx=#ctx{bucket=B, key=K, client=C, links=L, index_fields=IF
611631
IndexMD = dict:store(?MD_INDEX, IF, UserMetaMD),
612632
MDDoc = riak_object:update_metadata(VclockDoc, IndexMD),
613633
Doc = riak_object:update_value(MDDoc, riak_kv_wm_utils:accept_value(CType, wrq:req_body(RD))),
614-
Options = case wrq:get_qs_value(?Q_RETURNBODY, RD) of ?Q_TRUE -> [returnbody]; _ -> [] end,
615-
case C:put(Doc, [{w, Ctx#ctx.w}, {dw, Ctx#ctx.dw}, {pw, Ctx#ctx.pw},
616-
{timeout, Ctx#ctx.timeout} |
617-
Options]) of
634+
Options0 = case wrq:get_qs_value(?Q_RETURNBODY, RD) of ?Q_TRUE -> [returnbody]; _ -> [] end,
635+
Options = make_options(Options0, Ctx),
636+
case C:put(Doc, Options) of
618637
{error, Reason} ->
619638
handle_common_error(Reason, RD, Ctx);
620639
ok ->
@@ -846,21 +865,18 @@ decode_vclock_header(RD) ->
846865
%% worry about the order of executing of those places.
847866
ensure_doc(Ctx=#ctx{doc=undefined, key=undefined}) ->
848867
Ctx#ctx{doc={error, notfound}};
849-
ensure_doc(Ctx=#ctx{doc=undefined, bucket=B, key=K, client=C, r=R,
850-
pr=PR, basic_quorum=Quorum, notfound_ok=NotFoundOK}) ->
851-
Ctx#ctx{doc=C:get(B, K, [deletedvclock, {r, R}, {pr, PR},
852-
{basic_quorum, Quorum},
853-
{notfound_ok, NotFoundOK},
854-
{timeout, Ctx#ctx.timeout}])};
868+
ensure_doc(Ctx=#ctx{doc=undefined, bucket=B, key=K, client=C,
869+
basic_quorum=Quorum, notfound_ok=NotFoundOK}) ->
870+
Options0 = [deletedvclock, {basic_quorum, Quorum},
871+
{notfound_ok, NotFoundOK}],
872+
Options = make_options(Options0, Ctx),
873+
Ctx#ctx{doc=C:get(B, K, Options)};
855874
ensure_doc(Ctx) -> Ctx.
856875

857876
%% @spec delete_resource(reqdata(), context()) -> {true, reqdata(), context()}
858877
%% @doc Delete the document specified.
859-
delete_resource(RD, Ctx=#ctx{bucket=B, key=K, client=C, rw=RW, r=R, w=W,
860-
pr=PR, pw=PW, dw=DW, timeout=Timeout}) ->
861-
Options = lists:filter(fun({_, default}) -> false; (_) -> true end,
862-
[{rw, RW}, {r, R}, {w, W}, {pr, PR}, {pw, PW}, {dw, DW},
863-
{timeout, Timeout}]),
878+
delete_resource(RD, Ctx=#ctx{bucket=B, key=K, client=C}) ->
879+
Options = make_options([], Ctx),
864880
Result = case wrq:get_req_header(?HEAD_VCLOCK, RD) of
865881
undefined ->
866882
C:delete(B,K,Options);
@@ -1083,4 +1099,10 @@ handle_common_error(Reason, RD, Ctx) ->
10831099
Ctx}
10841100
end.
10851101

1086-
1102+
make_options(Prev, Ctx) ->
1103+
NewOpts0 = [{rw, Ctx#ctx.rw}, {r, Ctx#ctx.r}, {w, Ctx#ctx.w},
1104+
{pr, Ctx#ctx.pr}, {pw, Ctx#ctx.pw}, {dw, Ctx#ctx.dw},
1105+
{timeout, Ctx#ctx.timeout}],
1106+
NewOpts = [ {Opt, Val} || {Opt, Val} <- NewOpts0,
1107+
Val /= undefined, Val /= default ],
1108+
Prev ++ NewOpts.

0 commit comments

Comments
 (0)