Skip to content

Commit

Permalink
Move heartbeat timeout to config
Browse files Browse the repository at this point in the history
Add force processing
  • Loading branch information
kostiushkin committed Aug 25, 2017
1 parent 91f263b commit b90a5f3
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 43 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ Getting Started
===============

```erl

%% Push
1> QueueId = 1.
1> application:start(wolfmq).
ok
2>
2> QueueId = erlang:unique_integer([monotonic, positive]) rem 10.
1
2> F = fun() -> io:format("F message") end.
#Fun<erl_eval.20.80484245>
3> MFA = {io, format, ["MFA message~n", []]}.
{io,format,["MFA message~n",[]]}
4> ok = wolfmq:push(QueueId, F).
3>
3> ok = wolfmq:push(QueueId, fun() -> io:format("Hello world!~n"), ok end).
Hello world!
ok
4> ok = wolfmq:push(QueueId, MFA).
4>
4> ok = wolfmq:push(QueueId, {io, format, ["Hello world!~n"]}).
Hello world!
ok

```

Project Chat Room
Expand Down
4 changes: 3 additions & 1 deletion src/wolfmq.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%% -*- mode: erlang -*-
{application, wolfmq, [
{description, "Erlang message broker"},
{vsn, "0.2.0"},
{vsn, "0.3.0"},
{modules, []},
{registered, []},
{applications, [
Expand All @@ -9,6 +10,7 @@
]},
{mod, {wolfmq_app, []}},
{env, [
{heartbeat_timeout, 1},
{idle_timeout, 10}
]}
]}.
6 changes: 1 addition & 5 deletions src/wolfmq.erl
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
-module(wolfmq).

%% API
-export([push/1, push/2]).
-export([push/2]).

%% API
push(Task) ->
QueueId = self(),
push(QueueId, Task).

push(QueueId, Task) ->
wolfmq_mgr:push(QueueId, Task).
19 changes: 13 additions & 6 deletions src/wolfmq_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ push(QueueId, Task) ->
end,
add_to_queue(QueueId, Task).

open_queue(QueueId, {EtsId, HandlerPid}) ->
true = ets:insert(wolfmq_queues, {QueueId, EtsId, HandlerPid}),
open_queue(QueueId, {EtsId, WorkerPid}) ->
true = ets:insert(wolfmq_queues, {QueueId, EtsId, WorkerPid}),
ok.

close_queue(QueueId) ->
Expand Down Expand Up @@ -66,20 +66,27 @@ terminate(_Reason, _State) ->
is_existing_queue(QueueId) ->
case ets:lookup(wolfmq_queues, QueueId) of
[] -> false;
[{QueueId, _, HandlerPid}] -> is_process_alive(HandlerPid)
[{QueueId, _, WorkerPid}] -> is_process_alive(WorkerPid)
end.

add_to_queue(QueueId, Tasks) when is_list(Tasks) ->
Now = erlang_system_time(micro_seconds),
[{QueueId, EtsId, _HandlerPid}] = ets:lookup(wolfmq_queues, QueueId),
[{QueueId, EtsId, WorkerPid}] = ets:lookup(wolfmq_queues, QueueId),
QueueSize = ets:info(EtsId, size),
List = [{Now, Task} || Task <- Tasks],
true = ets:insert(EtsId, List),
ok;
force_processing(WorkerPid, QueueSize);
add_to_queue(QueueId, Task) ->
Now = erlang_system_time(micro_seconds),
[{QueueId, EtsId, _HandlerPid}] = ets:lookup(wolfmq_queues, QueueId),
[{QueueId, EtsId, WorkerPid}] = ets:lookup(wolfmq_queues, QueueId),
QueueSize = ets:info(EtsId, size),
Tuple = {Now, Task},
true = ets:insert(EtsId, Tuple),
force_processing(WorkerPid, QueueSize).

force_processing(WorkerPid, 0) ->
wolfmq_worker:force_processing(WorkerPid);
force_processing(_, _) ->
ok.

