Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thumbs #131

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
11 changes: 11 additions & 0 deletions .thumbs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
minimum_reviewers: 2
merge: true
build_steps:
- make clean
- make deps
- make compile
- make test
- make xref
- make dialyzer
org_mode: true
timeout: 1800
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## WHAT?

A set of state based CRDTs implemented in Erlang and on the paper -
A set of state-based CRDTs implemented in Erlang, and based on the paper -
[A Comprehensive study of Convergent and Commutative Replicated Data Types]
(http://hal.inria.fr/docs/00/55/55/88/PDF/techreport.pdf) - which you
may find an interesting read.
Expand All @@ -11,6 +11,6 @@ may find an interesting read.

Riak is getting CRDT support built in, so we've archived the old
riak_dt in the branch `prototype`. No further work will be done on
it. This repo is now a resuable library of Quickcheck tested
it. This repo is now a reusable library of QuickCheck tested
implementations of CRDTs.

4 changes: 2 additions & 2 deletions include/riak_dt.hrl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-ifdef(namespaced_types).
-type riak_dt_dict() :: dict:dict().
-type riak_dt_set() :: sets:set().
-else.
-type riak_dt_dict() :: dict().
-type riak_dt_set() :: set().
-endif.

-type riak_dt_set() :: ordsets:ordset(_).
7 changes: 5 additions & 2 deletions src/riak_dt.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% -*- erlang -*-
{application, riak_dt,
[
{description, ""},
{description, "riak CRDT datatypes"},
{vsn, git},
{registered, []},
{applications, [
Expand All @@ -14,5 +14,8 @@
%% indicates the level of compression. Higher number means more
%% compression, but more time to compress. In tests so far 1 has
%% been enough for CRDTs
{env, [{binary_compression, 1}]}
{env, [{binary_compression, 1}]},
{maintainers, ["Basho", "Heinz N. Gies"]},
{licenses, ["Apache"]},
{links, [{"Github", "https://github.com/basho/riak_dt"}]}
]}.
36 changes: 29 additions & 7 deletions src/riak_dt_gset.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
%%
%% riak_dt_gset: A convergent, replicated, state based grow only set
%%
%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
Expand Down Expand Up @@ -58,7 +58,7 @@

-type binary_gset() :: binary(). %% A binary that from_binary/1 will operate on.

-type gset_op() :: {add, member()}.
-type gset_op() :: {add, member()} | {add_all, members()}.

-type actor() :: riak_dt:actor().

Expand All @@ -78,10 +78,21 @@ value(GSet) ->
value(_, GSet) ->
value(GSet).

-spec apply_ops([gset_op()], actor(), gset()) ->
{ok, gset()}.
apply_ops([], _Actor, GSet) ->
{ok, GSet};
apply_ops([Op | Rest], Actor, GSet) ->
{ok, GSet2} = update(Op, Actor, GSet),
apply_ops(Rest, Actor, GSet2).

-spec update(gset_op(), actor(), gset()) -> {ok, gset()}.
update({add, Elem}, _Actor, GSet) ->
{ok, ordsets:add_element(Elem, GSet)};

update({update, Ops}, _Actor, GSet) ->
apply_ops(Ops,_Actor,GSet);

update({add_all, Elems}, _Actor, GSet) ->
{ok, ordsets:union(GSet, ordsets:from_list(Elems))}.

Expand All @@ -105,21 +116,27 @@ equal(GSet1, GSet2) ->
-include("riak_dt_tags.hrl").
-define(TAG, ?DT_GSET_TAG).
-define(V1_VERS, 1).
-define(V2_VERS, 2).

-spec to_binary(gset()) -> binary_gset().
to_binary(GSet) ->
<<?TAG:8/integer, ?V1_VERS:8/integer, (riak_dt:to_binary(GSet))/binary>>.
%%<<?TAG:8/integer, ?V1_VERS:8/integer, (riak_dt:to_binary(GSet))/binary>>.
{ok, B} = to_binary(?V2_VERS, GSet),
B.

-spec to_binary(Vers :: pos_integer(), gset()) -> {ok, binary()} | ?UNSUPPORTED_VERSION.
to_binary(1, S) ->
B = to_binary(S),
{ok, B};
to_binary(Vers, _S) ->
to_binary(?V1_VERS, S) ->
{ok, <<?TAG:8/integer, ?V1_VERS:8/integer, (riak_dt:to_binary(S))/binary>>};
to_binary(?V2_VERS, S) ->
{ok, <<?TAG:8/integer, ?V2_VERS:8/integer, (riak_dt:to_binary(S))/binary>>};
to_binary(Vers, _S0) ->
?UNSUPPORTED_VERSION(Vers).

-spec from_binary(binary()) -> {ok, gset()} | ?UNSUPPORTED_VERSION | ?INVALID_BINARY.
from_binary(<<?TAG:8/integer, ?V1_VERS:8/integer, Bin/binary>>) ->
{ok, riak_dt:from_binary(Bin)};
from_binary(<<?TAG:8/integer, ?V2_VERS:8/integer, Bin/binary>>) ->
{ok, riak_dt:from_binary(Bin)};
from_binary(<<?TAG:8/integer, Vers:8/integer, _Bin/binary>>) ->
?UNSUPPORTED_VERSION(Vers);
from_binary(_B) ->
Expand Down Expand Up @@ -157,6 +174,11 @@ stat_test() ->
?assertEqual(15, stat(max_element_size, S1)),
?assertEqual(undefined, stat(actor_count, S1)).

to_binary_test() ->
GSet = update({add, <<"foo">>}, undefined_actor, riak_dt_gset:new()),
Bin = riak_dt_gset:to_binary(GSet),
?assertMatch( <<82:8/integer, ?V2_VERS:8/integer, _/binary>> , Bin).

-ifdef(EQC).
eqc_value_test_() ->
crdt_statem_eqc:run(?MODULE, 1000).
Expand Down
18 changes: 9 additions & 9 deletions src/riak_dt_map.erl
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred})
%% only.
-spec filter_unique(riak_dt_set(), entries(), riak_dt_vclock:vclock(), entries()) -> entries().
filter_unique(FieldSet, Entries, Clock, Acc) ->
sets:fold(fun({_Name, Type}=Field, Keep) ->
ordsets:fold(fun({_Name, Type}=Field, Keep) ->
{Dots, TS} = ?DICT:fetch(Field, Entries),
KeepDots = ?DICT:filter(fun(Dot, _CRDT) ->
is_dot_unseen(Dot, Clock)
Expand Down Expand Up @@ -518,44 +518,44 @@ is_dot_unseen(Dot, Clock) ->
%% @doc Get the keys from an ?DICT as a ?SET
-spec key_set(riak_dt_dict()) -> riak_dt_set().
key_set(Dict) ->
sets:from_list(?DICT:fetch_keys(Dict)).
ordsets:from_list(?DICT:fetch_keys(Dict)).

%% @doc break the keys from an two ?DICTs out into three ?SETs, the
%% common keys, those unique to one, and those unique to the other.
-spec key_sets(riak_dt_dict(), riak_dt_dict()) -> {riak_dt_set(), riak_dt_set(), riak_dt_set()}.
key_sets(LHS, RHS) ->
LHSet = key_set(LHS),
RHSet = key_set(RHS),
{sets:intersection(LHSet, RHSet),
sets:subtract(LHSet, RHSet),
sets:subtract(RHSet, LHSet)}.
{ordsets:intersection(LHSet, RHSet),
ordsets:subtract(LHSet, RHSet),
ordsets:subtract(RHSet, LHSet)}.


%% @private for a set of dots (that are unique to one side) decide
%% whether to keep, or drop each.
-spec filter_dots(riak_dt_set(), riak_dt_dict(), riak_dt_vclock:vclock()) -> entries().
filter_dots(Dots, CRDTs, Clock) ->
DotsToKeep = sets:filter(fun(Dot) ->
DotsToKeep = ordsets:filter(fun(Dot) ->
is_dot_unseen(Dot, Clock)
end,
Dots),

?DICT:filter(fun(Dot, _CRDT) ->
sets:is_element(Dot, DotsToKeep)
ordsets:is_element(Dot, DotsToKeep)
end,
CRDTs).

%% @private merge the common fields into a set of surviving dots and a
%% tombstone per field. If a dot is on both sides, keep it. If it is
%% only on one side, drop it if dominated by the otherside's clock.
merge_common(FieldSet, LHS, RHS, LHSClock, RHSClock, Acc) ->
sets:fold(fun({_, Type}=Field, Keep) ->
ordsets:fold(fun({_, Type}=Field, Keep) ->
{LHSDots, LHTS} = ?DICT:fetch(Field, LHS),
{RHSDots, RHTS} = ?DICT:fetch(Field, RHS),
{CommonDots, LHSUniqe, RHSUnique} = key_sets(LHSDots, RHSDots),
TS = Type:merge(RHTS, LHTS),

CommonSurviving = sets:fold(fun(Dot, Common) ->
CommonSurviving = ordsets:fold(fun(Dot, Common) ->
L = ?DICT:fetch(Dot, LHSDots),
?DICT:store(Dot, L, Common)
end,
Expand Down
8 changes: 4 additions & 4 deletions src/riak_dt_od_flag.erl
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ merge({LHSClock, LHSDots, LHSDeferred}, {RHSClock, RHSDots, RHSDeferred}) ->
%% drop all the RHS dots that dominated by the LHS clock
%% keep all the dots that are in both
%% save value as value of flag
CommonDots = sets:intersection(sets:from_list(LHSDots), sets:from_list(RHSDots)),
LHSUnique = sets:to_list(sets:subtract(sets:from_list(LHSDots), CommonDots)),
RHSUnique = sets:to_list(sets:subtract(sets:from_list(RHSDots), CommonDots)),
CommonDots = ordsets:intersection(ordsets:from_list(LHSDots), ordsets:from_list(RHSDots)),
LHSUnique = ordsets:to_list(ordsets:subtract(ordsets:from_list(LHSDots), CommonDots)),
RHSUnique = ordsets:to_list(ordsets:subtract(ordsets:from_list(RHSDots), CommonDots)),
LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock),
RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock),
Flag = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]),
Flag = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]),
Deferred = ordsets:union(LHSDeferred, RHSDeferred),

apply_deferred(NewClock, Flag, Deferred).
Expand Down
86 changes: 46 additions & 40 deletions src/riak_dt_orswot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -294,50 +294,56 @@ merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) ->
{Clock, Entries, Deferred};
merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) ->
Clock = riak_dt_vclock:merge([LHSClock, RHSClock]),
{Keep, RHSElems} = ?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) ->
case ?DICT:find(Elem, RHSEntries) of
error ->
%% Only on left, trim dots and keep
%% surviving
case riak_dt_vclock:subtract_dots(Dots, RHSClock) of
{Keep, RHSElems} =
?DICT:fold(fun(Elem, Dots, {Acc, RHSRemaining}) ->
case ?DICT:find(Elem, RHSEntries) of
error ->
%% Only on left, trim dots and keep surviving
case riak_dt_vclock:subtract_dots(Dots, RHSClock) of
[] ->
%% Removed
{Acc, RHSRemaining};
NewDots ->
{?DICT:store(Elem, NewDots, Acc), RHSRemaining}
end;
{ok, RHSDots} ->
%% On both sides
CommonDots = ordsets:intersection(
ordsets:from_list(Dots),
ordsets:from_list(RHSDots)),
LHSUnique = ordsets:to_list(
ordsets:subtract(ordsets:from_list(Dots),
CommonDots)),
RHSUnique = ordsets:to_list(
ordsets:subtract(ordsets:from_list(RHSDots),
CommonDots)),
LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock),
RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock),
V = riak_dt_vclock:merge([ordsets:to_list(CommonDots), LHSKeep, RHSKeep]),
%% Perfectly possible that an item in both sets should be dropped
case V of
[] ->
%% Removed from both sides
{Acc, ?DICT:erase(Elem, RHSRemaining)};
_ ->
{?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)}
end
end
end,
{?DICT:new(), RHSEntries},
LHSEntries),
%%Now what about the stuff left from the right hand side? Do the same to that!
Entries = ?DICT:fold(fun(Elem, Dots, Acc) ->
case riak_dt_vclock:subtract_dots(Dots, LHSClock) of
[] ->
%% Removed
{Acc, RHSRemaining};
Acc;
NewDots ->
{?DICT:store(Elem, NewDots, Acc), RHSRemaining}
end;
{ok, RHSDots} ->
%% On both sides
CommonDots = sets:intersection(sets:from_list(Dots), sets:from_list(RHSDots)),
LHSUnique = sets:to_list(sets:subtract(sets:from_list(Dots), CommonDots)),
RHSUnique = sets:to_list(sets:subtract(sets:from_list(RHSDots), CommonDots)),
LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock),
RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock),
V = riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]),
%% Perfectly possible that an item in both sets should be dropped
case V of
[] ->
%% Removed from both sides
{Acc, ?DICT:erase(Elem, RHSRemaining)};
_ ->
{?DICT:store(Elem, V, Acc), ?DICT:erase(Elem, RHSRemaining)}
?DICT:store(Elem, NewDots, Acc)
end
end
end,
{?DICT:new(), RHSEntries},
LHSEntries),
%%Now what about the stuff left from the right hand side? Do the same to that!
Entries = ?DICT:fold(fun(Elem, Dots, Acc) ->
case riak_dt_vclock:subtract_dots(Dots, LHSClock) of
[] ->
%% Removed
Acc;
NewDots ->
?DICT:store(Elem, NewDots, Acc)
end
end,
Keep,
RHSElems),
end,
Keep,
RHSElems),
Deffered = merge_deferred(LHSDeferred, RHSDeferred),

apply_deferred(Clock, Entries, Deffered).
Expand Down
39 changes: 39 additions & 0 deletions test/riak_dt_gset_tests.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
%% -------------------------------------------------------------------
%%
%% riak_dt_gset_test: trivial assertive tests to illustrate module behavior
%%
%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(riak_dt_gset_tests).

-include_lib("eunit/include/eunit.hrl").
-import(riak_dt_gset, [update/3]).

-define(ACTOR_VAL, undefined).
-define(SINGLE_VAL, <<"binarytemple">>).
-define(FRANK_BOOTH, [<<"frank">>, <<"booth">>]).
-define(BOOTH_FRANK, [<<"booth">>, <<"frank">>]).

update_add_test() ->
N = riak_dt_gset:new(),
?assertEqual({ok, [?SINGLE_VAL]}, update({add, ?SINGLE_VAL}, ?ACTOR_VAL, N))
.

update_add_all_test() ->
?assertEqual({ok, ?BOOTH_FRANK}, update({add_all, ?FRANK_BOOTH}, ?ACTOR_VAL, riak_dt_gset:new())),
?assertNotEqual({ok, ?FRANK_BOOTH}, update({add_all, ?FRANK_BOOTH}, ?ACTOR_VAL, riak_dt_gset:new()))
.