From dea41707d990ed37ae12ffca05583799fde951e2 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Sat, 2 Sep 2023 09:12:50 +0100 Subject: [PATCH 1/2] Merge pull request #1 from nhs-riak/nhse-contrib-kv1871 KV i1871 - Handle timeout on remote connection --- src/riak_kv_replrtq_peer.erl | 11 +++-- src/riak_kv_replrtq_snk.erl | 88 ++++++++++++++++++++---------------- 2 files changed, 57 insertions(+), 42 deletions(-) diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index 8c92df1b6..bbeb19455 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -41,8 +41,8 @@ -type discovery_peer() :: {riak_kv_replrtq_snk:queue_name(), [riak_kv_replrtq_snk:peer_info()]}. --define(DISCOVERY_TIMEOUT_SECONDS, 60). --define(UPDATE_TIMEOUT_SECONDS, 60). +-define(DISCOVERY_TIMEOUT_SECONDS, 300). +-define(UPDATE_TIMEOUT_SECONDS, 300). -define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900). -define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60). @@ -66,7 +66,7 @@ update_discovery(QueueName) -> ?DISCOVERY_TIMEOUT_SECONDS * 1000). -spec update_workers(pos_integer(), pos_integer()) -> boolean(). -update_workers(WorkerCount, PerPeerLimit) -> +update_workers(WorkerCount, PerPeerLimit) when PerPeerLimit =< WorkerCount -> gen_server:call( ?MODULE, {update_workers, WorkerCount, PerPeerLimit}, @@ -142,6 +142,11 @@ handle_info({scheduled_discovery, QueueName}, State) -> ?AUTO_DISCOVERY_MAXIMUM_SECONDS), Delay = rand:uniform(max(1, MaxDelay - MinDelay)) + MinDelay, _ = schedule_discovery(QueueName, self(), Delay), + {noreply, State}; +handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) -> + lager:info( + "Client error caught - error ~p returned after timeout", + [HTTPClientError]), {noreply, State}. 
terminate(_Reason, _State) -> diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index dbfb4949e..28478f179 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -31,6 +31,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3]). @@ -65,7 +66,7 @@ -define(STARTING_DELAYMS, 8). -define(MAX_SUCCESS_DELAYMS, 1024). -define(ON_ERROR_DELAYMS, 65536). --define(INACTIVITY_TIMEOUT_MS, 60000). +-define(INITIAL_TIMEOUT_MS, 60000). -define(DEFAULT_WORKERCOUNT, 1). -record(sink_work, {queue_name :: queue_name(), @@ -208,10 +209,13 @@ add_snkqueue(QueueName, Peers, WorkerCount) -> %% number of workers overall -spec add_snkqueue(queue_name(), list(peer_info()), pos_integer(), pos_integer()) -> ok. -add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {add, QueueName, Peers, WorkerCount, PerPeerLimit}). +add_snkqueue( + QueueName, Peers, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {add, QueueName, Peers, WorkerCount, PerPeerLimit}, + infinity). %% @doc @@ -220,7 +224,7 @@ add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) %% Returns undefined if there are currently no peers defined. -spec current_peers(queue_name()) -> list(peer_info())|undefined. current_peers(QueueName) -> - gen_server:call(?MODULE, {current_peers, QueueName}). + gen_server:call(?MODULE, {current_peers, QueueName}, infinity). %% @doc @@ -236,47 +240,26 @@ set_workercount(QueueName, WorkerCount) -> %% @doc %% Change the number of concurrent workers whilst limiting the number of %% workers per peer --spec set_workercount(queue_name(), pos_integer(), pos_integer()) - -> ok|not_found. -set_workercount(QueueName, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {worker_count, QueueName, WorkerCount, PerPeerLimit}). 
+-spec set_workercount( + queue_name(), pos_integer(), pos_integer()) -> ok|not_found. +set_workercount( + QueueName, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {worker_count, QueueName, WorkerCount, PerPeerLimit}, + infinity + ). %%%============================================================================ %%% gen_server callbacks %%%============================================================================ init([]) -> - SinkEnabled = - app_helper:get_env(riak_kv, replrtq_enablesink, false), + SinkEnabled = app_helper:get_env(riak_kv, replrtq_enablesink, false), case SinkEnabled of true -> - SinkPeers = - app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), - DefaultQueue = - app_helper:get_env(riak_kv, replrtq_sinkqueue), - SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), - {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), - Iteration = 1, - MapPeerInfoFun = - fun({SnkQueueName, SnkPeerInfo}) -> - {SnkQueueLength, SnkWorkQueue} = - determine_workitems(SnkQueueName, - Iteration, - SnkPeerInfo, - SnkWorkerCount, - min(SnkWorkerCount, PerPeerLimit)), - SnkW = - #sink_work{queue_name = SnkQueueName, - work_queue = SnkWorkQueue, - minimum_queue_length = SnkQueueLength, - peer_list = SnkPeerInfo, - max_worker_count = SnkWorkerCount}, - {SnkQueueName, Iteration, SnkW} - end, - Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), - {ok, #state{enabled = true, work = Work}, ?INACTIVITY_TIMEOUT_MS}; + {ok, #state{}, {continue, initialise_work}}; false -> {ok, #state{}} end. @@ -432,6 +415,33 @@ handle_info({prompt_requeue, WorkItem}, State) -> requeue_work(WorkItem), {noreply, State}. 
+handle_continue(initialise_work, State) -> + SinkPeers = + app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), + DefaultQueue = + app_helper:get_env(riak_kv, replrtq_sinkqueue), + SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), + {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), + Iteration = 1, + MapPeerInfoFun = + fun({SnkQueueName, SnkPeerInfo}) -> + {SnkQueueLength, SnkWorkQueue} = + determine_workitems(SnkQueueName, + Iteration, + SnkPeerInfo, + SnkWorkerCount, + min(SnkWorkerCount, PerPeerLimit)), + SnkW = + #sink_work{queue_name = SnkQueueName, + work_queue = SnkWorkQueue, + minimum_queue_length = SnkQueueLength, + peer_list = SnkPeerInfo, + max_worker_count = SnkWorkerCount}, + {SnkQueueName, Iteration, SnkW} + end, + Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), + {noreply, State#state{enabled = true, work = Work}, ?INITIAL_TIMEOUT_MS}. + terminate(_Reason, State) -> WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work), CloseFun = From 763f749e6701f1a4a9deccdc96641c22f06ce9f6 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 4 Sep 2023 22:49:16 +0100 Subject: [PATCH 2/2] Force timeout to trigger (#3) Previously, the inactivity timeout on handle_continue could be cancelled by a call to riak_kv_replrtq_snk (e.g. from riak_kv_replrtq_peer). This might lead to the log_stats loop never being triggered. --- src/riak_kv_replrtq_snk.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index 28478f179..e0328c84d 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -397,7 +397,7 @@ handle_cast({requeue_work, WorkItem}, State) -> {noreply, State} end. 
-handle_info(timeout, State) -> +handle_info(deferred_start, State) -> prompt_work(), erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats), {noreply, State}; @@ -440,7 +440,8 @@ handle_continue(initialise_work, State) -> {SnkQueueName, Iteration, SnkW} end, Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), - {noreply, State#state{enabled = true, work = Work}, ?INITIAL_TIMEOUT_MS}. + erlang:send_after(?INITIAL_TIMEOUT_MS, self(), deferred_start), + {noreply, State#state{enabled = true, work = Work}}. terminate(_Reason, State) -> WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work),