diff --git a/.gitignore b/.gitignore index b1a1f24f43..811033d910 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ _build *.iml rebar3.crashdump data/ +!src/data/ .DS_Store src/pb/ src/grpc/autogen diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl new file mode 100644 index 0000000000..26f28d982e --- /dev/null +++ b/src/blockchain_rocks.erl @@ -0,0 +1,116 @@ +-module(blockchain_rocks). + +-export([ + fold/4, + fold/5, + foreach/3, + foreach/4, + stream/2, + stream/3, + sample/3, + sample/4 +]). + +%% API ======================================================================== + +-spec fold( + rocksdb:db_handle(), + rocksdb:read_options(), + Acc, + fun(({K :: binary(), V :: binary()}, Acc) -> Acc) +) -> + Acc. +fold(DB, Opts, Acc, F) -> + data_stream:fold(stream(DB, Opts), Acc, F). + +-spec fold( + rocksdb:db_handle(), + rocksdb:cf_handle(), + rocksdb:read_options(), + Acc, + fun(({K :: binary(), V :: binary()}, Acc) -> Acc) +) -> + Acc. +fold(DB, CF, Opts, Acc, F) -> + data_stream:fold(stream(DB, CF, Opts), Acc, F). + +-spec foreach( + rocksdb:db_handle(), + rocksdb:read_options(), + fun(({K :: binary(), V :: binary()}) -> ok) +) -> + ok. +foreach(DB, Opts, F) -> + data_stream:foreach(stream(DB, Opts), F). + +-spec foreach( + rocksdb:db_handle(), + rocksdb:cf_handle(), + rocksdb:read_options(), + fun(({K :: binary(), V :: binary()}) -> ok) +) -> + ok. +foreach(DB, CF, Opts, F) -> + data_stream:foreach(stream(DB, CF, Opts), F). + +-spec stream(rocksdb:db_handle(), rocksdb:read_options()) -> + data_stream:t({K :: binary(), V :: binary()}). +stream(DB, Opts) -> + stream_(fun () -> rocksdb:iterator(DB, Opts) end). + +-spec stream( + rocksdb:db_handle(), + rocksdb:cf_handle(), + rocksdb:read_options() +) -> + data_stream:t({K :: binary(), V :: binary()}). +stream(DB, CF, Opts) -> + stream_(fun () -> rocksdb:iterator(DB, CF, Opts) end). + +%% @doc Select K random records from database. +-spec sample(rocksdb:db_handle(), rocksdb:read_options(), pos_integer()) -> + [{K :: binary(), V :: binary()}]. +sample(DB, Opts, K) -> + Stream = stream(DB, Opts), + data_stream:sample(Stream, K). + +%% @doc Select K random records from CF. +-spec sample( + rocksdb:db_handle(), + rocksdb:cf_handle(), + rocksdb:read_options(), + pos_integer() +) -> + [{K :: binary(), V :: binary()}]. +sample(DB, CF, Opts, K) -> + Stream = stream(DB, CF, Opts), + data_stream:sample(Stream, K). + +%% Internal =================================================================== + +-spec stream_(fun(() -> {ok, rocksdb:itr_handle()} | {error, term()})) -> + data_stream:t({K :: binary(), V :: binary()}). +stream_(IterOpen) -> + case IterOpen() of + {error, Reason} -> + error({blockchain_rocks_iter_make, Reason}); + {ok, Iter} -> + Move = + fun Move_ (Target) -> + fun () -> + case rocksdb:iterator_move(Iter, Target) of + {ok, K, V} -> + {some, {{K, V}, Move_(next)}}; + {error, invalid_iterator} -> + ok = rocksdb:iterator_close(Iter), + none; + Error -> + error({blockchain_rocks_iter_move, Target, Error}) + end + end + end, + data_stream:from_fun(Move(first)) + end. + +%% Test ======================================================================= +%% See test/blockchain_rocks_SUITE.erl diff --git a/src/blockchain_term.erl b/src/blockchain_term.erl index 4774efccaa..83127bddd1 100644 --- a/src/blockchain_term.erl +++ b/src/blockchain_term.erl @@ -12,7 +12,6 @@ -export_type([ t/0, result/0, - stream/1, % TODO Find stream def a better home module than this one. error/0, frame/0, unsound/0, @@ -105,7 +104,7 @@ -include("blockchain_term.hrl"). -%% TODO Maybe use a map? +%% TODO Switch to a map or a record? -type file_handle() :: { file:fd(), @@ -113,42 +112,41 @@ Len :: pos_integer() }. --type stream(A) :: fun(() -> none | {some, {A, stream(A)}}). - -spec from_bin(binary()) -> result(). from_bin(<>) -> envelope(Bin). %% Tries to stream a list of binaries from file. -%% TODO Generalize. -spec from_file_stream_bin_list(file_handle()) -> - stream({ok, binary()} | {error, term()}). + data_stream:t({ok, binary()} | {error, term()}). from_file_stream_bin_list({Fd, Pos, Len}) -> {ok, Pos} = file:position(Fd, {bof, Pos}), - case file:read(Fd, 6) of - {ok, <>} -> - stream_bin_list_elements(N, {Fd, Pos + 6, Len}); - {ok, <>} -> - fun () -> {some, {{error, {bad_etf_version_and_tag_and_len, V}}, stream_end()}} end; - {error, _}=Err -> - fun () -> {some, {Err, stream_end()}} end - end. + Next = + case file:read(Fd, 6) of + {ok, <>} -> + next_bin_list_elements(N, {Fd, Pos + 6, Len}); + {ok, <>} -> + fun () -> {some, {{error, {bad_etf_version_and_tag_and_len, V}}, next_end()}} end; + {error, _}=Err -> + fun () -> {some, {Err, next_end()}} end + end, + data_stream:from_fun(Next). --spec stream_bin_list_elements(non_neg_integer(), file_handle()) -> - stream({ok, binary()} | {error, term()}). -stream_bin_list_elements(0, {Fd, Pos, _}) -> +-spec next_bin_list_elements(non_neg_integer(), file_handle()) -> + data_stream:next({ok, binary()} | {error, term()}). +next_bin_list_elements(0, {Fd, Pos, _}) -> fun () -> {ok, Pos} = file:position(Fd, {bof, Pos}), case file:read(Fd, 1) of {ok, <>} -> none; {ok, <<_/binary>>} -> - {some, {{error, bad_bin_list_nil_tag}, stream_end()}}; + {some, {{error, bad_bin_list_nil_tag}, next_end()}}; {error, _}=Err -> - {some, {Err, stream_end()}} + {some, {Err, next_end()}} end end; -stream_bin_list_elements(N, {Fd, Pos0, L}) -> +next_bin_list_elements(N, {Fd, Pos0, L}) -> fun () -> {ok, Pos1} = file:position(Fd, {bof, Pos0}), case file:read(Fd, 5) of @@ -156,18 +154,20 @@ stream_bin_list_elements(N, {Fd, Pos0, L}) -> {ok, Pos2} = file:position(Fd, {bof, Pos1 + 5}), case file:read(Fd, Len) of {ok, <>} -> - {some, {Bin, stream_bin_list_elements(N - 1, {Fd, Pos2 + Len, L})}}; + {some, {Bin, next_bin_list_elements(N - 1, {Fd, Pos2 + Len, L})}}; {error, _}=Err -> - {some, {Err, stream_end()}} + {some, {Err, next_end()}} end; {ok, <<_/binary>>} -> - {some, {{error, bad_bin_list_element}, stream_end()}}; + {some, {{error, bad_bin_list_element}, next_end()}}; {error, _}=Err -> - {some, {Err, stream_end()}} + {some, {Err, next_end()}} end end. -stream_end() -> +-spec next_end() -> + data_stream:next({ok, binary()} | {error, term()}). +next_end() -> fun () -> none end. %% TODO -spec from_bin_with_contract(binary(), blockchain_contract:t()) -> diff --git a/src/blockchain_utils.erl b/src/blockchain_utils.erl index ec7179462f..b9b0f57ba9 100644 --- a/src/blockchain_utils.erl +++ b/src/blockchain_utils.erl @@ -10,6 +10,7 @@ -include("blockchain_vars.hrl"). -export([ + cpus/0, shuffle_from_hash/2, shuffle/1, shuffle/2, rand_from_hash/1, rand_state/1, @@ -302,6 +303,7 @@ validation_width() -> N end. +-spec cpus() -> non_neg_integer(). cpus() -> Ct = erlang:system_info(schedulers_online), max(2, ceil(Ct/2) + 1). diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl new file mode 100644 index 0000000000..480428d6c1 --- /dev/null +++ b/src/data/data_stream.erl @@ -0,0 +1,578 @@ +-module(data_stream). + +-export_type([ + next/1, + t/1 +]). + +-export([ + next/1, + from_fun/1, + from_list/1, + to_list/1, + append/2, + foreach/2, + fold/3, + map/2, % Alias for lazy_map. + filter/2, % Alias for lazy_filter. + lazy_map/2, + lazy_filter/2, + pmap_to_bag/2, + pmap_to_bag/3, + sample/2 +]). + +-type reservoir(A) :: #{pos_integer() => A}. + +-type filter(A, B) + :: {map, fun((A) -> B)} + | {test, fun((A) -> boolean())} + . + +-type next(A) :: fun(() -> none | {some, {A, next(A)}}). + +-record(stream, { + next :: next(any()), + filters :: [filter(any(), any())] +}). + +-type stream(A) :: + %% XXX Record syntax does not support type parameters, so we get around it with desugaring. + %% XXX Ensure the field order is the same as in the corresponding record! + { + stream, + next(A), + [filter(A, any())] + }. + +-opaque t(A) :: + %% Our stream is a sequence of streams + %% in order to support the append operation. + [stream(A), ...]. + +-record(sched, { + id :: reference(), + producers :: [{pid(), reference()}], + consumers :: [{pid(), reference()}], + consumers_free :: [pid()], % available to work. + work :: [any()], % received from producers. + results :: [any()] % received from consumers. +}). + +%% API ======================================================================== + +-spec from_fun(next(A)) -> t(A). +from_fun(Next) -> + [#stream{next = Next, filters = []}]. + +-spec append(t(A), t(A)) -> t(A). +append([#stream{} | _]=TA, [#stream{} | _]=TB) -> + TA ++ TB. + +-spec next(t(A)) -> none | {some, {A, t(A)}}. +next([#stream{next=Next0, filters=Filters}=S | Streams]) when is_function(Next0) -> + case Next0() of + none -> + case Streams of + [] -> none; + [_|_] -> next(Streams) + end; + {some, {X, Next1}} when is_function(Next1) -> + T1 = [S#stream{next=Next1} | Streams], + case filters_apply(X, Filters) of + none -> + next(T1); + {some, Y} -> + {some, {Y, T1}} + end + end. + +map(T, F) -> + lazy_map(T, F). + +filter(T, F) -> + lazy_filter(T, F). + +-spec lazy_map(t(A), fun((A) -> B)) -> t(B). +lazy_map([#stream{filters=Filters}=S | Streams], F) -> + [S#stream{filters=Filters ++ [{map, F}]} | Streams]. + +-spec lazy_filter(t(A), fun((A) -> boolean())) -> t(A). +lazy_filter([#stream{filters=Filters}=S | Streams], F) -> + [S#stream{filters=Filters ++ [{test, F}]} | Streams]. + +-spec fold(t(A), B, fun((A, B) -> B)) -> B. +fold(T0, Acc, F) -> + case next(T0) of + none -> + Acc; + {some, {X, T1}} -> + fold(T1, F(X, Acc), F) + end. + +-spec foreach(t(A), fun((A) -> ok)) -> ok. +foreach(T0, F) -> + case next(T0) of + none -> + ok; + {some, {X, T1}} -> + F(X), + foreach(T1, F) + end. + +-spec from_list([A]) -> t(A). +from_list(Xs) -> + from_fun(from_list_(Xs)). + +-spec from_list_([A]) -> next(A). +from_list_([]) -> + fun () -> none end; +from_list_([X | Xs]) -> + fun () -> {some, {X, from_list_(Xs)}} end. + +-spec to_list(t(A)) -> [A]. +to_list(T0) -> + case next(T0) of + none -> + []; + {some, {X, T1}} -> + [X | to_list(T1)] + end. + +%% A pmap which doesn't preserve order. +-spec pmap_to_bag(t(A), fun((A) -> B)) -> [B]. +pmap_to_bag(Xs, F) when is_function(F) -> + pmap_to_bag(Xs, F, blockchain_utils:cpus()). + +-spec pmap_to_bag(t(A), fun((A) -> B), non_neg_integer()) -> [B]. +pmap_to_bag(T, F, J) when is_function(F), is_integer(J), J > 0 -> + CallerPid = self(), + SchedID = make_ref(), + Scheduler = + fun () -> + SchedPid = self(), + Consumer = + fun Consume () -> + ConsumerPid = self(), + SchedPid ! {SchedID, consumer_ready, ConsumerPid}, + receive + {SchedID, job, X} -> + Y = F(X), + SchedPid ! {SchedID, consumer_output, Y}, + Consume(); + {SchedID, done} -> + ok + end + end, + Producer = + fun () -> + %% XXX Producer is racing against consumers. + %% + %% This hasn't (yet) caused a problem, but in theory it is + %% bad: producer is pouring into the scheduler's queue as + %% fast as possible, potentially faster than consumers can + %% pull from it, so heap usage could explode. + %% + %% Solution ideas: + %% A. have the scheduler call the producer whenever more + %% work is asked for, but ... that can block the + %% scheduler, starving consumers; + %% B. produce in (configurable size) batches, pausing + %% production when batch is full and resuming when not + %% (this is probably the way to go). + ok = foreach(T, fun (X) -> SchedPid ! {SchedID, producer_output, X} end) + end, + Ys = + sched(#sched{ + id = SchedID, + producers = [spawn_monitor(Producer)], + consumers = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], + consumers_free = [], + work = [], + results = [] + }), + CallerPid ! {SchedID, Ys} + end, + %% XXX Scheduling from a dedicated process to avoid conflating our 'DOWN' + %% messages (from producers and consumers) with those of the caller + %% process. + {SchedPid, SchedMonRef} = spawn_monitor(Scheduler), + %% TODO timeout? + receive + {SchedID, Ys} -> + receive + {'DOWN', SchedMonRef, process, SchedPid, normal} -> + Ys + end; + {'DOWN', SchedMonRef, process, SchedPid, Reason} -> + error({data_stream_scheduler_crashed_before_sending_results, Reason}) + end. + +-spec sample(t(A), non_neg_integer()) -> [A]. +sample(_, 0) -> []; +sample(T, K) when K > 0 -> + {_N, Reservoir} = reservoir_sample(T, #{}, K), + [X || {_, X} <- maps:to_list(Reservoir)]. + +%% Internal =================================================================== + +%% @doc +%% The optimal reservoir sampling algorithm. Known as "Algorithm L" in: +%% https://dl.acm.org/doi/pdf/10.1145/198429.198435 +%% https://en.wikipedia.org/wiki/Reservoir_sampling#An_optimal_algorithm +%% @end +-spec reservoir_sample(t(A), reservoir(A), pos_integer()) -> + {pos_integer(), reservoir(A)}. +reservoir_sample(T0, R0, K) -> + case reservoir_sample_init(T0, R0, 1, K) of + {none, R1, I} -> + {I, R1}; + {{some, T1}, R1, I} -> + W = random_weight_init(K), + J = random_index_next(I, W), + reservoir_sample_update(T1, R1, W, I, J, K) + end. + +-spec reservoir_sample_init(t(A), reservoir(A), pos_integer(), pos_integer()) -> + {none | {some, A}, reservoir(A), pos_integer()}. +reservoir_sample_init(T0, R, I, K) -> + case I > K of + true -> + {{some, T0}, R, I - 1}; + false -> + case next(T0) of + {some, {X, T1}} -> + reservoir_sample_init(T1, R#{I => X}, I + 1, K); + none -> + {none, R, I - 1} + end + end. + +-spec random_weight_init(pos_integer()) -> float(). +random_weight_init(K) -> + math:exp(math:log(rand:uniform()) / K). + +-spec random_weight_next(float(), pos_integer()) -> float(). +random_weight_next(W, K) -> + W * random_weight_init(K). + +-spec random_index_next(pos_integer(), float()) -> pos_integer(). +random_index_next(I, W) -> + I + floor(math:log(rand:uniform()) / math:log(1 - W)) + 1. + +-spec reservoir_sample_update( + t(A), + reservoir(A), + float(), + pos_integer(), + pos_integer(), + pos_integer() +) -> + {pos_integer(), reservoir(A)}. +reservoir_sample_update(T0, R0, W0, I0, J0, K) -> + case next(T0) of + none -> + {I0, R0}; + {some, {X, T1}} -> + I1 = I0 + 1, + case I0 =:= J0 of + true -> + R1 = R0#{rand:uniform(K) => X}, + W1 = random_weight_next(W0, K), + J1 = random_index_next(J0, W0), + reservoir_sample_update(T1, R1, W1, I1, J1, K); + false -> + % Here is where the big win takes place over the simple + % Algorithm R. We skip computing random numbers for an + % element that will not be picked. + reservoir_sample_update(T1, R0, W0, I1, J0, K) + end + end. + +-spec sched(#sched{}) -> [any()]. +sched(#sched{id=_, producers=[], consumers=[], consumers_free=[], work=[], results=Ys}) -> + Ys; +sched(#sched{id=ID, producers=[], consumers=[_|_], consumers_free=[_|_]=CsFree, work=[]}=S0) -> + _ = [C ! {ID, done} || C <- CsFree], + sched(S0#sched{consumers_free=[]}); +sched(#sched{id=_, producers=_, consumers=[_|_], consumers_free=[_|_], work=[_|_]}=S0) -> + S1 = sched_assign(S0), + sched(S1); +sched(#sched{id=ID, producers=Ps, consumers=_, consumers_free=CsFree, work=Xs, results=Ys }=S) -> + receive + {ID, producer_output, X} -> sched(S#sched{work=[X | Xs]}); + {ID, consumer_output, Y} -> sched(S#sched{results=[Y | Ys]}); + {ID, consumer_ready, C} -> sched(S#sched{consumers_free=[C | CsFree]}); + {'DOWN', MonRef, process, Pid, normal} -> + S1 = sched_remove_worker(S, {Pid, MonRef}), + sched(S1); + {'DOWN', MonRef, process, Pid, Reason} -> + case lists:member({Pid, MonRef}, Ps) of + true -> error({?MODULE, pmap_to_bag, producer_crash, Reason}); + false -> error({?MODULE, pmap_to_bag, consumer_crash, Reason}) + end + end. + +-spec sched_remove_worker(#sched{}, {pid(), reference()}) -> #sched{}. +sched_remove_worker(#sched{producers=Ps, consumers=Cs, consumers_free=CsFree}=S, {Pid, _}=PidRef) -> + case lists:member(PidRef, Ps) of + true -> + S#sched{producers = Ps -- [PidRef]}; + false -> + S#sched{ + consumers = Cs -- [PidRef], + consumers_free = CsFree -- [Pid] + } + end. + +-spec sched_assign(#sched{}) -> #sched{}. +sched_assign(#sched{consumers_free=[], work=Xs}=S) -> S#sched{consumers_free=[], work=Xs}; +sched_assign(#sched{consumers_free=Cs, work=[]}=S) -> S#sched{consumers_free=Cs, work=[]}; +sched_assign(#sched{consumers_free=[C | Cs], work=[X | Xs], id=ID}=S) -> + C ! {ID, job, X}, + sched_assign(S#sched{consumers_free=Cs, work=Xs}). + +-spec filters_apply(A, [filter(A, B)]) -> none | {some, B}. +filters_apply(X, Filters) -> + lists:foldl( + fun (_, none) -> + none; + (F, {some, Y}) -> + case F of + {map, Map} -> + {some, Map(Y)}; + {test, Test} -> + case Test(Y) of + true -> + {some, Y}; + false -> + none + end + end + end, + {some, X}, + Filters + ). + +%% Tests ====================================================================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +pmap_to_bag_test_() -> + NonDeterminism = fun (N) -> timer:sleep(rand:uniform(N)) end, + FromListWithNonDeterminism = + fun (N) -> + fun (Xs) -> + lazy_map(from_list(Xs), fun (X) -> NonDeterminism(N), X end) + end + end, + Tests = + [ + begin + G = fun (X) -> NonDeterminism(ConsumerDelay), F(X) end, + Test = + ?_assertEqual( + lists:sort(lists:map(G, Xs)), + lists:sort(pmap_to_bag( + (FromListWithNonDeterminism(ProducerDelay))(Xs), + G, + J + )) + ), + Timeout = 1000 + ProducerDelay + (ConsumerDelay * J), + Name = lists:flatten(io_lib:format( + "#Xs: ~p, J: ~p, ProducerDelay: ~p, ConsumerDelay: ~p, Timeout: ~p", + [length(Xs), J, ProducerDelay, ConsumerDelay, Timeout] + )), + {Name, {timeout, Timeout, Test}} + end + || + J <- lists:seq(1, 16), + F <- [ + fun (X) -> {X, X} end, + fun (X) -> X * 2 end + ], + Xs <- [ + lists:seq(1, 100) + ], + {ProducerDelay, ConsumerDelay} <- + begin + Lo = 1, + Hi = 10, + [ + {Hi, Lo}, % slow producer, fast consumer + {Lo, Hi}, % fast producer, slow consumer + {Lo, Lo}, % both fast + {Hi, Hi} % both slow + ] + end + ], + {inparallel, Tests}. + +round_trip_test_() -> + [ + ?_assertEqual(Xs, to_list(from_list(Xs))) + || + Xs <- [ + [1, 2, 3], + [a, b, c], + [<<>>, <<"foo">>, <<"bar">>, <<"baz">>, <<"qux">>] + ] + ]. + +lazy_map_test_() -> + Double = fun (X) -> X * 2 end, + [ + ?_assertEqual( + lists:map(Double, Xs), + to_list(lazy_map(from_list(Xs), Double)) + ) + || + Xs <- [ + [1, 2, 3, 4, 5] + ] + ]. + +lazy_filter_test_() -> + IsEven = fun (X) -> 0 =:= X rem 2 end, + [ + ?_assertEqual( + lists:filter(IsEven, Xs), + to_list(lazy_filter(from_list(Xs), IsEven)) + ) + || + Xs <- [ + [1, 2, 3, 4, 5] + ] + ]. + +lazy_filters_compose_test_() -> + IsMultOf = fun (M) -> fun (N) -> 0 =:= N rem M end end, + Double = fun (N) -> N * 2 end, + [ + ?_assertEqual( + begin + L0 = Xs, + L1 = lists:filter(IsMultOf(2), L0), + L2 = lists:map(Double, L1), + L3 = lists:filter(IsMultOf(3), L2), + L3 + end, + to_list( + begin + S0 = from_list(Xs), + S1 = lazy_filter(S0, IsMultOf(2)), + S2 = lazy_map(S1, Double), + S3 = lazy_filter(S2, IsMultOf(3)), + S3 + end + ) + ) + || + Xs <- [ + lists:seq(1, 10), + lists:seq(1, 100), + lists:seq(1, 100, 3) + ] + ]. + +fold_test_() -> + [ + ?_assertEqual( + lists:foldl(F, Acc, Xs), + fold(from_list(Xs), Acc, F) + ) + || + {Acc, F} <- [ + {0, fun erlang:'+'/2}, + {[], fun (X, Xs) -> [X | Xs] end} + ], + Xs <- [ + [1, 2, 3, 4, 5] + ] + ]. + +append_test_() -> + [ + ?_assertEqual( + [1, 2, 3, 4, 5], + to_list(append(from_list([1, 2]), from_list([3, 4, 5]))) + ), + ?_assertEqual( + [1, 2, 3, 4, 5, 6, 7, 8], + to_list( + append( + append(from_list([1, 2]), from_list([3, 4, 5])), + from_list([6, 7, 8])) + ) + ), + ?_assertEqual( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + to_list( + append( + append( + from_list([1, 2]), + append( + append(from_list([3]), from_list([4])), + from_list([5]) + ) + ), + append( + from_list([6, 7]), + append( + from_list([8]), + from_list([9, 10]) + ) + ) + ) + ) + ) + ]. + +random_elements_test_() -> + TestCases = + [ + ?_assertMatch([a], sample(from_list([a]), 1)), + ?_assertEqual(0, length(sample(from_list([]), 1))), + ?_assertEqual(0, length(sample(from_list([]), 10))), + ?_assertEqual(0, length(sample(from_list([]), 100))), + ?_assertEqual(1, length(sample(from_list(lists:seq(1, 100)), 1))), + ?_assertEqual(2, length(sample(from_list(lists:seq(1, 100)), 2))), + ?_assertEqual(3, length(sample(from_list(lists:seq(1, 100)), 3))), + ?_assertEqual(5, length(sample(from_list(lists:seq(1, 100)), 5))) + | + [ + (fun () -> + Trials = 10, + K = floor(N * KF), + L = lists:seq(1, N), + S = from_list(L), + Rands = + [ + sample(S, K) + || + _ <- lists:duplicate(Trials, {}) + ], + Head = lists:sublist(L, K), + Unique = lists:usort(Rands) -- [Head], + Name = + lists:flatten(io_lib:format( + "At least 1/~p of trials makes a new sequence. " + "N:~p K:~p KF:~p length(Unique):~p", + [Trials, N, K, KF, length(Unique)] + )), + {Name, ?_assertMatch([_|_], Unique)} + end)() + || + N <- lists:seq(10, 100), + KF <- [ + 0.25, + 0.50, + 0.75 + ] + ] + ], + {inparallel, TestCases}. + +-endif. diff --git a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl index e7955c74bb..56f68b95a9 100644 --- a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl +++ b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl @@ -52,7 +52,7 @@ -type kv_stream() :: kv_stream(binary(), binary()). -%% TODO Should be: -type stream(A) :: fun(() -> none | {some, {A, t(A)}}). +%% TODO Convert to data_stream:t/1 -type kv_stream(K, V) :: fun(() -> ok | {K, V, kv_stream()}). @@ -660,13 +660,14 @@ load_blocks(Ledger0, Chain, Snapshot) -> Infos = case maps:find(infos, Snapshot) of {ok, Is} when is_binary(Is) -> - stream_from_list(binary_to_term(Is)); + data_stream:from_list(binary_to_term(Is)); {ok, {_, _, _}=InfoFileHandle} -> blockchain_term:from_file_stream_bin_list(InfoFileHandle); error -> - stream_from_list([]) + data_stream:from_list([]) end, - stream_iter( + data_stream:foreach( + Infos, fun(Bin) -> case binary_to_term(Bin) of ({Ht, #block_info{hash = Hash} = Info}) -> @@ -676,8 +677,8 @@ load_blocks(Ledger0, Chain, Snapshot) -> ok = blockchain:put_block_height(Hash, Ht, Chain), ok = blockchain:put_block_info(Ht, Info, Chain) end - end, - Infos), + end + ), print_memory(), lager:info("loading blocks"), BlockStream = @@ -687,11 +688,11 @@ load_blocks(Ledger0, Chain, Snapshot) -> print_memory(), %% use a custom decoder here to preserve sub binary references {ok, Blocks0} = blockchain_term:from_bin(Bs), - stream_from_list(Blocks0); + data_stream:from_list(Blocks0); {ok, {_, _, _}=FileHandle} -> blockchain_term:from_file_stream_bin_list(FileHandle); error -> - stream_from_list([]) + data_stream:from_list([]) end, print_memory(), @@ -699,7 +700,8 @@ load_blocks(Ledger0, Chain, Snapshot) -> lager:info("ledger height is ~p before absorbing snapshot", [Curr2]), - stream_iter( + data_stream:foreach( + BlockStream, fun(Res) -> Block0 = case Res of @@ -741,24 +743,8 @@ load_blocks(Ledger0, Chain, Snapshot) -> _ -> ok end - end, - BlockStream). - --spec stream_iter(fun((A) -> ok), blockchain_term:stream(A)) -> ok. -stream_iter(F, S0) -> - case S0() of - none -> - ok; - {some, {X, S1}} -> - F(X), - stream_iter(F, S1) - end. - --spec stream_from_list([A]) -> blockchain_term:stream(A). -stream_from_list([]) -> - fun () -> none end; -stream_from_list([X | Xs]) -> - fun () -> {some, {X, stream_from_list(Xs)}} end. + end + ). -spec get_infos(blockchain:blockchain()) -> [binary()]. @@ -1548,6 +1534,7 @@ bin_pair_to_iolist({<>, V}) -> V ]. +%% TODO Convert to data_stream:t/1 mk_bin_iterator(<<>>) -> fun() -> ok end; mk_bin_iterator(< fun() -> ok end; mk_file_iterator(FD, Pos, End) when Pos < End -> diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl new file mode 100644 index 0000000000..12b310f6a9 --- /dev/null +++ b/test/blockchain_rocks_SUITE.erl @@ -0,0 +1,129 @@ +-module(blockchain_rocks_SUITE). + +%% CT +-export([ + all/0, + init_per_suite/1, + end_per_suite/1 +]). + +%% Test cases +-export([ + t_fold/1, + t_sample_sanity_check/1, + t_sample/1, + t_sample_filtered/1, + t_stream_mapped_and_filtered/1 +]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +%% CT ========================================================================= + +all() -> + [ + t_fold, + t_sample_sanity_check, + t_sample, + t_sample_filtered, + t_stream_mapped_and_filtered + ]. + +%%% TODO CF and non-CF groups + +init_per_suite(Cfg) -> + NumRecords = 10_000, + DB = db_init(?MODULE, Cfg, NumRecords), + [{db, DB}, {num_records, NumRecords} | Cfg]. + +end_per_suite(_) -> + ok. + +%% Test cases ================================================================= + +t_fold(Cfg) -> + DB = ?config(db, Cfg), + N = ?config(num_records, Cfg), + ?assertEqual( + lists:foldl(fun (X, Sum) -> X + Sum end, 0, lists:seq(1, N)), + blockchain_rocks:fold(DB, [], 0, fun(KV, Sum) -> kv_to_int(KV) + Sum end) + ). + +t_sample_sanity_check(Cfg) -> + DB = ?config(db, Cfg), + Sample = blockchain_rocks:sample(DB, [], 1), + ?assertMatch([{<<"k", V/binary>>, <<"v", V/binary>>}], Sample), + DBEmpty = db_init(list_to_atom(atom_to_list(?FUNCTION_NAME) ++ "__empty"), Cfg, 0), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 1)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 5)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 10)). + +t_sample(Cfg) -> + DB = ?config(db, Cfg), + K = 10, + Trials = 100, + Samples = [blockchain_rocks:sample(DB, [], K) || _ <- lists:duplicate(Trials, {})], + NumUniqueSamples = length(lists:usort(Samples)), + ProportionOfUnique = NumUniqueSamples / Trials, + %% At least 1/2 the time a new record-set was sampled: + ?assert(ProportionOfUnique >= 0.5). + +t_sample_filtered(Cfg) -> + DB = ?config(db, Cfg), + S0 = blockchain_rocks:stream(DB, []), + S1 = data_stream:filter(S0, fun kv_is_even/1), + SampleSize = 100, + Sample = data_stream:sample(S1, SampleSize), + ?assertEqual(SampleSize, length(Sample)), + lists:foreach( + fun (KV) -> + ?assertMatch({<<"k", IBin/binary>>, <<"v", IBin/binary>>}, KV), + {<<"k", IBin/binary>>, <<"v", IBin/binary>>} = KV, + ?assertEqual(0, binary_to_integer(IBin) rem 2) + end, + Sample + ). + +t_stream_mapped_and_filtered(Cfg) -> + DB = ?config(db, Cfg), + S0 = blockchain_rocks:stream(DB, []), + S1 = data_stream:map(S0, fun kv_to_int/1), + S2 = data_stream:filter(S1, fun (I) -> I rem 2 =:= 0 end), + data_stream:foreach(S2, fun (I) -> ?assert(I rem 2 =:= 0) end). + +%% Internal =================================================================== + +-spec db_init(atom(), [{atom(), term()}], non_neg_integer()) -> + rocksdb:db_handle(). +db_init(TestCase, Cfg, NumRecords) -> + PrivDir = ?config(priv_dir, Cfg), + DBFile = atom_to_list(TestCase) ++ ".db", + DBPath = filename:join(PrivDir, DBFile), + {ok, DB} = rocksdb:open(DBPath, [{create_if_missing, true}]), + lists:foreach( + fun ({K, V}) -> ok = rocksdb:put(DB, K, V, []) end, + lists:map(fun int_to_kv/1, lists:seq(1, NumRecords)) + ), + %% Sanity check that all keys and values are formatted as expected: + blockchain_rocks:foreach( + DB, + [], + fun(KV) -> ?assertMatch({<<"k", I/binary>>, <<"v", I/binary>>}, KV) end + ), + DB. + +-spec int_to_kv(integer()) -> {binary(), binary()}. +int_to_kv(I) -> + K = <<"k", (integer_to_binary(I))/binary>>, + V = <<"v", (integer_to_binary(I))/binary>>, + {K, V}. + +-spec kv_to_int({binary(), binary()}) -> integer(). +kv_to_int({<<"k", I/binary>>, <<"v", I/binary>>}) -> + binary_to_integer(I). + +-spec kv_is_even({binary(), binary()}) -> boolean(). +kv_is_even(KV) -> + kv_to_int(KV) rem 2 =:= 0.