|
22 | 22 | %% Naming pattern of antidote crdts: <type>_<semantics>
|
23 | 23 | %% if there is only one kind of semantics implemented for a certain type
|
24 | 24 | %% only the type is used in the name e.g. rga
|
25 |
| -%% counter_pn: PN-Counter aka Posistive Negative Counter |
| 25 | +%% counter_pn: PN-Counter aka Positive Negative Counter |
26 | 26 | %% counter_b: Bounded Counter
|
27 | 27 | %% counter_fat: Fat Counter
|
28 | 28 | %% integer: Integer (Experimental)
|
|
35 | 35 | %% register_mv: MultiValue Register aka MV-Reg
|
36 | 36 | %% map_go: Grow Only Map aka G-Map
|
37 | 37 | %% map_aw: Add Wins Map aka AW-Map (Experimental)
|
38 |
| -%% map_rr: Recursive Resets Map akak RR-Map |
| 38 | +%% map_rr: Recursive Resets Map aka RR-Map |
39 | 39 | %% rga: Replicated Growable Array (Experimental)
|
40 | 40 |
|
41 | 41 |
|
42 | 42 |
|
43 | 43 | -module(antidote_crdt).
|
44 | 44 |
|
45 |
| --include("antidote_crdt.hrl"). |
46 |
| - |
47 |
| --define(CRDTS, [antidote_crdt_counter_pn, |
48 |
| - antidote_crdt_counter_b, |
49 |
| - antidote_crdt_counter_fat, |
50 |
| - antidote_crdt_flag_ew, |
51 |
| - antidote_crdt_flag_dw, |
52 |
| - antidote_crdt_set_go, |
53 |
| - antidote_crdt_set_aw, |
54 |
| - antidote_crdt_set_rw, |
55 |
| - antidote_crdt_register_lww, |
56 |
| - antidote_crdt_register_mv, |
57 |
| - antidote_crdt_map_go, |
58 |
| - antidote_crdt_map_rr]). |
59 |
| - |
60 |
| --export([is_type/1 |
61 |
| - ]). |
62 |
| - |
63 |
| --callback new() -> crdt(). |
64 |
| --callback value(crdt()) -> value(). |
65 |
| --callback downstream(update(), crdt()) -> {ok, effect()} | {error, reason()}. |
66 |
| --callback update(effect(), crdt()) -> {ok, crdt()}. |
| 45 | + |
| 46 | +% The CRDTs supported by Antidote: |
| 47 | +-type typ() :: |
| 48 | +antidote_crdt_counter_pn |
| 49 | +| antidote_crdt_counter_b |
| 50 | +| antidote_crdt_counter_fat |
| 51 | +| antidote_crdt_flag_ew |
| 52 | +| antidote_crdt_flag_dw |
| 53 | +| antidote_crdt_set_go |
| 54 | +| antidote_crdt_set_aw |
| 55 | +| antidote_crdt_set_rw |
| 56 | +| antidote_crdt_register_lww |
| 57 | +| antidote_crdt_register_mv |
| 58 | +| antidote_crdt_map_go |
| 59 | +| antidote_crdt_map_rr. |
| 60 | + |
| 61 | +% Note: the crdt and effect types are not correct, the tags just help to find errors |
| 62 | +% The State of a CRDT: |
| 63 | +-opaque crdt() :: {antidote_crdt, state, term()}. |
| 64 | +% The downstream effect, which has to be applied at each replica |
| 65 | +-opaque effect() :: {antidote_crdt, effect, term()}. |
| 66 | +% The update operation, consisting of operation name and parameters |
| 67 | +% (e.g. {increment, 1} to increment a counter by one) |
| 68 | +-type update() :: {atom(), term()}. |
| 69 | +% Result of reading a CRDT (state without meta data) |
| 70 | +-type value() :: term(). |
| 71 | + |
| 72 | +% reason for an error |
| 73 | +-type reason() :: term(). |
| 74 | + |
| 75 | +-export_type([ |
| 76 | + crdt/0, |
| 77 | + update/0, |
| 78 | + effect/0, |
| 79 | + value/0, |
| 80 | + typ/0 |
| 81 | +]). |
| 82 | + |
| 83 | + |
| 84 | +-type internal_crdt() :: term(). |
| 85 | +-type internal_effect() :: term(). |
| 86 | + |
| 87 | +-export([is_type/1, new/1, value/2, downstream/3, update/3, require_state_downstream/2, is_operation/2]). |
| 88 | + |
| 89 | +% Callbacks implemented by each concrete CRDT implementation |
| 90 | +-callback new() -> internal_crdt(). |
| 91 | +-callback value(internal_crdt()) -> value(). |
| 92 | +-callback downstream(update(), internal_crdt()) -> {ok, internal_effect()} | {error, reason()}. |
| 93 | +-callback update(internal_effect(), internal_crdt()) -> {ok, internal_crdt()}. |
67 | 94 | -callback require_state_downstream(update()) -> boolean().
|
68 |
| --callback is_operation(update()) -> boolean(). %% Type check |
| 95 | +-callback is_operation(update()) -> boolean(). %% Type check |
| 96 | + |
| 97 | +-callback equal(internal_crdt(), internal_crdt()) -> boolean(). |
| 98 | +-callback to_binary(internal_crdt()) -> binary(). |
| 99 | +-callback from_binary(binary()) -> {ok, internal_crdt()} | {error, reason()}. |
| 100 | + |
| 101 | +% Check if the given type is supported by Antidote |
| 102 | +-spec is_type(typ()) -> boolean(). |
| 103 | +is_type(antidote_crdt_counter_pn) -> true; |
| 104 | +is_type(antidote_crdt_counter_b) -> true; |
| 105 | +is_type(antidote_crdt_counter_fat) -> true; |
| 106 | +is_type(antidote_crdt_flag_ew) -> true; |
| 107 | +is_type(antidote_crdt_flag_dw) -> true; |
| 108 | +is_type(antidote_crdt_set_go) -> true; |
| 109 | +is_type(antidote_crdt_set_aw) -> true; |
| 110 | +is_type(antidote_crdt_set_rw) -> true; |
| 111 | +is_type(antidote_crdt_register_lww) -> true; |
| 112 | +is_type(antidote_crdt_register_mv) -> true; |
| 113 | +is_type(antidote_crdt_map_go) -> true; |
| 114 | +is_type(antidote_crdt_map_rr) -> true; |
| 115 | +is_type(_) -> false. |
| 116 | + |
| 117 | + |
| 118 | +% Returns the initial CRDT state for the given Type |
| 119 | +-spec new(typ()) -> crdt(). |
| 120 | +new(Type) -> |
| 121 | + true = is_type(Type), |
| 122 | + Type:new(). |
| 123 | + |
| 124 | +% Reads the value from a CRDT state |
| 125 | +-spec value(typ(), crdt()) -> any(). |
| 126 | +value(Type, State) -> |
| 127 | + true = is_type(Type), |
| 128 | + Type:value(State). |
69 | 129 |
|
70 |
| --callback equal(crdt(), crdt()) -> boolean(). |
71 |
| --callback to_binary(crdt()) -> binary(). |
72 |
| --callback from_binary(binary()) -> {ok, crdt()} | {error, reason()}. |
| 130 | +% Computes the downstream effect for a given update operation and current state. |
| 131 | +% This has to be called once at the source replica. |
| 132 | +% The effect must then be applied on all replicas using the update function. |
| 133 | +% For some update operation it is not necessary to provide the current state |
| 134 | +% and the atom 'ignore' can be passed instead (see function require_state_downstream). |
| 135 | +-spec downstream(typ(), update(), crdt() | ignore) -> {ok, effect()} | {error, reason()}. |
| 136 | +downstream(Type, Update, State) -> |
| 137 | + true = is_type(Type), |
| 138 | + true = Type:is_operation(Update), |
| 139 | + Type:downstream(Update, State). |
73 | 140 |
|
74 |
| -%% Following callbacks taken from riak_dt |
75 |
| -%% Not sure if it is useful for antidote |
76 |
| -%-callback stats(crdt()) -> [{atom(), number()}]. |
77 |
| -%-callback stat(atom(), crdt()) -> number() | undefined. |
| 141 | +% Updates the state of a CRDT by applying a downstream effect calculated |
| 142 | +% using the downstream function. |
| 143 | +% For most types the update function must be called in causal order: |
| 144 | +% if Eff2 was calculated on a state where Eff1 was already replied, |
| 145 | +% then Eff1 has to be applied before Eff2 on all replicas. |
| 146 | +-spec update(typ(), effect(), crdt()) -> {ok, crdt()}. |
| 147 | +update(Type, Effect, State) -> |
| 148 | + true = is_type(Type), |
| 149 | + Type:update(Effect, State). |
78 | 150 |
|
79 |
| -is_type(Type) -> |
80 |
| - is_atom(Type) andalso lists:member(Type, ?CRDTS). |
| 151 | +% Checks whether the current state is required by the downstream function |
| 152 | +% for a specific type and update operation |
| 153 | +-spec require_state_downstream(typ(), update()) -> boolean(). |
| 154 | +require_state_downstream(Type, Update) -> |
| 155 | + true = is_type(Type), |
| 156 | + Type:require_state_downstream(Update). |
81 | 157 |
|
82 |
| -%% End of Module. |
| 158 | +% Checks whether the given update operation is valid for the given type |
| 159 | +-spec is_operation(typ(), update()) -> boolean(). |
| 160 | +is_operation(Type, Update) -> |
| 161 | + true = is_type(Type), |
| 162 | + Type:is_operation(Update). |
0 commit comments