Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment with using persistent_Term not ETS for PB registrar #2

Open
wants to merge 4 commits into
base: openriak-3.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
{eunit_opts, [verbose]}.

{deps, [
{riak_pb, {git, "https://github.com/OpenRiak/riak_pb.git", {branch, "openriak-3.4"}}},
{riak_pb, {git, "https://github.com/OpenRiak/riak_pb.git", {branch, "nhse-o34-or.i10-nodict"}}},
{webmachine, {git, "https://github.com/OpenRiak/webmachine.git", {branch, "openriak-3.4"}}},
{mochiweb, {git, "https://github.com/OpenRiak/mochiweb.git", {branch, "openriak-3.4"}}},
{riak_core, {git, "https://github.com/OpenRiak/riak_core.git", {branch, "openriak-3.4"}}}
]}.

{profiles, [
{test, [{deps, [meck]}]},
{test, [{deps, [{meck, {git, "https://github.com/OpenRiak/meck.git", {branch, "openriak-3.2"}}}]}]},
{gha, [{erl_opts, [{d, 'GITHUBEXCLUDE'}]}]}
]}.

Expand Down
1 change: 0 additions & 1 deletion src/riak_api_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
Reason :: term().
start(_Type, _StartArgs) ->
riak_core_util:start_app_deps(riak_api),

case riak_api_sup:start_link() of
{ok, Pid} ->
riak_core:register(riak_api, [{stat_mod, riak_api_stat}]),
Expand Down
255 changes: 114 additions & 141 deletions src/riak_api_pb_registrar.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@
register/1,
deregister/1,
swap/3,
set_heir/1,
clear_all/0,
services/0,
lookup/1
]).

-record(state, {
opq = [] :: [ tuple() ], %% A list of registrations to reply to once we have the table
owned = false :: boolean() %% Whether the registrar owns the table yet
owned = true :: boolean() %% Whether the registrar owns the table yet
}).

-include("riak_api_pb_registrar.hrl").
Expand All @@ -66,7 +66,8 @@
%% @doc Starts the registrar server
-spec start_link() -> {ok, pid()}.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
{ok, Pid} = gen_server:start_link({local, ?SERVER}, ?MODULE, [], []),
{ok, Pid}.

%% @doc Registers a range of message codes to named services.
-spec register([riak_api_pb_service:registration()]) -> ok | {error, Reason::term()}.
Expand All @@ -91,40 +92,52 @@ deregister(Registrations) ->
swap(NewModule, MinCode, MaxCode) ->
gen_server:call(?SERVER, {swap, {NewModule, MinCode, MaxCode}}, infinity).

%% @doc Sets the heir of the registrations table on behalf of the
%% helper process.
%% @private
-spec set_heir(pid()) -> ok.
set_heir(Pid) ->
gen_server:call(?SERVER, {set_heir, Pid}, infinity).

%% @doc Lists registered service modules.
-spec services() -> [ module() ].
services() ->
lists:usort([ Service || {_Code, Service} <- ets:tab2list(?ETS_NAME)]).
lists:usort(
lists:filtermap(
fun(PKV) ->
case PKV of
{{Name, _Code}, Service}
when Name == ?ETS_NAME, is_atom(Service) ->
{true, Service};
_ ->
false
end
end,
persistent_term:get()
)
).

-spec clear_all() -> ok.
clear_all() ->
lists:foreach(
fun({K, _V}) ->
case K of
{Name, Code} when Name == ?ETS_NAME ->
persistent_term:erase({Name, Code});
_ ->
ok
end
end,
persistent_term:get()
).

%% @doc Looks up the registration of a given message code.
-spec lookup(non_neg_integer()) -> {ok, module()} | error.
lookup(Code) ->
case ets:lookup(?ETS_NAME, Code) of
[{Code, Service}] -> {ok, Service};
_ -> error
case persistent_term:get({?ETS_NAME, Code}, error) of
error -> error;
Service -> {ok, Service}
end.

