Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
feat: base for migrating oc_reporter to gen_event
Browse files Browse the repository at this point in the history
  • Loading branch information
hauleth committed Feb 10, 2019
1 parent b73a12f commit 5a67f73
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 96 deletions.
19 changes: 19 additions & 0 deletions src/oc_internal.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
%%%------------------------------------------------------------------------
%% Copyright 2017, OpenCensus Authors
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%%------------------------------------------------------------------------

-define(SPAN_TAB, oc_span_tab).

-define(SPAN_CTX, oc_span_ctx_key).
-define(TAG_CTX, oc_tag_ctx_key).
38 changes: 38 additions & 0 deletions src/oc_internal_timer.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-module(oc_internal_timer).

-callback ping() -> ok.

-export([start_link/1,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2]).

-record(state, {timer :: reference(),
interval :: pos_integer(),
module :: module()}).

start_link(Opts) ->
gen_server:start_link(?MODULE, Opts, []).

init(Opts) ->
Interval = proplists:get_value(interval, Opts),
Module = proplists:get_value(module, Opts),
Ref = erlang:send_after(Interval, self(), ping),

{ok, #state{timer = Ref,
interval = Interval,
module = Module}}.

handle_call(_Msg, _From, State) -> {reply, ok, State}.

handle_cast(_Msg, State) -> {noreply, State}.

handle_info(ping, #state{timer = Ref, interval = Interval, module = Mod}) ->
_ = erlang:cancel_timer(Ref),
ok = Mod:ping(),
NewRef = erlang:send_after(Interval, self(), ping),

{noreply, #state{timer = NewRef,
interval = Interval,
module = Mod}}.
13 changes: 10 additions & 3 deletions src/oc_reporter_stdout.erl
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
-module(oc_reporter_stdout).

-behaviour(gen_event).

-export([init/1,
report/2]).
handle_call/2,
handle_event/2]).

init(_) ->
ok.

report(Spans, _) ->
[io:format("~p~n", [Span]) || Span <- Spans].
handle_call(_Msg, State) -> {ok, ok, State}.

handle_event({spans, Spans}, State) ->
[io:format("~p~n", [Span]) || Span <- Spans],

{ok, State}.
29 changes: 28 additions & 1 deletion src/oc_trace.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@

message_event/4,

set_status/3]).
set_status/3,

add_handler/1,
add_handler/2,
delete_handler/1]).

-dialyzer({nowarn_function, update_trace_options/2}).

Expand Down Expand Up @@ -327,6 +331,29 @@ link(LinkType, TraceId, SpanId, Attributes) ->
span_id=SpanId,
attributes=Attributes}.

%%--------------------------------------------------------------------
%% @doc
%% @equiv add_handler(Handler, []).
%% @end
%%--------------------------------------------------------------------
add_handler(Handler) -> add_handler(Handler, []).

%%--------------------------------------------------------------------
%% @doc
%% Add new handler
%% @end
%%--------------------------------------------------------------------
add_handler(Handler, Args) ->
gen_event:add_handler(oc_trace_reporter, Handler, Args).

%%--------------------------------------------------------------------
%% @doc
%% Delete handler
%% @end
%%--------------------------------------------------------------------
delete_handler(Handler) ->
gen_event:delete_handler(oc_trace_reporter, Handler, []).

%% Internal functions

lookup_and_replace(#span_ctx{span_id=SpanId,
Expand Down
119 changes: 27 additions & 92 deletions src/oc_reporter.erl → src/oc_trace_reporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,14 @@
%% and creates the buffer of trace spans to be reported.
%% @end
%%%-----------------------------------------------------------------------
-module(oc_reporter).
-module(oc_trace_reporter).

-behaviour(gen_server).
-behaviour(oc_internal_timer).

-compile({no_auto_import, [register/2]}).
-export([start_link/1,
store_span/1]).

-export([start_link/0,
store_span/1,
register/1,
register/2]).

-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
terminate/2]).
-export([ping/0]).

-include("opencensus.hrl").
-include("oc_logger.hrl").
Expand All @@ -50,29 +41,20 @@
%% until it returns.
-callback report(nonempty_list(opencensus:span()), opts()) -> ok.

-record(state, {reporters :: [{module(), term()}],
send_interval_ms :: integer(),
timer_ref :: reference()}).

-define(BUFFER_1, oc_report_buffer1).
-define(BUFFER_2, oc_report_buffer2).
-define(BUFFER_STATUS, oc_report_status).

