From 676f230fd0fe93265585e2db5d2b9a74f4342507 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 3 Feb 2025 17:29:41 +0000 Subject: [PATCH 1/4] Experiment with using persistent_Term not ETS --- src/riak_api_pb_registrar.erl | 240 ++++++++++-------------- src/riak_api_pb_registrar.hrl | 8 +- src/riak_api_pb_registration_helper.erl | 172 ----------------- src/riak_api_pb_server.erl | 47 +++-- src/riak_api_sup.erl | 3 +- test/pb_service_test.erl | 21 ++- 6 files changed, 140 insertions(+), 351 deletions(-) delete mode 100644 src/riak_api_pb_registration_helper.erl diff --git a/src/riak_api_pb_registrar.erl b/src/riak_api_pb_registrar.erl index 7815040..0897241 100644 --- a/src/riak_api_pb_registrar.erl +++ b/src/riak_api_pb_registrar.erl @@ -42,14 +42,13 @@ register/1, deregister/1, swap/3, - set_heir/1, services/0, lookup/1 ]). -record(state, { opq = [] :: [ tuple() ], %% A list of registrations to reply to once we have the table - owned = false :: boolean() %% Whether the registrar owns the table yet + owned = true :: boolean() %% Whether the registrar owns the table yet }). -include("riak_api_pb_registrar.hrl"). @@ -66,7 +65,8 @@ %% @doc Starts the registrar server -spec start_link() -> {ok, pid()}. start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + {ok, Pid} = gen_server:start_link({local, ?SERVER}, ?MODULE, [], []), + {ok, Pid}. %% @doc Registers a range of message codes to named services. -spec register([riak_api_pb_service:registration()]) -> ok | {error, Reason::term()}. @@ -91,40 +91,38 @@ deregister(Registrations) -> swap(NewModule, MinCode, MaxCode) -> gen_server:call(?SERVER, {swap, {NewModule, MinCode, MaxCode}}, infinity). -%% @doc Sets the heir of the registrations table on behalf of the -%% helper process. -%% @private --spec set_heir(pid()) -> ok. -set_heir(Pid) -> - gen_server:call(?SERVER, {set_heir, Pid}, infinity). - %% @doc Lists registered service modules. -spec services() -> [ module() ]. services() -> - lists:usort([ Service || {_Code, Service} <- ets:tab2list(?ETS_NAME)]). + lists:usort( + lists:filtermap( + fun(PKV) -> + case PKV of + {{Name, _Code}, Service} + when Name == ?ETS_NAME, is_atom(Service) -> + {true, Service}; + _ -> + false + end + end, + persistent_term:get() + ) + ). %% @doc Looks up the registration of a given message code. -spec lookup(non_neg_integer()) -> {ok, module()} | error. lookup(Code) -> - case ets:lookup(?ETS_NAME, Code) of - [{Code, Service}] -> {ok, Service}; - _ -> error + case persistent_term:get({?ETS_NAME, Code}, error) of + error -> error; + Service -> {ok, Service} end. %%-------------------------------------------------------------------- %%% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> - ok = riak_api_pb_registration_helper:claim_table(), {ok, #state{}}. -handle_call({Op, Args}, From, #state{opq=OpQ, owned=false}=State) -> - %% Since we don't own the table yet, we enqueue the registration - %% operations until we get the ETS-TRANSFER message. - {noreply, State#state{opq=[{{Op, Args}, From}|OpQ]}}; -handle_call({set_heir, Pid}, _From, #state{owned=true}=State) -> - ets:setopts(?ETS_NAME, [{heir, Pid, undefined}]), - {reply, ok, State}; handle_call({register, Registrations}, _From, State) -> Reply = do_register(Registrations), {reply, Reply, State}; @@ -139,11 +137,6 @@ handle_call({swap, {NewModule, MinCode, MaxCode}}, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'ETS-TRANSFER', ?ETS_NAME, _, _}, #state{opq=OpQ, owned=false}=State) -> - %% We've queued up a bunch of registration/deregistration ops, - %% lets process them now that we have the table. - NewState = lists:foldr(fun queue_folder/2, State#state{opq=[], owned=true}, OpQ), - {noreply, NewState}; handle_info(_Info, State) -> {noreply, State}. @@ -156,10 +149,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -queue_folder({Op, From}, FState) -> - {reply, Reply, NewFState} = handle_call(Op, From, FState), - gen_server:reply(From, Reply), - NewFState. do_register([]) -> ok; @@ -179,7 +168,12 @@ do_register(Module, MinCode, MaxCode) -> CodeRange = lists:seq(MinCode, MaxCode), case lists:filter(fun is_registered/1, CodeRange) of [] -> - ets:insert(?ETS_NAME, [{Code, Module} || Code <- CodeRange ]), + lists:map( + fun(Code) -> + persistent_term:put({?ETS_NAME, Code}, Module) + end, + CodeRange + ), riak_api_pb_sup:service_registered(Module), ok; AlreadyClaimed -> @@ -191,7 +185,12 @@ do_swap(NewModule, MinCode, MaxCode) -> Matching = lists:filter(fun is_registered/1, CodeRange), case length(Matching) == length(CodeRange) of true -> - ets:insert(?ETS_NAME, [{Code, NewModule} || Code <- CodeRange]), + lists:map( + fun(Code) -> + persistent_term:put({?ETS_NAME, Code}, NewModule) + end, + CodeRange + ), riak_api_pb_sup:service_registered(NewModule); false -> {error, {range_not_registered, CodeRange}} @@ -214,21 +213,29 @@ MaxCode < 1 -> do_deregister(Module, MinCode, MaxCode) -> CodeRange = lists:seq(MinCode, MaxCode), %% Figure out whether all of the codes can be deregistered. - Mapper = fun(I) -> - case ets:lookup(?ETS_NAME, I) of - [] -> - {error, {unregistered, I}}; - [{I, Module}] -> - I; - [{I, _OtherModule}] -> - {error, {not_owned, I}} - end - end, - ToRemove = [ Mapper(I) || I <- CodeRange ], + ToRemove = + lists:map( + fun(Code) -> + case persistent_term:get({?ETS_NAME, Code}, error) of + error -> + {error, {unregistered, Code}}; + Module -> + Code; + _OtherModule -> + {error, {not_owned, Code}} + end + end, + CodeRange + ), case ToRemove of CodeRange -> %% All codes are valid, so remove them. - _ = [ ets:delete(?ETS_NAME, Code) || Code <- CodeRange ], + lists:foreach( + fun(Code) -> + persistent_term:erase({?ETS_NAME, Code}) + end, + CodeRange + ), riak_api_pb_sup:service_registered(Module), ok; _ -> @@ -237,13 +244,13 @@ do_deregister(Module, MinCode, MaxCode) -> end. is_registered(Code) -> - ets:member(?ETS_NAME, Code). + error =/= persistent_term:get({?ETS_NAME, Code}, error). -ifdef(TEST). test_start() -> %% Since registration is now a pair of processes, we need both. - {ok, test_start(riak_api_pb_registration_helper), test_start(?MODULE)}. + {ok, test_start(?MODULE)}. test_start(Module) -> case gen_server:start({local, Module}, Module, [], []) of @@ -255,37 +262,57 @@ test_start(Module) -> end. setup() -> - {ok, HelperPid, Pid} = test_start(), - {Pid, HelperPid}. - -cleanup({Pid, HelperPid}) -> - exit(Pid, brutal_kill), - exit(HelperPid, brutal_kill). + {ok, Pid} = test_start(), + Pid. + +cleanup(Pid) -> + lists:foreach( + fun(PKT) -> + case PKT of + {{Name, Code}, Service} + when Name == ?ETS_NAME, is_atom(Service) -> + persistent_term:erase({Name, Code}); + _ -> + ok + end + end, + persistent_term:get() + ), + exit(Pid, brutal_kill). deregister_test_() -> {foreach, fun setup/0, fun cleanup/1, [ - %% Deregister a previously registered service - ?_assertEqual(ok, begin - ok = riak_api_pb_service:register(foo, 1, 2), - riak_api_pb_service:deregister(foo, 1, 2) - end), - %% Invalid deregistration: range is invalid - ?_assertEqual({error, invalid_message_code_range}, riak_api_pb_service:deregister(foo, 2, 1)), - %% Invalid deregistration: unregistered range - ?_assertEqual({error, {unregistered, 1}}, riak_api_pb_service:deregister(foo, 1, 1)), - %% Invalid deregistration: registered to other service - ?_assertEqual({error, {not_owned, 1}}, begin - ok = riak_api_pb_service:register(foo, 1, 2), - riak_api_pb_service:deregister(bar, 1) - end), - %% Deregister multiple - ?_assertEqual(ok, begin - ok = riak_api_pb_service:register([{foo, 1, 2}, {bar, 3, 4}]), - riak_api_pb_service:deregister([{bar, 3, 4}, {foo, 1, 2}]) - end) + %% Deregister a previously registered service + ?_assertEqual( + ok, + begin + ok = riak_api_pb_service:register(foo, 1, 2), + riak_api_pb_service:deregister(foo, 1, 2) + end + ), + %% Invalid deregistration: range is invalid + ?_assertEqual({error, invalid_message_code_range}, riak_api_pb_service:deregister(foo, 2, 1)), + %% Invalid deregistration: unregistered range + ?_assertEqual({error, {unregistered, 1}}, riak_api_pb_service:deregister(foo, 1, 1)), + %% Invalid deregistration: registered to other service + ?_assertEqual( + {error, {not_owned, 1}}, + begin + ok = riak_api_pb_service:register(foo, 1, 2), + riak_api_pb_service:deregister(bar, 1) + end + ), + %% Deregister multiple + ?_assertEqual( + ok, + begin + ok = riak_api_pb_service:register([{foo, 1, 2}, {bar, 3, 4}]), + riak_api_pb_service:deregister([{bar, 3, 4}, {foo, 1, 2}]) + end + ) ]}. register_test_() -> @@ -323,74 +350,5 @@ services_test_() -> end) ]}. -registration_inheritance_test_() -> - {foreach, - fun setup/0, - fun({Pid, HelperPid}) -> - [ exit(P, brutal_kill) || P <- [Pid, HelperPid], - is_process_alive(P) ] - end, - [ - %% Killing registrar causes helper to receive table. Restarting - %% it causes it to become owner again. - ?_test(begin - Helper = whereis(riak_api_pb_registration_helper), - Registrar = whereis(?MODULE), - exit(Registrar, brutal_kill), - erlang:yield(), - timer:sleep(1), - ?assertEqual(Helper, proplists:get_value(owner, ets:info(?ETS_NAME))), - NewReg = test_start(?MODULE), - erlang:yield(), - ?assertEqual(NewReg, proplists:get_value(owner, ets:info(?ETS_NAME))), - ?assertEqual(Helper, proplists:get_value(heir, ets:info(?ETS_NAME))), - exit(NewReg, brutal_kill) - end), - - %% Killing and restarting helper causes it to become - %% the heir again. - ?_test(begin - Helper = whereis(riak_api_pb_registration_helper), - exit(Helper, brutal_kill), - erlang:yield(), - NewHelper = test_start(riak_api_pb_registration_helper), - erlang:yield(), - ?assertEqual(NewHelper, proplists:get_value(heir, ets:info(?ETS_NAME))) - end), - - %% Registrar should queue up requests while it is not in - %% ownership of the table. - ?_test(begin - Outer = self(), - %% Helper = whereis(riak_api_pb_registration_helper), - ?assertEqual(ok, riak_api_pb_service:register(bar, 1000, 1000)), - Registrar = whereis(?MODULE), - exit(Registrar, brutal_kill), - meck:new(riak_api_pb_registration_helper, [passthrough]), - meck:expect(riak_api_pb_registration_helper, handle_call, - fun(claim_table, {Pid, _Tag}=From, State) -> - gen_server:reply(From, ok), - timer:sleep(100), - ets:give_away(?ETS_NAME, Pid, undefined), - {noreply, State} - end), - NewReg = test_start(?MODULE), - spawn(fun() -> - Outer ! {100, riak_api_pb_service:register(foo, 100, 100)} - end), - spawn(fun() -> - Outer ! {101, riak_api_pb_service:register(foo, 101, 101)} - end), - spawn(fun() -> - Outer ! {1000, riak_api_pb_service:swap(foo, 1000, 1000)} - end), - ?assertEqual(ok, receive {100, Msg} -> Msg after 1500 -> fail end), - ?assertEqual(ok, receive {101, Msg} -> Msg after 1500 -> fail end), - ?assertEqual(ok, receive {1000, Msg} -> Msg after 1500 -> fail end), - meck:unload(), - exit(NewReg, brutal_kill) - end) - - ]}. -endif. diff --git a/src/riak_api_pb_registrar.hrl b/src/riak_api_pb_registrar.hrl index d4a9df1..08914fc 100644 --- a/src/riak_api_pb_registrar.hrl +++ b/src/riak_api_pb_registrar.hrl @@ -19,10 +19,4 @@ %% under the License. %% %% ------------------------------------------------------------------- --define(ETS_NAME, riak_api_pb_registrations). --define(ETS_HEIR, {heir, self(), undefined}). --define(ETS_OPTS, [protected, - named_table, - set, - ?ETS_HEIR, - {read_concurrency, true}]). +-define(ETS_NAME, riak_api_pb_registrations). \ No newline at end of file diff --git a/src/riak_api_pb_registration_helper.erl b/src/riak_api_pb_registration_helper.erl deleted file mode 100644 index 51cb00f..0000000 --- a/src/riak_api_pb_registration_helper.erl +++ /dev/null @@ -1,172 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% riak_api_pb_registration_helper: PB API Registration table manager -%% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - -%% @doc A gen_server process that creates and serves as heir to the -%% message-code registration ETS table. Should the registrar process -%% exit, this server will inherit the ETS table and hand it back to -%% the registrar process when it restarts. --module(riak_api_pb_registration_helper). - --behaviour(gen_server). - --include("riak_api_pb_registrar.hrl"). - -%% API --export([start_link/0, - claim_table/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --include_lib("kernel/include/logger.hrl"). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% -%% @end -%%-------------------------------------------------------------------- --spec start_link() -> {ok, Pid::pid()} | ignore | {error, Error::term()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - - -%%-------------------------------------------------------------------- -%% @doc -%% Gives the registration table away to the caller, which should be -%% the registrar process. -%% -%% @end -%%-------------------------------------------------------------------- --spec claim_table() -> ok. -claim_table() -> - gen_server:call(?SERVER, claim_table, infinity). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% @end -%%-------------------------------------------------------------------- --spec init([]) -> {ok, undefined}. -init([]) -> - case ets:info(?ETS_NAME) of - undefined -> - %% Table does not exist, so we create the table and wait - %% for the registrar to claim it. - ?ETS_NAME = ets:new(?ETS_NAME, ?ETS_OPTS), - {ok, undefined}; - List when is_list(List) -> - %% This process must have been restarted, because the table - %% already exists. Let's try to become the heir again. - ?LOG_DEBUG("PB registration helper restarted as ~p, becoming heir", [self()]), - riak_api_pb_registrar:set_heir(self()), - {ok, undefined} - end. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% @end -%%-------------------------------------------------------------------- --spec handle_call(Msg::term(), From::{pid(), term()}, State::term()) -> - {reply, Reply::term(), State::term()} | - {noreply, State::term()}. -handle_call(claim_table, {Pid, _Tag}, State) -> - %% The registrar is (re-)claiming the table, let's give it away. We - %% assume this process is the heir, which is set on startup or - %% transfer of the table. - ?LOG_DEBUG("Giving away PB registration table to ~p", [Pid]), - ets:give_away(?ETS_NAME, Pid, undefined), - Reply = ok, - {reply, Reply, State}; - -handle_call(_Msg, _From, State) -> - {noreply, State}. - - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% @end -%%-------------------------------------------------------------------- --spec handle_cast(term(), term()) -> {noreply, State::term()}. -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- --spec handle_info(term(), term()) -> {noreply, State::term()}. -handle_info({'ETS-TRANSFER', ?ETS_NAME, FromPid, _HeirData}, State) -> - %% The registrar process exited and transferred the table back to - %% the helper. - ?LOG_DEBUG("PB Registrar ~p exited, ~p received table", [FromPid, self()]), - {noreply, State}; - -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @end -%%-------------------------------------------------------------------- --spec terminate(term(), term()) -> ok. -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% -%% @end -%%-------------------------------------------------------------------- --spec code_change(term(), term(), term()) -> {ok, term()}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/riak_api_pb_server.erl b/src/riak_api_pb_server.erl index 87de38f..003fe8d 100644 --- a/src/riak_api_pb_server.erl +++ b/src/riak_api_pb_server.erl @@ -55,7 +55,7 @@ transport = {gen_tcp, inet} :: {gen_tcp, inet} | {ssl, ssl}, socket :: port() | ssl:sslsocket(), % socket req, % current request - states :: orddict:orddict(), % per-service connection state + states :: #{module() => term()}, % per-service connection state peername :: undefined | {inet:ip_address(), pos_integer()}, common_name :: undefined | string(), security, @@ -101,11 +101,14 @@ service_registered(Pid, Mod) -> -spec init(list()) -> {ok, wait_for_socket, #state{}}. init([]) -> riak_api_stat:update(pbc_connect), - ServiceStates = lists:foldl(fun(Service, States) -> - orddict:store(Service, Service:init(), States) - end, - orddict:new(), - riak_api_pb_registrar:services()), + ServiceStates = + lists:foldl( + fun(Service, States) -> + maps:put(Service, Service:init(), States) + end, + maps:new(), + riak_api_pb_registrar:services() + ), {ok, wait_for_socket, #state{states=ServiceStates}}. wait_for_socket(_Event, State) -> @@ -230,7 +233,7 @@ connected({msg, MsgCode, MsgData}, State=#state{states=ServiceStates}) -> %% First find the appropriate service module to dispatch NewState = case riak_api_pb_registrar:lookup(MsgCode) of {ok, Service} -> - ServiceState = orddict:fetch(Service, ServiceStates), + ServiceState = maps:get(Service, ServiceStates), %% Decode the message according to the service case Service:decode(MsgCode, MsgData) of {ok, Message} -> @@ -283,16 +286,21 @@ handle_event({registered, Service}, StateName, #state{states=ServiceStates}=Stat %% When a new service is registered after a client connection is %% already established, update the internal state to support the %% new capabilities. - case orddict:is_key(Service, ServiceStates) of + case maps:is_key(Service, ServiceStates) of true -> %% This is an existing service registering %% disjoint message codes {next_state, StateName, State, 0}; false -> %% This is a new service registering - {next_state, StateName, - State#state{states=orddict:store(Service, Service:init(), - ServiceStates)}, 0} + { + next_state, + StateName, + State#state{ + states=maps:put(Service, Service:init(),ServiceStates) + }, + 0 + } end; handle_event(_Msg, StateName, State) -> {next_state, StateName, State, 0}. @@ -460,13 +468,13 @@ update_service_state(Service, NewServiceState, _OldServiceState, #state{req={Ser %% While streaming, we avoid extra fetches of the state by %% including it in the current request field. When req is %% undefined (set at the end of the stream), it will be updated - %% into the orddict. + %% into the map. ServerState#state{req={Service,ReqId,NewServiceState}}; update_service_state(_Service, OldServiceState, OldServiceState, ServerState) -> %% If the service state is unchanged, don't bother storing it again. ServerState; update_service_state(Service, NewServiceState, _OldServiceState, #state{states=ServiceStates}=ServerState) -> - NewServiceStates = orddict:store(Service, NewServiceState, ServiceStates), + NewServiceStates = maps:put(Service, NewServiceState, ServiceStates), ServerState#state{states=NewServiceStates}. %% @doc Given an unencoded response message, attempts to encode it and send it @@ -545,17 +553,8 @@ format_peername({IP, Port}) -> receive_closed_socket_test_() -> {setup, - fun() -> - %% Create the registration table so the server will start up. - try ets:new(?ETS_NAME, ?ETS_OPTS) of - ?ETS_NAME -> true - catch - _:badarg -> false - end - end, - fun(true) -> ets:delete(?ETS_NAME); - (_) -> ok - end, + fun() -> ok end, + fun(_) -> ok end, ?_test( begin %% Pretend that we're a listener, listen on any port diff --git a/src/riak_api_sup.erl b/src/riak_api_sup.erl index 5c2cd2f..3b084f8 100644 --- a/src/riak_api_sup.erl +++ b/src/riak_api_sup.erl @@ -49,12 +49,11 @@ start_link() -> MaxT :: pos_integer(), ChildSpec :: supervisor:child_spec(). init([]) -> - Helper = ?CHILD(riak_api_pb_registration_helper, worker), Registrar = ?CHILD(riak_api_pb_registrar, worker), PBProcesses = pb_processes(riak_api_pb_listener:get_listeners()), WebProcesses = web_processes(riak_api_web:get_listeners()), NetworkProcesses = PBProcesses ++ WebProcesses, - {ok, {{one_for_one, 10, 10}, [Helper, Registrar|NetworkProcesses]}}. + {ok, {{one_for_one, 10, 10}, [Registrar|NetworkProcesses]}}. %% Generates child specs from the HTTP/HTTPS listener configuration. %% @private diff --git a/test/pb_service_test.erl b/test/pb_service_test.erl index 58b79e9..9f4cc67 100644 --- a/test/pb_service_test.erl +++ b/test/pb_service_test.erl @@ -87,15 +87,14 @@ process_stream(_, _, State) -> %% =================================================================== %% Eunit tests %% =================================================================== + +-include("riak_api_pb_registrar.hrl"). + setup() -> - application:load(lager), application:load(riak_api), - LogFile = filename:join([code:priv_dir(riak_api), "pb_service_test.log"]), error_logger:tty(false), - application:set_env(lager, handlers, [{lager_file_backend, [{LogFile, debug, 10485760, "$D0", 5}]}]), - application:set_env(lager, error_logger_redirect, true), - + %% Need riak_core.security capability, let's fake it ets:new(riak_capability_ets, [named_table, {read_concurrency, true}]), ets:insert(riak_capability_ets, {{riak_core, security}, false}), @@ -115,6 +114,18 @@ cleanup({L, Sup}) -> ets:delete(riak_capability_ets), exit(Sup, normal), application:set_env(riak_api, pb, L), + lists:foreach( + fun(PKT) -> + case PKT of + {{Name, Code}, Service} + when Name == ?ETS_NAME, is_atom(Service) -> + persistent_term:erase({Name, Code}); + _ -> + ok + end + end, + persistent_term:get() + ), ok. request_multi(Payloads) when is_list(Payloads) -> From 3d0ac3b187eec98a6ea033183d8de7b41704bab7 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 5 Feb 2025 15:25:24 +0000 Subject: [PATCH 2/4] Update meck --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 915edcf..7ecb085 100644 --- a/rebar.config +++ b/rebar.config @@ -16,7 +16,7 @@ ]}. {profiles, [ - {test, [{deps, [meck]}]}, + {test, [{deps, [{meck, {git, "https://github.com/OpenRiak/meck.git", {branch, "openriak-3.2"}}}]}]}, {gha, [{erl_opts, [{d, 'GITHUBEXCLUDE'}]}]} ]}. From d9dddc882771c0b007e3ae009dab37a3629638b6 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 5 Feb 2025 17:33:57 +0000 Subject: [PATCH 3/4] Switch to map-based metadata decoding --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 7ecb085..92dc6dd 100644 --- a/rebar.config +++ b/rebar.config @@ -9,7 +9,7 @@ {eunit_opts, [verbose]}. {deps, [ - {riak_pb, {git, "https://github.com/OpenRiak/riak_pb.git", {branch, "openriak-3.4"}}}, + {riak_pb, {git, "https://github.com/OpenRiak/riak_pb.git", {branch, "nhse-o34-or.i10-nodict"}}}, {webmachine, {git, "https://github.com/OpenRiak/webmachine.git", {branch, "openriak-3.4"}}}, {mochiweb, {git, "https://github.com/OpenRiak/mochiweb.git", {branch, "openriak-3.4"}}}, {riak_core, {git, "https://github.com/OpenRiak/riak_core.git", {branch, "openriak-3.4"}}} From e5d07847a76166f672f42bd5bb56cf615a5a597e Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Wed, 5 Feb 2025 20:00:03 +0000 Subject: [PATCH 4/4] clear_all at startup, as nothing should be registered --- src/riak_api_app.erl | 1 - src/riak_api_pb_registrar.erl | 15 +++++++++++++++ src/riak_api_sup.erl | 1 + test/pb_service_test.erl | 20 ++++++++------------ 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/riak_api_app.erl b/src/riak_api_app.erl index 9ea43ec..a2d3f8a 100644 --- a/src/riak_api_app.erl +++ b/src/riak_api_app.erl @@ -48,7 +48,6 @@ Reason :: term(). start(_Type, _StartArgs) -> riak_core_util:start_app_deps(riak_api), - case riak_api_sup:start_link() of {ok, Pid} -> riak_core:register(riak_api, [{stat_mod, riak_api_stat}]), diff --git a/src/riak_api_pb_registrar.erl b/src/riak_api_pb_registrar.erl index 0897241..69658d3 100644 --- a/src/riak_api_pb_registrar.erl +++ b/src/riak_api_pb_registrar.erl @@ -42,6 +42,7 @@ register/1, deregister/1, swap/3, + clear_all/0, services/0, lookup/1 ]). @@ -109,6 +110,20 @@ services() -> ) ). +-spec clear_all() -> ok. +clear_all() -> + lists:foreach( + fun({K, _V}) -> + case K of + {Name, Code} when Name == ?ETS_NAME -> + persistent_term:erase({Name, Code}); + _ -> + ok + end + end, + persistent_term:get() + ). + %% @doc Looks up the registration of a given message code. -spec lookup(non_neg_integer()) -> {ok, module()} | error. lookup(Code) -> diff --git a/src/riak_api_sup.erl b/src/riak_api_sup.erl index 3b084f8..2890d92 100644 --- a/src/riak_api_sup.erl +++ b/src/riak_api_sup.erl @@ -53,6 +53,7 @@ init([]) -> PBProcesses = pb_processes(riak_api_pb_listener:get_listeners()), WebProcesses = web_processes(riak_api_web:get_listeners()), NetworkProcesses = PBProcesses ++ WebProcesses, + riak_api_pb_registrar:clear_all(), {ok, {{one_for_one, 10, 10}, [Registrar|NetworkProcesses]}}. %% Generates child specs from the HTTP/HTTPS listener configuration. diff --git a/test/pb_service_test.erl b/test/pb_service_test.erl index 9f4cc67..0f5078e 100644 --- a/test/pb_service_test.erl +++ b/test/pb_service_test.erl @@ -114,18 +114,14 @@ cleanup({L, Sup}) -> ets:delete(riak_capability_ets), exit(Sup, normal), application:set_env(riak_api, pb, L), - lists:foreach( - fun(PKT) -> - case PKT of - {{Name, Code}, Service} - when Name == ?ETS_NAME, is_atom(Service) -> - persistent_term:erase({Name, Code}); - _ -> - ok - end - end, - persistent_term:get() - ), + case is_process_alive(Sup) of + true -> + % If startup too quick after cleanup, may still be alive + timer:sleep(10), + false = is_process_alive(Sup); + _ -> + ok + end, ok. request_multi(Payloads) when is_list(Payloads) ->