Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small optimisation to using webmachine for PUT #1853

Open
wants to merge 4 commits into
base: develop-3.0
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 42 additions & 46 deletions src/riak_kv_wm_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@

-record(ctx, {api_version, %% integer() - Determine which version of the API to use.
bucket_type, %% binary() - Bucket type (from uri)
type_exists, %% bool() - Type exists as riak_core_bucket_type
bucket, %% binary() - Bucket name (from uri)
key, %% binary() - Key (from uri)
client, %% riak_client() - the store client
Expand All @@ -179,6 +180,8 @@
links, %% [link()] - links of the object
index_fields, %% [index_field()]
method, %% atom() - HTTP method for the request
ctype, %% string() - extracted content-type provided
charset, %% string() | undefined - extracted character set provided
timeout, %% integer() - passed-in timeout value in ms
security %% security context
}).
Expand Down Expand Up @@ -225,7 +228,7 @@ init(Props) ->
%% bindings from the dispatch, as well as any vtag
%% query parameter.
service_available(RD, Ctx0=#ctx{riak=RiakProps}) ->
Ctx = riak_kv_wm_utils:ensure_bucket_type(RD, Ctx0, #ctx.bucket_type),
Ctx = ensure_bucket_type(RD, Ctx0),
ClientID = riak_kv_wm_utils:get_client_id(RD),
case riak_kv_wm_utils:get_riak_client(RiakProps, ClientID) of
{ok, C} ->
Expand Down Expand Up @@ -300,7 +303,7 @@ validate(RD, Ctx=#ctx{security=Security}) ->
-spec maybe_validate_resource(
term(), #wm_reqdata{}, context(), string()) -> term().
maybe_validate_resource({false, Error, _}, RD, Ctx, _Perm) ->
RD1 = wrq:set_resp_header("Content-Type", "text/plain", RD),
RD1 = wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD),
{true, wrq:append_to_resp_body(
unicode:characters_to_binary(Error, utf8, utf8),
RD1), Ctx};
Expand Down Expand Up @@ -331,7 +334,7 @@ validate_doc(RD, Ctx) ->

%% @doc Detects whether the requested object's bucket-type exists.
validate_bucket_type(RD, Ctx) ->
case riak_kv_wm_utils:bucket_type_exists(Ctx#ctx.bucket_type) of
case Ctx#ctx.type_exists of
true ->
{false, RD, Ctx};
false ->
Expand Down Expand Up @@ -396,11 +399,16 @@ malformed_request([H|T], RD, Ctx) ->

%% @doc Detects whether the Content-Type header is missing on
%% PUT/POST.
%% This should probably result in a 415 using the known_content_type callback
malformed_content_type(RD, Ctx) ->
case wrq:get_req_header("Content-Type", RD) of
case wrq:get_req_header(?HEAD_CTYPE, RD) of
undefined ->
{true, missing_content_type(RD), Ctx};
_ -> {false, RD, Ctx}
RawCType ->
[ContentType|RawParams] = string:tokens(RawCType, "; "),
Params = [ list_to_tuple(string:tokens(P, "=")) || P <- RawParams],
Charset = proplists:get_value("charset", Params),
{false, RD, Ctx#ctx{ctype = ContentType, charset = Charset}}
end.

-spec malformed_timeout_param(#wm_reqdata{}, context()) ->
Expand Down Expand Up @@ -493,7 +501,7 @@ malformed_custom_param({Idx, Name, Default, AllowedValues}, {Result, RD, Ctx}) -
list_to_atom(
string:to_lower(
wrq:get_qs_value(Name, Default, RD))),
1,
1,
AllowedValueTuples),
case Option of
false ->
Expand Down Expand Up @@ -594,7 +602,7 @@ malformed_index_headers(RD, Ctx) ->
extract_index_fields(RD) ->
PrefixSize = length(?HEAD_INDEX_PREFIX),
{ok, RE} = re:compile(",\\s"),
F =
F =
fun({K,V}, Acc) ->
KList = riak_kv_wm_utils:any_to_list(K),
case lists:prefix(?HEAD_INDEX_PREFIX, string:to_lower(KList)) of
Expand Down Expand Up @@ -622,11 +630,10 @@ extract_index_fields(RD) ->
%% @doc List the content types available for representing this resource.
%% The content-type for a key-level request is the content-type that
%% was used in the PUT request that stored the document in Riak.
content_types_provided(RD, Ctx=#ctx{method=Method}=Ctx)
content_types_provided(RD, Ctx=#ctx{method=Method, ctype=ContentType})
when Method =:= 'PUT'; Method =:= 'POST' ->
{ContentType, _} = extract_content_type(RD),
{[{ContentType, produce_doc_body}], RD, Ctx};
content_types_provided(RD, Ctx=#ctx{method=Method}=Ctx)
content_types_provided(RD, Ctx=#ctx{method=Method})
when Method =:= 'DELETE' ->
{[{"text/html", to_html}], RD, Ctx};
content_types_provided(RD, Ctx0) ->
Expand All @@ -647,15 +654,15 @@ content_types_provided(RD, Ctx0) ->
%% The charset for a key-level request is the charset that was used
%% in the PUT request that stored the document in Riak (none if
%% no charset was specified at PUT-time).
charsets_provided(RD, Ctx=#ctx{method=Method}=Ctx)
charsets_provided(RD, Ctx=#ctx{method=Method})
when Method =:= 'PUT'; Method =:= 'POST' ->
case extract_content_type(RD) of
{_, undefined} ->
case Ctx#ctx.charset of
undefined ->
{no_charset, RD, Ctx};
{_, Charset} ->
Charset ->
{[{Charset, fun(X) -> X end}], RD, Ctx}
end;
charsets_provided(RD, Ctx=#ctx{method=Method}=Ctx)
charsets_provided(RD, Ctx=#ctx{method=Method})
when Method =:= 'DELETE' ->
{no_charset, RD, Ctx};
charsets_provided(RD, Ctx0) ->
Expand Down Expand Up @@ -774,7 +781,7 @@ resource_exists(RD, Ctx0) ->
DocCtx#ctx{vtag=Vtag}}
end;
{error, _} ->
%% This should never actually be reached because all the
%% This should never actually be reached because all the
%% error conditions from ensure_doc are handled up in
%% malformed_request.
{false, RD, DocCtx}
Expand All @@ -798,7 +805,7 @@ is_conflict(RD, Ctx) ->
{false, RD, Ctx};
{UpdM, NotModifiedClock} when UpdM =:= 'PUT'; UpdM =:= 'POST' ->
case Ctx#ctx.doc of
{ok, Obj} ->
{ok, Obj} ->
InClock =
riak_object:decode_vclock(
base64:decode(NotModifiedClock)),
Expand Down Expand Up @@ -865,14 +872,13 @@ accept_doc_body(
RD,
Ctx=#ctx{
bucket_type=T, bucket=B, key=K, client=C,
links=L,
links=L, ctype=CType, charset=Charset,
index_fields=IF}) ->
Doc0 = riak_object:new(riak_kv_wm_utils:maybe_bucket_type(T,B), K, <<>>),
VclockDoc = riak_object:set_vclock(Doc0, decode_vclock_header(RD)),
{CType, Charset} = extract_content_type(RD),
UserMeta = extract_user_meta(RD),
CTypeMD = dict:store(?MD_CTYPE, CType, dict:new()),
CharsetMD =
CharsetMD =
if Charset /= undefined ->
dict:store(?MD_CHARSET, Charset, CTypeMD);
true ->
Expand Down Expand Up @@ -952,21 +958,6 @@ add_conditional_headers(RD, Ctx) ->
calendar:universal_time_to_local_time(LM)), RD4),
{RD5,Ctx3}.

-spec extract_content_type(#wm_reqdata{}) ->
{ContentType::string(), Charset::string()|undefined}.
%% @doc Interpret the Content-Type header in the client's PUT request.
%% This function extracts the content type and charset for use
%% in subsequent GET requests.
extract_content_type(RD) ->
case wrq:get_req_header(?HEAD_CTYPE, RD) of
undefined ->
undefined;
RawCType ->
[CType|RawParams] = string:tokens(RawCType, "; "),
Params = [ list_to_tuple(string:tokens(P, "=")) || P <- RawParams],
{CType, proplists:get_value("charset", Params)}
end.

-spec extract_user_meta(#wm_reqdata{}) -> proplists:proplist().
%% @doc Extract headers prefixed by X-Riak-Meta- in the client's PUT request
%% to be returned by subsequent GET requests.
Expand Down Expand Up @@ -1165,7 +1156,7 @@ ensure_doc(Ctx=#ctx{doc=undefined, key=undefined}) ->
Ctx#ctx{doc={error, notfound}};
ensure_doc(Ctx=#ctx{doc=undefined, bucket_type=T, bucket=B, key=K, client=C,
basic_quorum=Quorum, notfound_ok=NotFoundOK}) ->
case riak_kv_wm_utils:bucket_type_exists(T) of
case Ctx#ctx.type_exists of
true ->
Options0 =
[deletedvclock,
Expand Down Expand Up @@ -1228,7 +1219,7 @@ generate_etag(RD, Ctx) ->
-spec last_modified(#wm_reqdata{}, context()) ->
{undefined|calendar:datetime(), #wm_reqdata{}, context()}.
%% @doc Get the last-modified time for this resource.
%% Documents will have the last-modified time specified by the
%% Documents will have the last-modified time specified by the
%% riak_object.
%% For documents with siblings, this is the last-modified time of the
%% latest sibling.
Expand Down Expand Up @@ -1339,11 +1330,11 @@ get_ctype(MD,V) ->
end.

missing_content_type(RD) ->
RD1 = wrq:set_resp_header("Content-Type", "text/plain", RD),
RD1 = wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD),
wrq:append_to_response_body(<<"Missing Content-Type request header">>, RD1).

send_precommit_error(RD, Reason) ->
RD1 = wrq:set_resp_header("Content-Type", "text/plain", RD),
RD1 = wrq:set_resp_header(?HEAD_CTYPE, "text/plain", RD),
Error = if
Reason =:= undefined ->
list_to_binary([atom_to_binary(wrq:method(RD1), utf8),
Expand All @@ -1364,14 +1355,14 @@ handle_common_error(Reason, RD, Ctx) ->
" to satisfy W/DW\n", RD), Ctx};
{error, timeout} ->
{{halt, 503},
wrq:set_resp_header("Content-Type", "text/plain",
wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
wrq:append_to_response_body(
io_lib:format("request timed out~n",[]),
RD)),
Ctx};
{error, notfound} ->
{{halt, 404},
wrq:set_resp_header("Content-Type", "text/plain",
wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
wrq:append_to_response_body(
io_lib:format("not found~n",[]),
RD)),
Expand All @@ -1384,7 +1375,7 @@ handle_common_error(Reason, RD, Ctx) ->
Ctx};
{error, {deleted, _VClock}} ->
{{halt, 404},
wrq:set_resp_header("Content-Type", "text/plain",
wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
wrq:set_resp_header(?HEAD_DELETED, "true",
wrq:append_to_response_body(
io_lib:format("not found~n",[]),
Expand All @@ -1396,23 +1387,23 @@ handle_common_error(Reason, RD, Ctx) ->
{{halt, 400}, wrq:append_to_response_body(Msg, RD), Ctx};
{error, {r_val_unsatisfied, Requested, Returned}} ->
{{halt, 503},
wrq:set_resp_header("Content-Type", "text/plain",
wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
wrq:append_to_response_body(
io_lib:format("R-value unsatisfied: ~p/~p~n",
[Returned, Requested]),
RD)),
Ctx};
{error, {dw_val_unsatisfied, DW, NumDW}} ->
{{halt, 503},
wrq:set_resp_header("Content-Type", "text/plain",
wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
wrq:append_to_response_body(
io_lib:format("DW-value unsatisfied: ~p/~p~n",
[NumDW, DW]),
RD)),
Ctx};
{error, {pr_val_unsatisfied, Requested, Returned}} ->
{{halt, 503},
wrq:set_resp_header("Content-Type", "text/plain",
wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
wrq:append_to_response_body(
io_lib:format("PR-value unsatisfied: ~p/~p~n",
[Returned, Requested]),
Expand All @@ -1430,7 +1421,7 @@ handle_common_error(Reason, RD, Ctx) ->
{{halt, 412}, RD, Ctx};
{error, Err} ->
{{halt, 500},
wrq:set_resp_header("Content-Type", "text/plain",
wrq:set_resp_header(?HEAD_CTYPE, "text/plain",
wrq:append_to_response_body(
io_lib:format("Error:~n~p~n",[Err]),
RD)),
Expand All @@ -1447,3 +1438,8 @@ make_options(Prev, Ctx) ->
NewOpts = [ {Opt, Val} || {Opt, Val} <- NewOpts0,
Val /= undefined, Val /= default ],
Prev ++ NewOpts.

ensure_bucket_type(RD, Ctx) ->
Ctx0 = riak_kv_wm_utils:ensure_bucket_type(RD, Ctx, #ctx.bucket_type),
Ctx0#ctx{type_exists =
riak_kv_wm_utils:bucket_type_exists(Ctx0#ctx.bucket_type)}.