start_link() ->
start_link(Handlers) ->
maybe_init_ets(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
case gen_event:start_link({local, ?MODULE}, []) of
{ok, Pid} ->
[gen_event:add_handler(Pid, Handler, Opts)
|| {Handler, Opts} <- Handlers],

%% @doc
%% @equiv register(Reporter, []).
%% @end
register(Reporter) -> register(Reporter, []).

%% @doc
%% Register new traces reporter `Reporter' with `Config'.
%% @end
-spec register(module(), term()) -> ok.
register(Reporter, Options) ->
gen_server:call(?MODULE, {register, init_reporter({Reporter, Options})}).
{ok, Pid};
Other -> Other
end.

-spec store_span(opencensus:span()) -> true | {error, invalid_span} | {error, no_report_buffer}.
store_span(Span=#span{}) ->
Expand All @@ -86,55 +68,7 @@ store_span(Span=#span{}) ->
store_span(_) ->
{error, invalid_span}.

init(_Args) ->
SendInterval = application:get_env(opencensus, send_interval_ms, 500),
Reporters = [init_reporter(Config) || Config <- application:get_env(opencensus, reporters, [])],
Ref = erlang:send_after(SendInterval, self(), report_spans),
{ok, #state{reporters=Reporters,
send_interval_ms=SendInterval,
timer_ref=Ref}}.

handle_call({register, Reporter}, _From, #state{reporters=Reporters} = State) ->
{reply, ok, State#state{reporters=[Reporter | Reporters]}};
handle_call(_, _From, State) ->
{noreply, State}.

handle_cast(_, State) ->
{noreply, State}.

handle_info(report_spans, State=#state{reporters=Reporters,
send_interval_ms=SendInterval,
timer_ref=Ref}) ->
erlang:cancel_timer(Ref),
Ref1 = erlang:send_after(SendInterval, self(), report_spans),
send_spans(Reporters),
{noreply, State#state{timer_ref=Ref1}}.

code_change(_, State, _) ->
{ok, State}.

terminate(_, #state{timer_ref=Ref}) ->
erlang:cancel_timer(Ref),
ok.

init_reporter({Reporter, Config}) ->
{Reporter, Reporter:init(Config)};
init_reporter(Reporter) when is_atom(Reporter) ->
{Reporter, Reporter:init([])}.

maybe_init_ets() ->
case ets:info(?BUFFER_STATUS, name) of
undefined ->
[ets:new(Tab, [named_table, public | TableProps ]) ||
{Tab, TableProps} <- [{?BUFFER_1, [{write_concurrency, true}, {keypos, #span.span_id}]},
{?BUFFER_2, [{write_concurrency, true}, {keypos, #span.span_id}]},
{?BUFFER_STATUS, [{read_concurrency, true}]}]],
ets:insert(?BUFFER_STATUS, {current_buffer, ?BUFFER_1});
_ ->
ok
end.

send_spans(Reporters) ->
ping() ->
[{_, Buffer}] = ets:lookup(?BUFFER_STATUS, current_buffer),
NewBuffer = case Buffer of
?BUFFER_1 ->
Expand All @@ -148,19 +82,20 @@ send_spans(Reporters) ->
ok;
Spans ->
ets:delete_all_objects(Buffer),
[report(Reporter, Spans, Config)
|| {Reporter, Config} <- Reporters],
gen_event:sync_notify(?MODULE, {spans, Spans}),
ok
end.

report(undefined, _, _) ->
ok;
report(Reporter, Spans, Config) ->
%% don't let a reporter exception crash us
try
Reporter:report(Spans, Config)
catch
?WITH_STACKTRACE(Class, Exception, StackTrace)
?LOG_INFO("reporter threw exception: reporter=~p ~p:~p stacktrace=~p",
[Reporter, Class, Exception, StackTrace])
maybe_init_ets() ->
case ets:info(?BUFFER_STATUS, name) of
undefined ->
[ets:new(Tab, [named_table, public | TableProps]) ||
{Tab, TableProps} <- [{?BUFFER_1, [{write_concurrency, true}, {keypos, #span.span_id}]},
{?BUFFER_2, [{write_concurrency, true}, {keypos, #span.span_id}]},
{?BUFFER_STATUS, [{read_concurrency, true}]}]],
ets:insert(?BUFFER_STATUS, {current_buffer, ?BUFFER_1}),

ok;
_ ->
ok
end.
52 changes: 52 additions & 0 deletions src/oc_trace_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
%%%------------------------------------------------------------------------
%% Copyright 2017, OpenCensus Authors
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%%------------------------------------------------------------------------

-module(oc_trace_sup).

-behaviour(supervisor).

-export([start_link/1, init/1]).

-include("opencensus.hrl").

start_link(Opts) ->
supervisor:start_link(?MODULE, Opts).

init(Opts) ->
Interval = proplists:get_value(Opts, interval, 500),
Handlers = proplists:get_value(Opts, handlers, []),

Exporter = #{id => exporter,
start => {oc_trace_reporter, start_link, [Handlers]}},
% TODO: Rename oc_span_sweeper to oc_trace_sweeper
Sweeper = #{id => sweeper,
start => {oc_span_sweeper, start_link, []}},
Timer = #{id => timer,
start => {oc_internal_timer, start_link, [{interval, Interval},
{module, oc_trace_reporter}]}
},

ok = maybe_init_span_tab(),

{ok, {#{strategy => one_for_one}, [Exporter, Timer, Sweeper]}}.

maybe_init_span_tab() ->
case ets:info(?SPAN_TAB, name) of
undefined ->
ets:new(?SPAN_TAB, [named_table, public, {write_concurrency, true}, {keypos, #span.span_id}]),
ok;
_ ->
ok
end.

0 comments on commit 5a67f73

Please sign in to comment.