%%--------------------------------------------------------------------
%%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
ok = riak_api_pb_registration_helper:claim_table(),
{ok, #state{}}.

handle_call({Op, Args}, From, #state{opq=OpQ, owned=false}=State) ->
%% Since we don't own the table yet, we enqueue the registration
%% operations until we get the ETS-TRANSFER message.
{noreply, State#state{opq=[{{Op, Args}, From}|OpQ]}};
handle_call({set_heir, Pid}, _From, #state{owned=true}=State) ->
ets:setopts(?ETS_NAME, [{heir, Pid, undefined}]),
{reply, ok, State};
handle_call({register, Registrations}, _From, State) ->
Reply = do_register(Registrations),
{reply, Reply, State};
Expand All @@ -139,11 +152,6 @@ handle_call({swap, {NewModule, MinCode, MaxCode}}, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'ETS-TRANSFER', ?ETS_NAME, _, _}, #state{opq=OpQ, owned=false}=State) ->
%% We've queued up a bunch of registration/deregistration ops,
%% lets process them now that we have the table.
NewState = lists:foldr(fun queue_folder/2, State#state{opq=[], owned=true}, OpQ),
{noreply, NewState};
handle_info(_Info, State) ->
{noreply, State}.

Expand All @@ -156,10 +164,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
queue_folder({Op, From}, FState) ->
{reply, Reply, NewFState} = handle_call(Op, From, FState),
gen_server:reply(From, Reply),
NewFState.

do_register([]) ->
ok;
Expand All @@ -179,7 +183,12 @@ do_register(Module, MinCode, MaxCode) ->
CodeRange = lists:seq(MinCode, MaxCode),
case lists:filter(fun is_registered/1, CodeRange) of
[] ->
ets:insert(?ETS_NAME, [{Code, Module} || Code <- CodeRange ]),
lists:map(
fun(Code) ->
persistent_term:put({?ETS_NAME, Code}, Module)
end,
CodeRange
),
riak_api_pb_sup:service_registered(Module),
ok;
AlreadyClaimed ->
Expand All @@ -191,7 +200,12 @@ do_swap(NewModule, MinCode, MaxCode) ->
Matching = lists:filter(fun is_registered/1, CodeRange),
case length(Matching) == length(CodeRange) of
true ->
ets:insert(?ETS_NAME, [{Code, NewModule} || Code <- CodeRange]),
lists:map(
fun(Code) ->
persistent_term:put({?ETS_NAME, Code}, NewModule)
end,
CodeRange
),
riak_api_pb_sup:service_registered(NewModule);
false ->
{error, {range_not_registered, CodeRange}}
Expand All @@ -214,21 +228,29 @@ MaxCode < 1 ->
do_deregister(Module, MinCode, MaxCode) ->
CodeRange = lists:seq(MinCode, MaxCode),
%% Figure out whether all of the codes can be deregistered.
Mapper = fun(I) ->
case ets:lookup(?ETS_NAME, I) of
[] ->
{error, {unregistered, I}};
[{I, Module}] ->
I;
[{I, _OtherModule}] ->
{error, {not_owned, I}}
end
end,
ToRemove = [ Mapper(I) || I <- CodeRange ],
ToRemove =
lists:map(
fun(Code) ->
case persistent_term:get({?ETS_NAME, Code}, error) of
error ->
{error, {unregistered, Code}};
Module ->
Code;
_OtherModule ->
{error, {not_owned, Code}}
end
end,
CodeRange
),
case ToRemove of
CodeRange ->
%% All codes are valid, so remove them.
_ = [ ets:delete(?ETS_NAME, Code) || Code <- CodeRange ],
lists:foreach(
fun(Code) ->
persistent_term:erase({?ETS_NAME, Code})
end,
CodeRange
),
riak_api_pb_sup:service_registered(Module),
ok;
_ ->
Expand All @@ -237,13 +259,13 @@ do_deregister(Module, MinCode, MaxCode) ->
end.

is_registered(Code) ->
ets:member(?ETS_NAME, Code).
error =/= persistent_term:get({?ETS_NAME, Code}, error).

-ifdef(TEST).

test_start() ->
%% Since registration is now a pair of processes, we need both.
{ok, test_start(riak_api_pb_registration_helper), test_start(?MODULE)}.
{ok, test_start(?MODULE)}.

test_start(Module) ->
case gen_server:start({local, Module}, Module, [], []) of
Expand All @@ -255,37 +277,57 @@ test_start(Module) ->
end.

setup() ->
{ok, HelperPid, Pid} = test_start(),
{Pid, HelperPid}.

cleanup({Pid, HelperPid}) ->
exit(Pid, brutal_kill),
exit(HelperPid, brutal_kill).
{ok, Pid} = test_start(),
Pid.

cleanup(Pid) ->
lists:foreach(
fun(PKT) ->
case PKT of
{{Name, Code}, Service}
when Name == ?ETS_NAME, is_atom(Service) ->
persistent_term:erase({Name, Code});
_ ->
ok
end
end,
persistent_term:get()
),
exit(Pid, brutal_kill).

deregister_test_() ->
{foreach,
fun setup/0,
fun cleanup/1,
[
%% Deregister a previously registered service
?_assertEqual(ok, begin
ok = riak_api_pb_service:register(foo, 1, 2),
riak_api_pb_service:deregister(foo, 1, 2)
end),
%% Invalid deregistration: range is invalid
?_assertEqual({error, invalid_message_code_range}, riak_api_pb_service:deregister(foo, 2, 1)),
%% Invalid deregistration: unregistered range
?_assertEqual({error, {unregistered, 1}}, riak_api_pb_service:deregister(foo, 1, 1)),
%% Invalid deregistration: registered to other service
?_assertEqual({error, {not_owned, 1}}, begin
ok = riak_api_pb_service:register(foo, 1, 2),
riak_api_pb_service:deregister(bar, 1)
end),
%% Deregister multiple
?_assertEqual(ok, begin
ok = riak_api_pb_service:register([{foo, 1, 2}, {bar, 3, 4}]),
riak_api_pb_service:deregister([{bar, 3, 4}, {foo, 1, 2}])
end)
%% Deregister a previously registered service
?_assertEqual(
ok,
begin
ok = riak_api_pb_service:register(foo, 1, 2),
riak_api_pb_service:deregister(foo, 1, 2)
end
),
%% Invalid deregistration: range is invalid
?_assertEqual({error, invalid_message_code_range}, riak_api_pb_service:deregister(foo, 2, 1)),
%% Invalid deregistration: unregistered range
?_assertEqual({error, {unregistered, 1}}, riak_api_pb_service:deregister(foo, 1, 1)),
%% Invalid deregistration: registered to other service
?_assertEqual(
{error, {not_owned, 1}},
begin
ok = riak_api_pb_service:register(foo, 1, 2),
riak_api_pb_service:deregister(bar, 1)
end
),
%% Deregister multiple
?_assertEqual(
ok,
begin
ok = riak_api_pb_service:register([{foo, 1, 2}, {bar, 3, 4}]),
riak_api_pb_service:deregister([{bar, 3, 4}, {foo, 1, 2}])
end
)
]}.

register_test_() ->
Expand Down Expand Up @@ -323,74 +365,5 @@ services_test_() ->
end)
]}.

registration_inheritance_test_() ->
{foreach,
fun setup/0,
fun({Pid, HelperPid}) ->
[ exit(P, brutal_kill) || P <- [Pid, HelperPid],
is_process_alive(P) ]
end,
[
%% Killing registrar causes helper to receive table. Restarting
%% it causes it to become owner again.
?_test(begin
Helper = whereis(riak_api_pb_registration_helper),
Registrar = whereis(?MODULE),
exit(Registrar, brutal_kill),
erlang:yield(),
timer:sleep(1),
?assertEqual(Helper, proplists:get_value(owner, ets:info(?ETS_NAME))),
NewReg = test_start(?MODULE),
erlang:yield(),
?assertEqual(NewReg, proplists:get_value(owner, ets:info(?ETS_NAME))),
?assertEqual(Helper, proplists:get_value(heir, ets:info(?ETS_NAME))),
exit(NewReg, brutal_kill)
end),

%% Killing and restarting helper causes it to become
%% the heir again.
?_test(begin
Helper = whereis(riak_api_pb_registration_helper),
exit(Helper, brutal_kill),
erlang:yield(),
NewHelper = test_start(riak_api_pb_registration_helper),
erlang:yield(),
?assertEqual(NewHelper, proplists:get_value(heir, ets:info(?ETS_NAME)))
end),

%% Registrar should queue up requests while it is not in
%% ownership of the table.
?_test(begin
Outer = self(),
%% Helper = whereis(riak_api_pb_registration_helper),
?assertEqual(ok, riak_api_pb_service:register(bar, 1000, 1000)),
Registrar = whereis(?MODULE),
exit(Registrar, brutal_kill),
meck:new(riak_api_pb_registration_helper, [passthrough]),
meck:expect(riak_api_pb_registration_helper, handle_call,
fun(claim_table, {Pid, _Tag}=From, State) ->
gen_server:reply(From, ok),
timer:sleep(100),
ets:give_away(?ETS_NAME, Pid, undefined),
{noreply, State}
end),
NewReg = test_start(?MODULE),
spawn(fun() ->
Outer ! {100, riak_api_pb_service:register(foo, 100, 100)}
end),
spawn(fun() ->
Outer ! {101, riak_api_pb_service:register(foo, 101, 101)}
end),
spawn(fun() ->
Outer ! {1000, riak_api_pb_service:swap(foo, 1000, 1000)}
end),
?assertEqual(ok, receive {100, Msg} -> Msg after 1500 -> fail end),
?assertEqual(ok, receive {101, Msg} -> Msg after 1500 -> fail end),
?assertEqual(ok, receive {1000, Msg} -> Msg after 1500 -> fail end),
meck:unload(),
exit(NewReg, brutal_kill)
end)

]}.

-endif.
Loading