%% erlang:system_time fallback functions
Expand Down
50 changes: 29 additions & 21 deletions src/wolfmq_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

%% API
-export([start_link/1, stop/1]).
-export([force_processing/1]).

%% gen_server callbacks
-export([init/1, terminate/2]).
Expand All @@ -11,11 +12,13 @@

-define(T, ?MODULE).
-record(state, {
activity_timer,
heartbeat_timer,
idle_timer,
heartbeat_timeout,
idle_timeout,
queue_id,
ets_id}).
ets_id
}).

%% API
start_link(Args) ->
Expand All @@ -24,19 +27,23 @@ start_link(Args) ->
stop(Pid) ->
gen_server:cast(Pid, stop).

force_processing(Pid) ->
Pid ! process_queue,
ok.

%% gen_server callbacks
init([QueueId]) ->
{ok, Secs} = application:get_env(wolfmq, idle_timeout),
IdleTimeout = timer:seconds(Secs),
HandlerPid = self(),
{ok, HeartbeatTimeout} = application:get_env(wolfmq, heartbeat_timeout),
{ok, IdleTimeout} = application:get_env(wolfmq, idle_timeout),
EtsId = ets:new(?T, [public, ordered_set, {write_concurrency, true}]),
ok = wolfmq_mgr:open_queue(QueueId, {EtsId, HandlerPid}),
{ok, ActivityTimerRef} = timer:send_after(1000, process_queue),
ok = wolfmq_mgr:open_queue(QueueId, {EtsId, self()}),
{ok, HeartbeatTimerRef} = timer:send_after(0, process_queue),
State = #state{
activity_timer = ActivityTimerRef,
idle_timeout = IdleTimeout,
queue_id = QueueId,
ets_id = EtsId
heartbeat_timer = HeartbeatTimerRef,
heartbeat_timeout = timer:seconds(HeartbeatTimeout),
idle_timeout = timer:seconds(IdleTimeout),
queue_id = QueueId,
ets_id = EtsId
},
{ok, State}.

Expand All @@ -51,23 +58,24 @@ handle_cast(stop, #state{ets_id = EtsId, queue_id = QueueId} = State) ->
ok = process_queue(EtsId),
true = ets:delete(EtsId),
{stop, normal, State}.

handle_info(process_queue, #state{activity_timer = ActivityTimerRef,
idle_timer = IdleTimerRef1, idle_timeout = IdleTimeout,
ets_id = EtsId} = State) ->
{ok, cancel} = timer:cancel(ActivityTimerRef),
EtsInfoList = ets:info(EtsId),
IdleTimerRef2 = case proplists:get_value(size, EtsInfoList) of

handle_info(process_queue, #state{heartbeat_timer = HeartbeatTimerRef,
idle_timer = IdleTimerRef1, idle_timeout = IdleTimeout,
heartbeat_timeout = HeartbeatTimeout, ets_id = EtsId} = State) ->
{ok, cancel} = timer:cancel(HeartbeatTimerRef),
IdleTimerRef2 = case ets:info(EtsId, size) of
Size when Size > 0 ->
{ok, cancel} = cancel_idle_timer(IdleTimerRef1),
ok = process_queue(EtsId),
undefined;
_ ->
start_idle_timer(IdleTimerRef1, IdleTimeout)
end,
{ok, ActivityTimerRef2} = timer:send_after(1000, process_queue),
Sate2 = State#state{activity_timer = ActivityTimerRef2,
idle_timer = IdleTimerRef2},
{ok, HeartbeatTimerRef2} = timer:send_after(HeartbeatTimeout, process_queue),
Sate2 = State#state{
heartbeat_timer = HeartbeatTimerRef2,
idle_timer = IdleTimerRef2
},
{noreply, Sate2};
handle_info(stop, #state{ets_id = EtsId, queue_id = QueueId} = State) ->
ok = wolfmq_mgr:close_queue(QueueId),
Expand Down

0 comments on commit b90a5f3

Please sign in to comment.