diff --git a/dialyzer.ignore-warnings b/dialyzer.ignore-warnings index 1c1ac711..7c5069f3 100644 --- a/dialyzer.ignore-warnings +++ b/dialyzer.ignore-warnings @@ -1,19 +1,14 @@ -riak_repl_keylist_client.erl:108: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:218: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:267: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:120: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:132: The call application:set_env('riak_repl',{'progress',_},nonempty_maybe_improper_list()) breaks the contract (Application,Par,Val) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()), is_subtype(Val,term()) -riak_core_connection.erl:108: Function exchange_handshakes_with/4 has no local return -riak_core_connection.erl:172: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract (inet:socket(),iolist()) -> 'ok' | {'error',atom()} -riak_repl_keylist_client.erl:106: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:216: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:265: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract 
(Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:118: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) -riak_repl_keylist_client.erl:130: The call application:set_env('riak_repl',{'progress',_},nonempty_maybe_improper_list()) breaks the contract (Application,Par,Val) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()), is_subtype(Val,term()) -riak_core_connection_mgr.erl:546: The pattern 'ok' can never match the type {'error',atom()} +Unknown functions cluster_info:format/3 cluster_info:register_app/1 -Unknown functions +gen_leader.erl:1115: The call sys:handle_debug(Debug::any(),{'gen_leader', 'print_event'},any(),Event::{'$leader_cast',_} | {'noreply',_} | {'ok',_} | {'out',_,_,_}) breaks the contract (Debug,FormFunc,Extra,Event) -> [dbg_opt()] when is_subtype(Debug,[dbg_opt()]), is_subtype(FormFunc,dbg_fun()), is_subtype(Extra,term()), is_subtype(Event,system_event()) gen_leader.erl:936: The call sys:handle_debug(Debug::any(),{'gen_leader', 'print_event'},any(),{'in',_}) breaks the contract (Debug,FormFunc,Extra,Event) -> [dbg_opt()] when is_subtype(Debug,[dbg_opt()]), is_subtype(FormFunc,dbg_fun()), is_subtype(Extra,term()), is_subtype(Event,system_event()) gen_leader.erl:952: Function system_terminate/4 has no local return -gen_leader.erl:1115: The call sys:handle_debug(Debug::any(),{'gen_leader', 'print_event'},any(),Event::{'$leader_cast',_} | {'noreply',_} | {'ok',_} | {'out',_,_,_}) breaks the contract (Debug,FormFunc,Extra,Event) -> [dbg_opt()] when is_subtype(Debug,[dbg_opt()]), is_subtype(FormFunc,dbg_fun()), is_subtype(Extra,term()), is_subtype(Event,system_event()) +riak_core_connection.erl:108: Function exchange_handshakes_with/4 has no local return +riak_core_connection.erl:172: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract 
(inet:socket(),iolist()) -> 'ok' | {'error',atom()} +riak_core_connection_mgr.erl:546: The pattern 'ok' can never match the type {'error',atom()} +riak_repl_keylist_client.erl:108: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) +riak_repl_keylist_client.erl:120: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) +riak_repl_keylist_client.erl:132: The call application:set_env('riak_repl',{'progress',_},nonempty_maybe_improper_list()) breaks the contract (Application,Par,Val) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()), is_subtype(Val,term()) +riak_repl_keylist_client.erl:218: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) +riak_repl_keylist_client.erl:267: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()) diff --git a/priv/riak_repl.schema b/priv/riak_repl.schema index b76a93de..3babeb58 100644 --- a/priv/riak_repl.schema +++ b/priv/riak_repl.schema @@ -2,9 +2,10 @@ %% Replication config %% @doc Path (relative or absolute) to the working directory for the -%% replication process +%% replication process. {mapping, "mdc.data_root", "riak_repl.data_root", [ - {default, "{{repl_data_root}}"} + {default, "{{repl_data_root}}"}, + {datatype, directory} ]}. %% @doc The cluster manager will listen for connections from remote @@ -12,134 +13,171 @@ %% but only the cluster manager running on the cluster_leader will %% service requests. This can change as nodes enter and leave the %% cluster. 
The value is a combination of an IP address (**not -%% hostname**) followed by a port number +%% hostname**) followed by a port number. {mapping, "mdc.cluster_manager", "riak_core.cluster_mgr", [ {default, {"{{cluster_manager_ip}}", {{cluster_manager_port}} }}, {datatype, ip} ]}. -%% @doc The hard limit of fullsync workers that will be running on the -%% source side of a cluster across all nodes on that cluster for a -%% fullsync to a sink cluster. This means if one has configured -%% fullsync for two different clusters, both with a -%% max_fssource_cluster of 5, 10 fullsync workers can be in -%% progress. Only affects nodes on the source cluster on which this -%% parameter is defined via the configuration file or command line -{mapping, "mdc.max_fssource_cluster", "riak_repl.max_fssource_cluster", [ +%% @doc The fullsync strategy to use. +{mapping, "mdc.fullsync.strategy", "riak_repl.fullsync_strategy", +[{datatype, {enum, [keylist, aae]}}, + {default, keylist}, + hidden +]}. + +%% @doc The limit of fullsync workers running per source-side of a +%% replication connection. For example, if you have a cluster that is +%% replicating to 3 sink clusters and this set to 5, you will have at +%% most 15 workers in total. +{mapping, "mdc.fullsync.source.max_workers_per_cluster", "riak_repl.max_fssource_cluster", [ {datatype, integer}, {default, 5} ]}. -%% @doc Limits the number of fullsync workers that will be running on -%% each individual node in a source cluster. This is a hard limit for -%% all fullsyncs enabled; additional fullsync configurations will not -%% increase the number of fullsync workers allowed to run on any node. -%% Only affects nodes on the source cluster on which this parameter is -%% defined via the configuration file or command line -{mapping, "mdc.max_fssource_node", "riak_repl.max_fssource_node", [ +%% @doc The limit of fullsync workers running per node in the source +%% cluster. 
This setting is independent of the number of replication +%% connections. Thus, multiple simultaneous sink connections from the +%% source cluster will have to share the source node's number of +%% maximum connections. For example, if you have a cluster that is +%% replicating to 3 sink clusters and this is set to 1, you will have +%% at most 1 worker per node. +{mapping, "mdc.fullsync.source.max_workers_per_node", "riak_repl.max_fssource_node", [ {datatype, integer}, {default, 1} ]}. -%% @doc Limits the number of fullsync workers allowed to run on each -%% individual node in a sink cluster. This is a hard limit for all -%% fullsync sources interacting with the sink cluster. Thus, multiple -%% simultaneous source connections to the sink cluster will have to -%% share the sink node's number of maximum connections. Only affects -%% nodes on the sink cluster on which this parameter is defined via -%% the configuration file or command line. -{mapping, "mdc.max_fssink_node", "riak_repl.max_fssink_node", [ +%% @doc The limit of fullsync workers running per node in the sink +%% cluster. This setting is independent of the number of replication +%% connections. Thus, multiple simultaneous source connections to the +%% sink cluster will have to share the sink node's number of maximum +%% connections. For example, if you have a cluster that is replicating +%% from 3 source clusters and this is set to 1, you will have at most +%% 1 worker per node. +{mapping, "mdc.fullsync.sink.max_workers_per_node", "riak_repl.max_fssink_node", [ {datatype, integer}, {default, 1} ]}. -%% @doc Whether to initiate a fullsync on initial connection from the -%% secondary cluster -{mapping, "mdc.fullsync_on_connect", "riak_repl.fullsync_on_connect", [ - {datatype, {enum, [true, false]}}, - {default, true} +%% @doc Whether to initiate a fullsync on initial connection from a +%% source cluster. 
+{mapping, "mdc.fullsync.start_on_connect", "riak_repl.fullsync_on_connect", [ + {datatype, flag}, + {default, on} ]}. -%% @doc a single integer value representing the duration to wait in -%% minutes between fullsyncs, or a list of {clustername, -%% time_in_minutes} pairs for each sink participating in fullsync -%% replication. -{mapping, "mdc.fullsync_interval.$cluster_name", "riak_repl.fullsync_interval", [ - {datatype, {duration, ms}}, - {include_default, "all"}, - {commented, "30m"} +%% @doc The duration to wait between initiating fullsyncs for all +%% connected sink clusters. If set to "never", fullsync will not be +%% automatically initiated. If set to "per_sink", individual intervals +%% should be set using mdc.fullsync.interval. +%% @see mdc.fullsync.interval.$cluster_name +{mapping, "mdc.fullsync.interval", "riak_repl.fullsync_interval", [ + {datatype, [{duration, m}, {atom, per_sink}, {atom, never}]}, + {default, "6h"} +]}. + +%% @doc The duration to wait between initiating fullsync with a +%% specific connected sink cluster. +%% @see mdc.fullsync.interval +{mapping, "mdc.fullsync.interval.$cluster_name", "riak_repl.fullsync_interval", [ + {datatype, {duration, m}}, + {include_default, "sink_cluster"}, + {commented, "2d"} ]}. 
{translation, "riak_repl.fullsync_interval", fun(Conf) -> - Minute = fun(Millis) -> Millis div 60000 end, - FullSyncIntervals = cuttlefish_variable:filter_by_prefix("mdc.fullsync_interval", Conf), - case proplists:get_value(["mdc", "fullsync_interval", "all"], FullSyncIntervals) of - undefined -> - [ {list_to_atom(Name), Minute(Value)} || {["mdc", "fullsync_interval", Name], Value} <- FullSyncIntervals]; - X -> Minute(X) - end + FullSyncIntervals = cuttlefish_variable:filter_by_prefix("mdc.fullsync.interval", Conf), + {[{_, Global}], Sinks} = lists:partition(fun({I, _}) -> ["mdc", "fullsync", "interval"] == I end, FullSyncIntervals), + if Global == never, Sinks == [] -> + disabled; + Global == never -> + cuttlefish:warn("mdc.fullsync.interval is set to never," + " sink-specific intervals are ignored"), + disabled; + is_integer(Global), Sinks == [] -> + Global; + Global == per_sink, Sinks == [] -> + cuttlefish:invalid("Cannot set mdc.fullsync.interval = per_sink and" + " omit sink-specific intervals, set sink-specific" + " intervals or use 'never'"); + Global == per_sink -> + [{list_to_atom(SinkName), Value} || {["mdc", "fullsync", "interval", SinkName], Value} <- Sinks ]; + true -> + cuttlefish:invalid("Cannot set both mdc.fullsync.interval and" + " sink-specific intervals") + end end}. +%% @doc By default, fullsync replication will not try to coordinate +%% with other Riak subsystems that may be contending for the same +%% resources. Enabling this can help to prevent system response +%% degradation under times of heavy load from multiple background +%% tasks. To disable background coordination, set this parameter to +%% off. +{mapping, "mdc.fullsync.background_manager", "riak_repl.fullsync_use_background_manager", [ + {datatype, flag}, + {default, off}, + hidden +]}. + +%% @doc How frequently the metrics for fullsync source processes should +%% be gathered. The output of `riak-repl status` is calculated on this +%% interval. 
+{mapping, "mdc.fullsync.source.metrics_refresh_interval", "riak_repl.fullsync_stat_refresh_interval", [ + {datatype, {duration, ms}}, + {commented, "1m"}, + hidden +]}. + %% @doc The maximum size the realtime replication queue can grow to -%% before new objects are dropped. Defaults to 100MB. Dropped objects -%% will need to be replication with a fullsync. -{mapping, "mdc.rtq_max_bytes", "riak_repl.rtq_max_bytes", [ +%% before old objects are dropped. +{mapping, "mdc.realtime.queue_max_bytes", "riak_repl.rtq_max_bytes", [ {datatype, bytesize}, {default, "100MB"} ]}. -%% @doc Enable Riak CS proxy_get and block filter. -{mapping, "mdc.proxy_get", "riak_repl.proxy_get", [ - {datatype, {enum, [on, off]}}, - {default, off} +%% @doc Whether heartbeats are enabled for realtime replication +%% connections. +{mapping, "mdc.realtime.heartbeat", "riak_repl.rt_heartbeat_interval", [ + {datatype, flag}, + {default, on} ]}. -{translation, - "riak_repl.proxy_get", - fun(Conf) -> - case cuttlefish:conf_get("mdc.proxy_get", Conf) of - on -> enabled; - off -> disabled; - _ -> disabled - end - end}. - -%% @doc A heartbeat message is sent from the source to the sink every -%% heartbeat_interval. Setting heartbeat_interval to undefined -%% disables the realtime heartbeat. This feature is only available in -%% Riak Enterprise 1.3.2+. -{mapping, "mdc.realtime.heartbeat_interval", "riak_repl.rt_heartbeat_interval", [ +%% @doc When heartbeats are enabled, this setting is the interval +%% between heartbeat messages over the realtime replication +%% connection. +%% @see mdc.realtime.heartbeat +{mapping, "mdc.realtime.heartbeat.interval", "riak_repl.rt_heartbeat_interval", [ {datatype, {duration, s}}, {default, "15s"} ]}. -%% @doc If a heartbeat response is not received in -%% rt_heartbeat_timeout seconds, then the source connection exits and -%% will be re-established. This feature is only available in Riak -%% Enterprise 1.3.2+. 
-{mapping, "mdc.realtime.heartbeat_timeout", "riak_repl.rt_heartbeat_timeout", [ +%% @doc When heartbeats are enabled, this setting is the amount of +%% time to wait for a heartbeat response from the sink. If a heartbeat +%% response is not received within this time, then the source +%% connection closes and will be re-established. +%% @see mdc.realtime.heartbeat +{mapping, "mdc.realtime.heartbeat.timeout", "riak_repl.rt_heartbeat_timeout", [ {datatype, {duration, s}}, {default, "15s"} ]}. -%% @doc By default, fullsync replication will try to coordinate with other -%% riak subsystems that may be contending for the same resources. This will help -%% to prevent system response degradation under times of heavy load from multiple -%% background tasks. To disable background coordination, set this parameter to false. -%% Enterprise 2.0+. -{mapping, "mdc.fullsync.use_bg_manager", "riak_repl.fullsync_use_background_manager", [ - {datatype, {enum, [true, false]}}, - {level, advanced}, - {default, false} -]}. +{translation, "riak_repl.rt_heartbeat_interval", + fun(Conf) -> + case cuttlefish:conf_get("mdc.realtime.heartbeat", Conf) of + true -> + cuttlefish:conf_get("mdc.realtime.heartbeat.interval", Conf); + false -> + undefined + end + end}. -%% @doc How frequently the stats for fullsync source processes should be -%% gathered. Requests for fullsync status always returned the most recently -%% gathered data, and thus can be at most as old as this value. -{mapping, "mdc.fullsync.stat_refresh_interval", "riak_repl.fullsync_stat_refresh_interval", [ - {datatype, {duration, ms}}, - {commented, "1m"} +%% @doc Enable Riak CS proxy_get and block filter. +{mapping, "mdc.proxy_get", "riak_repl.proxy_get", [ + {datatype, {flag, {on, enabled}, {off, disabled}}}, + {default, off}, + hidden ]}. 
diff --git a/priv/riak_repl_experimental.schema b/priv/riak_repl_experimental.schema new file mode 100644 index 00000000..8dc30675 --- /dev/null +++ b/priv/riak_repl_experimental.schema @@ -0,0 +1,163 @@ +%%-*- mode: erlang -*- +%% ============ Experimental / Transient Settings ============ + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting. +{mapping, "mdc.cluster_manager.cancellation_interval", + "riak_repl.cm_cancellation_interval", + [{datatype, {duration, ms}}, + hidden + ]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting. +{mapping, "mdc.cluster_manager.no_endpoints_retry_interval", + "riak_repl.connmgr_no_endpoint_retry", + [{datatype, {duration, ms}}, + hidden + ]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.status_timeout", "riak_repl.status_timeout", +[{datatype, {duration, ms}}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.status_helper_timeout", "riak_repl.status_helper_timeout", +[{datatype, {duration, ms}}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.replicate_cs_bucket_objects", "riak_repl.replicate_cs_bucket_objects", +[{datatype, flag}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.replicate_cs_user_objects", "riak_repl.replicate_cs_user_objects", +[{datatype, flag}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.sink.min_put_workers", "riak_repl.fssink_min_workers", +[{datatype, integer}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.sink.max_put_workers", "riak_repl.fssink_max_workers", +[{datatype, integer}, + hidden +]}. 
+ +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.queue.overload_threshold", "riak_repl.rtq_overload_threshold", +[{datatype, integer}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.queue.overload_recover", "riak_repl.rtq_overload_recover", +[{datatype, integer}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.source.max_retries", "riak_repl.max_fssource_retries", +[{datatype, [{atom, infinity}, integer]}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.max_reserve_retries", "riak_repl.max_reserve_retries", +[{datatype, integer}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.queue.drop_report_interval", "riak_repl.rtq_drop_report_interval", +[{datatype, {duration, ms}}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.sink.max_pending", "riak_repl.rtsink_max_pending", +[ + %% TODO: Jon says size this according to the rtsink_max_workers + %% setting + {datatype, integer}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.sink.min_put_workers", "riak_repl.rtsink_min_workers", +[{datatype, integer}, + {default, 5}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.sink.max_put_workers", "riak_repl.rtsink_max_workers", +[{datatype, integer}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.sink.bucket_type_drop_report_interval", "riak_repl.bucket_type_drop_report_interval", +[{datatype, {duration, ms}}, + hidden +]}. 
+ +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.socket_reactivate_interval", "riak_repl.reactivate_socket_interval_millis", +[{datatype, {duration, ms}}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.source.helper_status_timeout", "riak_repl.riak_repl2_rtsource_helper_status_to", +[{datatype, {duration, ms}}, + hidden +]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.aae.direct.limit", "riak_repl.fullsync_direct_limit", + [{datatype, integer}, + hidden]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.aae.direct.mode", "riak_repl.fullsync_direct_mode", + [{datatype, {enum, [inline, buffered]}}, + hidden]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.keylist.min_get_workers", "riak_repl.min_get_workers", + [{datatype, integer}, + hidden]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.keylist.max_get_workers", "riak_repl.max_get_workers", + [{datatype, integer}, + hidden]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.keylist.vnode_gets", "riak_repl.vnode_gets", + [{datatype, flag}, + hidden]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.keylist.diff_batch_size", "riak_repl.diff_batch_size", + [{datatype, integer}, + hidden]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.fullsync.keylist.bloom_fold", "riak_repl.bloom_fold", + [{datatype, flag}, + hidden]}. + +%% @doc EXPERIMENTAL:: Consult with Basho about changing this setting +{mapping, "mdc.realtime.queue_migration_timeout", "riak_repl.queue_migration_timeout", +[{datatype, {duration, ms}}, + hidden +]}. 
diff --git a/rebar.config b/rebar.config index 8218f10d..24e512c4 100644 --- a/rebar.config +++ b/rebar.config @@ -7,6 +7,7 @@ {erl_first_files, ["src/gen_leader.erl"]}. {xref_checks, []}. {xref_queries, [{"(XC - UC) || (XU - X - B - cluster_info : Mod)", []}]}. +{eunit_opts, [verbose]}. {deps, [ {lager, "2.0.3", {git, "git://github.com/basho/lager.git", {tag, "2.0.3"}}}, diff --git a/src/riak_repl_app.erl b/src/riak_repl_app.erl index 4b536598..4519225e 100644 --- a/src/riak_repl_app.erl +++ b/src/riak_repl_app.erl @@ -110,6 +110,9 @@ start(_Type, _StartArgs) -> ok = riak_repl2_fscoordinator_serv:sync_register_service(), ok = riak_repl2_pg_block_requester:sync_register_service(), + %% register console commands + ok = riak_repl_console:register_cli(), + %% Don't announce service is available until we've %% registered all listeners. riak_core_node_watcher:service_up(riak_repl, Pid), diff --git a/src/riak_repl_cinfo.erl b/src/riak_repl_cinfo.erl index a5ca9a1b..af419f47 100644 --- a/src/riak_repl_cinfo.erl +++ b/src/riak_repl_cinfo.erl @@ -24,4 +24,4 @@ cluster_info_generator_funs() -> ]. status(CPid) -> % CPid is the data collector's pid. - cluster_info:format(CPid, "~p\n", [riak_repl_console:status(quiet)]). + cluster_info:format(CPid, "~p\n", [riak_repl_console:status()]). diff --git a/src/riak_repl_console.erl b/src/riak_repl_console.erl index 6f044fb7..70c7e67a 100644 --- a/src/riak_repl_console.erl +++ b/src/riak_repl_console.erl @@ -1,144 +1,188 @@ %% Riak EnterpriseDS -%% Copyright 2007-2009 Basho Technologies, Inc. All Rights Reserved. +%% Copyright 2007-2015 Basho Technologies, Inc. All Rights Reserved. -module(riak_repl_console). --author('Andy Gross '). -include("riak_repl.hrl"). --export([add_listener/1, del_listener/1, add_nat_listener/1]). --export([add_site/1, del_site/1]). --export([status/1, start_fullsync/1, cancel_fullsync/1, - pause_fullsync/1, resume_fullsync/1]). --export([client_stats_rpc/0, server_stats_rpc/0]). +-behaviour(clique_handler). 
+ +%% Clique behavior +-export([register_cli/0]). + +%% Main entry-point for script +-export([command/1]). + +%% Utility functions for script integration +-export([script_name/0, + register_command/4, + register_usage/2]). + +%% Stats functions +-export([status/0]). % NB: Used by riak_repl_cinfo + +-export([client_stats_rpc/0, + server_stats_rpc/0]). + -export([extract_rt_fs_send_recv_kbps/1]). --export([clustername/1, clusters/1,clusterstats/1, - connect/1, disconnect/1, connections/1, - realtime/1, fullsync/1, proxy_get/1 - ]). -export([rt_remotes_status/0, fs_remotes_status/0]). -export([get_config/0, - cluster_mgr_stats/0, leader_stats/0, client_stats/0, server_stats/0, coordinator_stats/0, - coordinator_srv_stats/0]). --export([modes/1, set_modes/1, get_modes/0, - max_fssource_node/1, - max_fssource_cluster/1, - max_fssink_node/1, - realtime_cascades/1, - cascades/1, - show_nat_map/1, - add_nat_map/1, - del_nat_map/1, - add_block_provider_redirect/1, - show_block_provider_redirect/1, - show_local_cluster_id/1, - delete_block_provider_redirect/1 - ]). - -add_listener(Params) -> - lager:warning(?V2REPLDEP, []), - Ring = get_ring(), - case add_listener_internal(Ring,Params) of - {ok, NewRing} -> - ok = maybe_set_ring(Ring, NewRing); - error -> error - end. + coordinator_srv_stats/0, + cluster_mgr_stats/0]). + +%% Modes functions +-export([set_modes/1, get_modes/0]). + +%% Ring utilities +-export([get_ring/0, maybe_set_ring/2]). + +%% Clique output utilities +-export([upgrade_warning/3, + output/1, + text_out/1, + text_out/2, + error_out/2]). + +-spec register_cli() -> ok. +register_cli() -> + ok = riak_repl_console13:register_cli(), + ok = riak_repl_console12:register_usage(), + ok = register_commands(), + ok = register_usage(). 
+ +register_commands() -> + true = register_command(["status"], [], [], fun status/2), + true = register_command(["modes", "show"], [], [], fun modes_show/2), + true = register_command(["modes", "set"], + [{mode_repl12, [{longname, "v2"}, + {datatype, flag}]}, + {mode_repl13, [{longname, "v3"}, + {datatype, flag}]}], + [], + fun modes_set/2), + ok. -add_nat_listener(Params) -> - lager:warning(?V2REPLDEP, []), - Ring = get_ring(), - case add_nat_listener_internal(Ring, Params) of - {ok, NewRing} -> - ok = maybe_set_ring(Ring, NewRing); - error -> error - end. +register_usage() -> + true = register_usage([], fun repl_usage/0), + true = register_usage(["modes"], modes_usage()), + ok. -add_listener_internal(Ring, [NodeName, IP, Port]) -> - Listener = make_listener(NodeName, IP, Port), - case lists:member(Listener#repl_listener.nodename, riak_core_ring:all_members(Ring)) of - true -> - case catch rpc:call(Listener#repl_listener.nodename, - riak_repl_util, valid_host_ip, [IP]) of - true -> - NewRing = riak_repl_ring:add_listener(Ring, Listener), - {ok,NewRing}; - false -> - io:format("~p is not a valid IP address for ~p\n", - [IP, Listener#repl_listener.nodename]), - error; - Error -> - io:format("Node ~p must be available to add listener: ~p\n", - [Listener#repl_listener.nodename, Error]), - error - end; - false -> - io:format("~p is not a member of the cluster\n", [Listener#repl_listener.nodename]), - error +repl_usage() -> + EnabledModes = get_modes(), + ModeHelp = [{mode_repl13, riak_repl_console13:commands_usage()}, + {mode_repl12, riak_repl_console12:commands_usage()}], + ModesCommands = [ Commands || {Mode, Commands} <- ModeHelp, + lists:member(Mode, EnabledModes) ], + ["COMMAND [...]\n\n", + " Commands:\n", + " modes Show or set replication modes\n", + " status Display status and metrics\n\n", + string:join(ModesCommands,"\n\n")]. 
+ +modes_usage() -> + "modes ( show | set [ v2=(on|off) ] [ v3=(on|off) ] )\n\n" + " Manipulate active replication modes.\n\n" + " Subcommands:\n" + " show Shows the active replication modes.\n" + " set Toggles active replication modes.\n\n" + " When setting modes, omitting the mode name is the same as `off`. New\n" + " clusters should use `v3` (previously `mode_repl13`) exclusively. `v2`\n" + " (previously `mode_repl12`) replication will be removed in a future release.". + +-spec script_name() -> string(). +script_name() -> + case get(script_name) of + undefined -> "riak-repl"; + Script -> Script end. -add_nat_listener_internal(Ring, [NodeName, IP, Port, PublicIP, PublicPort]) -> - case add_listener_internal(Ring, [NodeName, IP, Port]) of - {ok,NewRing} -> - case inet_parse:address(PublicIP) of - {ok, _} -> - NatListener = make_nat_listener(NodeName, IP, Port, PublicIP, PublicPort), - NewRing2 = riak_repl_ring:add_nat_listener(NewRing, NatListener), - {ok, NewRing2}; - {error, IPParseError} -> - io:format("Invalid NAT IP address: ~p~n", [IPParseError]), - error - end; - error -> - io:format("Error adding nat address. ~n"), - error +-spec register_command([string()], [{atom(),[{_,_}]}], [{atom(),[{_,_}]}], fun()) -> true. +register_command(Cmd, Keys, Flags, Fun) -> + clique:register_command(["riak-repl"|Cmd], Keys, Flags, Fun). + +-spec register_usage([string()], iolist() | fun(() -> iolist())) -> true. +register_usage(Cmd, Usage) -> + UsageFun = fun() -> + UsageStr = if is_function(Usage) -> Usage(); + true -> Usage + end, + ScriptName = script_name(), + erase(script_name), + [ScriptName, " ", UsageStr] + end, + %% TODO: specs are wrong on clique:register_usage/2 + clique_usage:register(["riak-repl"|Cmd], UsageFun). + +%% @doc Entry-point for all riak-repl commands. +-spec command([string()]) -> ok | error. 
+command([Script|Args]) -> + %% We stash the script name (which may be a partial or absolute + %% path) in the process dictionary so usage output can grab it + %% later. + put(script_name, Script), + %% We don't really want to touch legacy commands, so try to + %% dispatch them first. + case riak_repl_console12:dispatch(Args) of + %% If there's no matching legacy command (or usage should be + %% printed), try to "upgrade" the arguments to clique-style, + %% then invoke clique. + nomatch -> + NewCmd = upgrade(riak_repl_console13:upgrade(Args)), + Cmd = ["riak-repl" | NewCmd], + lager:debug("Running riak-repl command: Script: ~p Args: ~p UpgCmd: ~p RunCmd: ~p", [Script, Args, NewCmd, Cmd]), + clique:run(Cmd); + OkOrError -> + OkOrError end. -del_listener([NodeName, IP, Port]) -> - lager:warning(?V2REPLDEP, []), - Ring = get_ring(), - Listener = make_listener(NodeName, IP, Port), - NewRing0 = riak_repl_ring:del_listener(Ring, Listener), - NewRing = riak_repl_ring:del_nat_listener(NewRing0, Listener), - ok = maybe_set_ring(Ring, NewRing). - -add_site([IP, Port, SiteName]) -> - lager:warning(?V2REPLDEP, []), - Ring = get_ring(), - Site = make_site(SiteName, IP, Port), - NewRing = riak_repl_ring:add_site(Ring, Site), - ok = maybe_set_ring(Ring, NewRing). - -del_site([SiteName]) -> - lager:warning(?V2REPLDEP, []), - Ring = get_ring(), - NewRing = riak_repl_ring:del_site(Ring, SiteName), - ok = maybe_set_ring(Ring, NewRing). - -set_modes(Modes) -> - Ring = get_ring(), - NewRing = riak_repl_ring:set_modes(Ring, Modes), - ok = maybe_set_ring(Ring, NewRing). - -get_modes() -> - Ring = get_ring(), - riak_repl_ring:get_modes(Ring). - - -status([]) -> - status2(true); -status(quiet) -> - status2(false). 
- -status2(Verbose) -> +upgrade(["modes", "show"]=Args) -> + Args; +upgrade(["modes", "set"|_Rest]=Args) -> + Args; +upgrade(["modes"|[_|_]=Modes]=Args) -> + case upgrade_modes(Modes) of + Modes -> + Args; + NewModes -> + upgrade_warning(Args, "Use `modes set ~s`", [string:join(NewModes, " ")]), + ["modes", "set"|NewModes] + end; +upgrade(Args) -> + Args. + +upgrade_modes(Modes) -> + lists:filtermap(fun upgrade_mode/1, Modes). + +upgrade_mode("mode_repl13") -> {true, "v3=on"}; +upgrade_mode("mode_repl12") -> {true, "v2=on"}; +upgrade_mode("v3="++_=Mode) -> {true, Mode}; +upgrade_mode("v2="++_=Mode) -> {true, Mode}; +upgrade_mode(_) -> false. + +%%----------------------- +%% Command: status +%%----------------------- + +status([], []) -> + All = status(), + [clique_status:list(lists:filtermap(fun format_counter_stat/1, All))]; +status(_,_) -> + usage. + +status() -> + %% NB: We export this for compatibility with previous components, + %% but all formatting for output is now done when preparing return + %% values for clique. Config = get_config(), + Ring = get_ring(), Stats1 = riak_repl_stats:get_stats(), - RTRemotesStatus = rt_remotes_status(), - FSRemotesStatus = fs_remotes_status(), - PGRemotesStatus = pg_remotes_status(), + RTRemotesStatus = rt_remotes_status(Ring), + FSRemotesStatus = fs_remotes_status(Ring), + PGRemotesStatus = pg_remotes_status(Ring), LeaderStats = leader_stats(), ClientStats = client_stats(), ServerStats = server_stats(), @@ -153,27 +197,26 @@ status2(Verbose) -> Stats1++LeaderStats++ClientStats++ServerStats++ CoordStats++CoordSrvStats++CMgrStats++RTQStats++PGStats, SendRecvKbps = extract_rt_fs_send_recv_kbps(Most), - All = Most ++ SendRecvKbps, - if Verbose -> - format_counter_stats(All); - true -> - All - end. + Most ++ SendRecvKbps. -pg_remotes_status() -> - Ring = get_ring(), + +pg_remotes_status(Ring) -> Enabled = string:join(riak_repl_ring:pg_enabled(Ring),", "), [{proxy_get_enabled, Enabled}]. 
rt_remotes_status() -> - Ring = get_ring(), + rt_remotes_status(get_ring()). + +rt_remotes_status(Ring) -> Enabled = string:join(riak_repl_ring:rt_enabled(Ring),", "), Started = string:join(riak_repl_ring:rt_started(Ring),", "), [{realtime_enabled, Enabled}, {realtime_started, Started}]. fs_remotes_status() -> - Ring = get_ring(), + fs_remotes_status(get_ring()). + +fs_remotes_status(Ring) -> Sinks = riak_repl_ring:fs_enabled(Ring), RunningSinks = [Sink || Sink <- Sinks, cluster_fs_running(Sink)], [{fullsync_enabled, string:join(Sinks, ", ")}, @@ -183,562 +226,72 @@ cluster_fs_running(Sink) -> ClusterCoord = riak_repl2_fscoordinator_sup:coord_for_cluster(Sink), riak_repl2_fscoordinator:is_running(ClusterCoord). -start_fullsync([]) -> - lager:warning(?V2REPLDEP, []), - _ = [riak_repl_tcp_server:start_fullsync(Pid) || - Pid <- riak_repl_listener_sup:server_pids()], - ok. - -cancel_fullsync([]) -> - lager:warning(?V2REPLDEP, []), - _ = [riak_repl_tcp_server:cancel_fullsync(Pid) || - Pid <- riak_repl_listener_sup:server_pids()], - ok. - -pause_fullsync([]) -> - lager:warning(?V2REPLDEP, []), - _ = [riak_repl_tcp_server:pause_fullsync(Pid) || - Pid <- riak_repl_listener_sup:server_pids()], - ok. - -resume_fullsync([]) -> - lager:warning(?V2REPLDEP, []), - _ = [riak_repl_tcp_server:resume_fullsync(Pid) || - Pid <- riak_repl_listener_sup:server_pids()], - ok. - - -%% -%% Repl2 commands -%% -rtq_stats() -> - case erlang:whereis(riak_repl2_rtq) of - Pid when is_pid(Pid) -> - [{realtime_queue_stats, riak_repl2_rtq:status()}]; - _ -> [] - end. 
- -cluster_mgr_stats() -> - case erlang:whereis(riak_repl_leader_gs) of - Pid when is_pid(Pid) -> - ConnectedClusters = case riak_core_cluster_mgr:get_known_clusters() of - {ok, Clusters} -> - [erlang:list_to_binary(Cluster) || Cluster <- - Clusters]; - Error -> Error - end, - [{cluster_name, - erlang:list_to_binary(riak_core_connection:symbolic_clustername())}, - {cluster_leader, riak_core_cluster_mgr:get_leader()}, - {connected_clusters, ConnectedClusters}]; - _ -> [] - end. - -%% Show cluster stats for this node -clusterstats([]) -> - %% connection manager stats - CMStats = cluster_mgr_stats(), - CConnStats = riak_core_connection_mgr_stats:get_consolidated_stats(), - Stats = CMStats ++ CConnStats, - io:format("~p~n", [Stats]); -%% slice cluster stats by remote "IP:Port" or "protocol-id". -%% Example protocol-id is rt_repl -clusterstats([Arg]) -> - NWords = string:words(Arg, $:), - case NWords of - 1 -> - %% assume protocol-id - ProtocolId = list_to_atom(Arg), - CConnStats = riak_core_connection_mgr_stats:get_stats_by_protocol(ProtocolId), - CMStats = cluster_mgr_stats(), - Stats = CMStats ++ CConnStats, - io:format("~p~n", [Stats]); - 2 -> - Address = Arg, - IP = string:sub_word(Address, 1, $:), - PortStr = string:sub_word(Address, 2, $:), - {Port,_Rest} = string:to_integer(PortStr), - CConnStats = riak_core_connection_mgr_stats:get_stats_by_ip({IP,Port}), - CMStats = cluster_mgr_stats(), - Stats = CMStats ++ CConnStats, - io:format("~p~n", [Stats]); - _ -> - {error, {badarg, Arg}} - end. - -%% TODO: cluster naming belongs in riak_core_ring, not in riak_core_connection, but -%% not until we move all of the connection stuff to core. -clustername([]) -> - MyName = riak_core_connection:symbolic_clustername(), - io:format("~s~n", [MyName]), - ok; -clustername([ClusterName]) -> - ?LOG_USER_CMD("Set clustername to ~p", [ClusterName]), - riak_core_ring_manager:ring_trans(fun riak_core_connection:set_symbolic_clustername/2, - ClusterName), - ok. 
- -clusters([]) -> - {ok, Clusters} = riak_core_cluster_mgr:get_known_clusters(), - lists:foreach( - fun(ClusterName) -> - {ok,Members} = riak_core_cluster_mgr:get_ipaddrs_of_cluster(ClusterName), - IPs = [string_of_ipaddr(Addr) || Addr <- Members], - io:format("~s: ~p~n", [ClusterName, IPs]), - ok - end, - Clusters), - ok. - -string_of_ipaddr({IP, Port}) -> - lists:flatten(io_lib:format("~s:~p", [IP, Port])). - -choose_best_addr({cluster_by_addr, {IP,Port}}, _ClientAddr) -> - string_of_ipaddr({IP,Port}); -choose_best_addr({cluster_by_name, _}, ClientAddr) -> - string_of_ipaddr(ClientAddr). - -string_of_remote({cluster_by_addr, {IP,Port}}) -> - string_of_ipaddr({IP,Port}); -string_of_remote({cluster_by_name, ClusterName}) -> - ClusterName. - -%% Print info about this sink -%% Remote :: {ip,port} | ClusterName -showClusterConn({Remote,Pid}) -> - ConnName = string_of_remote(Remote), - PidStr = io_lib:format("~p", [Pid]), - %% try to get status from Pid of cluster control channel. - %% if we haven't connected successfully yet, it will time out, which we will fail - %% fast for since it's a local process, not a remote one. - try riak_core_cluster_conn:status(Pid, 2) of - {Pid, status, {ClientAddr, _Transport, Name, Members}} -> - IPs = [string_of_ipaddr(Addr) || Addr <- Members], - CAddr = choose_best_addr(Remote, ClientAddr), - io:format("~-20s ~-20s ~-15s ~p (via ~s)~n", - [ConnName, Name, PidStr,IPs, CAddr]); - {_StateName, SRemote} -> - io:format("~-20s ~-20s ~-15s (connecting to ~p)~n", - [ConnName, "", PidStr, string_of_remote(SRemote)]) - catch - 'EXIT':{timeout, _} -> - io:format("~-20s ~-20s ~-15s (status timed out)~n", - [ConnName, "", PidStr]) - end. - -connections([]) -> - %% get cluster manager's outbound connections to other "remote" clusters, - %% which for now, are all the "sinks". 
- {ok, Conns} = riak_core_cluster_mgr:get_connections(), - io:format("~-20s ~-20s ~-15s [Members]~n", ["Connection", "Cluster Name", ""]), - io:format("~-20s ~-20s ~-15s ---------~n", ["----------", "------------", "----------"]), - _ = [showClusterConn(Conn) || Conn <- Conns], - ok. - -connect([Address]) -> - ?LOG_USER_CMD("Connect to cluster at ~p", [Address]), - NWords = string:words(Address, $:), - case NWords of - 2 -> - IP = string:sub_word(Address, 1, $:), - PortStr = string:sub_word(Address, 2, $:), - connect([IP, PortStr]); - _ -> - io:format("Error: remote connection is missing port. Expected 'connect '~n"), - {error, {badarg, Address}} - end; -connect([IP, PortStr]) -> - ?LOG_USER_CMD("Connect to cluster at ~p:~p", [IP, PortStr]), - {Port,_Rest} = string:to_integer(PortStr), - case riak_core_connection:symbolic_clustername() of - "undefined" -> - io:format("Error: Unable to establish connections until local cluster is named.~n"), - io:format("First use 'riak-repl clustername '~n"), - {error, undefined_cluster_name}; - _Name -> - riak_core_cluster_mgr:add_remote_cluster({IP, Port}), - ok - end. - -%% remove a remote connection by clustername or by IP/Port address: -%% clustername -%% | ip:port -%% | ip port -disconnect([Address]) -> - ?LOG_USER_CMD("Disconnect from cluster at ~p", [Address]), - NWords = string:words(Address, $:), - case NWords of - 1 -> - Remote = Address, - %% TODO: need to wrap a single ring transition around all of these. 
- %% fullsync(["stop", Remote]), - %% fullsync(["disable", Remote]), - %% realtime(["stop", Remote]), - %% realtime(["disable", Remote]), - %% tear down cluster manager connection - riak_core_cluster_mgr:remove_remote_cluster(Remote), - ok; - 2 -> - IP = string:sub_word(Address, 1, $:), - PortStr = string:sub_word(Address, 2, $:), - _ = disconnect([IP, PortStr]), - ok; - _ -> - {error, {badarg, Address}} - end; -disconnect([IP, PortStr]) -> - ?LOG_USER_CMD("Disconnect from cluster at ~p:~p", [IP, PortStr]), - {Port,_Rest} = string:to_integer(PortStr), - riak_core_cluster_mgr:remove_remote_cluster({IP, Port}), - ok. - -realtime([Cmd, Remote]) -> - case Cmd of - "enable" -> - ?LOG_USER_CMD("Enable Realtime Replication to cluster ~p", [Remote]), - riak_repl2_rt:enable(Remote); - "disable" -> - ?LOG_USER_CMD("Disable Realtime Replication to cluster ~p", [Remote]), - riak_repl2_rt:disable(Remote); - "start" -> - ?LOG_USER_CMD("Start Realtime Replication to cluster ~p", [Remote]), - riak_repl2_rt:start(Remote); - "stop" -> - ?LOG_USER_CMD("Stop Realtime Replication to cluster ~p", [Remote]), - riak_repl2_rt:stop(Remote) - end, - ok; -realtime([Cmd]) -> - Remotes = riak_repl2_rt:enabled(), - _ = case Cmd of - "start" -> - ?LOG_USER_CMD("Start Realtime Replication to all connected clusters", - []), - _ = [riak_repl2_rt:start(Remote) || Remote <- Remotes]; - "stop" -> - ?LOG_USER_CMD("Stop Realtime Replication to all connected clusters", - []), - _ = [riak_repl2_rt:stop(Remote) || Remote <- Remotes] - end, - ok. 
- -fullsync([Cmd, Remote]) -> - Leader = riak_core_cluster_mgr:get_leader(), - case Cmd of - "enable" -> - ?LOG_USER_CMD("Enable Fullsync Replication to cluster ~p", [Remote]), - riak_core_ring_manager:ring_trans(fun - riak_repl_ring:fs_enable_trans/2, Remote), - _ = riak_repl2_fscoordinator_sup:start_coord(Leader, Remote), - ok; - "disable" -> - ?LOG_USER_CMD("Disable Fullsync Replication to cluster ~p", [Remote]), - riak_core_ring_manager:ring_trans(fun - riak_repl_ring:fs_disable_trans/2, Remote), - _ = riak_repl2_fscoordinator_sup:stop_coord(Leader, Remote), - ok; - "start" -> - ?LOG_USER_CMD("Start Fullsync Replication to cluster ~p", [Remote]), - Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), - case proplists:get_value(Remote, Fullsyncs) of - undefined -> - io:format("Fullsync not enabled for cluster ~p~n", [Remote]), - io:format("Use 'fullsync enable ~p' before start~n", [Remote]), - {error, not_enabled}; - Pid -> - riak_repl2_fscoordinator:start_fullsync(Pid), - ok - end; - "stop" -> - ?LOG_USER_CMD("Stop Fullsync Replication to cluster ~p", [Remote]), - Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), - case proplists:get_value(Remote, Fullsyncs) of - undefined -> - %% Fullsync is not enabled, but carry on quietly. - ok; - Pid -> - riak_repl2_fscoordinator:stop_fullsync(Pid), - ok - end - end; -fullsync([Cmd]) -> - Leader = riak_core_cluster_mgr:get_leader(), - Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), - case Cmd of - "start" -> - ?LOG_USER_CMD("Start Fullsync Replication to all connected clusters",[]), - _ = [riak_repl2_fscoordinator:start_fullsync(Pid) || {_, Pid} <- - Fullsyncs], - ok; - "stop" -> - ?LOG_USER_CMD("Stop Fullsync Replication to all connected clusters",[]), - _ = [riak_repl2_fscoordinator:stop_fullsync(Pid) || {_, Pid} <- - Fullsyncs], - ok - end, - ok. 
- -proxy_get([Cmd, Remote]) -> - case Cmd of - "enable" -> - ?LOG_USER_CMD("Enable Riak CS Proxy GET block provider for ~p",[Remote]), - riak_core_ring_manager:ring_trans(fun - riak_repl_ring:pg_enable_trans/2, Remote), - ok; - "disable" -> - ?LOG_USER_CMD("Disable Riak CS Proxy GET block provider for ~p",[Remote]), - riak_core_ring_manager:ring_trans(fun - riak_repl_ring:pg_disable_trans/2, Remote), - ok - end. - -modes([]) -> - CurrentModes = get_modes(), - io:format("Current replication modes: ~p~n",[CurrentModes]), - ok; -modes(NewModes) -> - ?LOG_USER_CMD("Set replication mode(s) to ~p",[NewModes]), - Modes = [ list_to_atom(Mode) || Mode <- NewModes], - set_modes(Modes), - modes([]). - -realtime_cascades(["always"]) -> - ?LOG_USER_CMD("Enable Realtime Replication cascading", []), - riak_core_ring_manager:ring_trans(fun - riak_repl_ring:rt_cascades_trans/2, always); -realtime_cascades(["never"]) -> - ?LOG_USER_CMD("Disable Realtime Replication cascading", []), - riak_core_ring_manager:ring_trans(fun - riak_repl_ring:rt_cascades_trans/2, never); -realtime_cascades([]) -> - Cascades = app_helper:get_env(riak_repl, realtime_cascades, always), - io:format("realtime_cascades: ~p~n", [Cascades]); -realtime_cascades(_Wut) -> - io:format("realtime_cascades either \"always\" or \"never\"~n"). - -cascades(Val) -> - realtime_cascades(Val). - -%% For each of these "max" parameter changes, we need to make an rpc multi-call to every node -%% so that all nodes have the new value in their application environment. That way, whoever -%% becomes the fullsync coordinator will have the correct values. TODO: what happens when a -%% machine bounces and becomes leader? It won't know the new value. Seems like we need a central -%% place to hold these configuration values. 
-max_fssource_node([]) -> - %% show the default so as not to confuse the user - io:format("max_fssource_node value = ~p~n", - [app_helper:get_env(riak_repl, max_fssource_node, - ?DEFAULT_SOURCE_PER_NODE)]); -max_fssource_node([FSSourceNode]) -> - NewVal = erlang:list_to_integer(FSSourceNode), - riak_core_util:rpc_every_member(?MODULE, max_fssource_node, [NewVal], ?CONSOLE_RPC_TIMEOUT), - ?LOG_USER_CMD("Set max number of Fullsync workers per Source node to ~p",[NewVal]), - max_fssource_node([]), - ok; -max_fssource_node(NewVal) -> - ?LOG_USER_CMD("Locally set max number of Fullsync workers to ~p",[NewVal]), - application:set_env(riak_repl, max_fssource_node, NewVal). - -max_fssource_cluster([]) -> - %% show the default so as not to confuse the user - io:format("max_fssource_cluster value = ~p~n", - [app_helper:get_env(riak_repl, max_fssource_cluster, - ?DEFAULT_SOURCE_PER_CLUSTER)]); -max_fssource_cluster([FSSourceCluster]) -> - NewVal = erlang:list_to_integer(FSSourceCluster), - riak_core_util:rpc_every_member(?MODULE, max_fssource_cluster, [NewVal], ?CONSOLE_RPC_TIMEOUT), - ?LOG_USER_CMD("Set max number of Fullsync workers for Source cluster to ~p",[NewVal]), - max_fssource_cluster([]), - ok; -max_fssource_cluster(NewVal) -> - ?LOG_USER_CMD("Locally set max number of Fullsync workersfor Source cluster to ~p",[NewVal]), - application:set_env(riak_repl, max_fssource_cluster, NewVal). 
- -max_fssink_node([]) -> - io:format("max_fssink_node value = ~p~n", - [app_helper:get_env(riak_repl, max_fssink_node, ?DEFAULT_MAX_SINKS_NODE)]); -max_fssink_node([FSSinkNode]) -> - NewVal = erlang:list_to_integer(FSSinkNode), - riak_core_util:rpc_every_member(?MODULE, max_fssink_node, [NewVal], ?CONSOLE_RPC_TIMEOUT), - ?LOG_USER_CMD("Set max number of Fullsync works per Sink node to ~p",[NewVal]), - max_fssink_node([]), - ok; -max_fssink_node(NewVal) -> - ?LOG_USER_CMD("Locally set max number of Fullsync workers per Sink node to ~p",[NewVal]), - application:set_env(riak_repl, max_fssink_node, NewVal). - -show_nat_map([]) -> - Ring = get_ring(), - io:format("Nat map: ~n"), - [io:format(" ~-21.. s -> ~s~n", - [print_ip_and_maybe_port(Int), print_ip_and_maybe_port(Ext)]) - || {Int, Ext} <- riak_repl_ring:get_nat_map(Ring)]. - -add_nat_map([External, Internal]) -> - case {parse_ip_and_maybe_port(External, false), - parse_ip_and_maybe_port(Internal, true)} of - {{error, Reason}, _} -> - io:format("Bad external IP ~p", [Reason]), - error; - {_, {error, Reason}} -> - io:format("Bad internal IP ~p", [Reason]), - error; - {ExternalIP, InternalIP} -> - ?LOG_USER_CMD("Add a NAT map from External IP ~p to Internal IP ~p", [ExternalIP, InternalIP]), - riak_core_ring_manager:ring_trans( - fun riak_repl_ring:add_nat_map/2, - {ExternalIP, InternalIP}), - ok - end. - -del_nat_map([External, Internal]) -> - case {parse_ip_and_maybe_port(External, false), - parse_ip_and_maybe_port(Internal, true)} of - {{error, Reason}, _} -> - io:format("Bad external IP ~p", [Reason]), - error; - {_, {error, Reason}} -> - io:format("Bad internal IP ~p", [Reason]), - error; - {ExternalIP, InternalIP} -> - ?LOG_USER_CMD("Delete a NAT map from External IP ~p to Internal IP ~p", [ExternalIP, InternalIP]), - riak_core_ring_manager:ring_trans( - fun riak_repl_ring:del_nat_map/2, - {ExternalIP, InternalIP}), - ok - end. 
- -% NB: the following commands are around the "Dead Cluster" redirect feature, -% 306. They all operate using cluster_id (tuple), not clustername, for now, as -% of this writing we had no reliable way to map a clustername to an id -% over disterlang. When this API becomes available, this feature may use -% it. -add_block_provider_redirect([FromClusterId, ToClusterId]) -> - lager:info("Redirecting cluster id: ~p to ~p", [FromClusterId, ToClusterId]), - riak_core_metadata:put({<<"replication">>, <<"cluster-mapping">>}, - FromClusterId, ToClusterId). - -show_block_provider_redirect([FromClusterId]) -> - case riak_core_metadata:get({<<"replication">>, <<"cluster-mapping">>}, FromClusterId) of - undefined -> - io:format("No mapping for ~p~n", [FromClusterId]); - ToClusterId -> - io:format("Cluster id ~p redirecting to cluster id ~p~n", [FromClusterId, ToClusterId]) - end. - -delete_block_provider_redirect([FromClusterId]) -> - lager:info("Deleting redirect to ~p", [FromClusterId]), - riak_core_metadata:delete({<<"replication">>, <<"cluster-mapping">>}, FromClusterId). - -show_local_cluster_id([]) -> - {ok, Ring} = riak_core_ring_manager:get_my_ring(), - ClusterId = lists:flatten( - io_lib:format("~p", [riak_core_ring:cluster_name(Ring)])), - io:format("local cluster id: ~p~n", [ClusterId]). 
- -%% helper functions - -parse_ip_and_maybe_port(String, Hostname) -> - case string:tokens(String, ":") of - [IPStr, PortStr] -> - case inet_parse:ipv4strict_address(IPStr) of - {ok, IP} -> - try list_to_integer(PortStr) of - Port -> - {IP, Port} - catch - _:_ -> - {error, {bad_port, PortStr}} - end; - _ when Hostname -> - case inet_gethost_native:gethostbyname(IPStr) of - {ok, _} -> - try list_to_integer(PortStr) of - Port -> - {IPStr, Port} - catch - _:_ -> - {error, {bad_port, PortStr}} - end; - _ -> - {error, {bad_ip, IPStr}} - end; - _ -> - {error, {bad_ip, IPStr}} - end; - [IPStr] -> - case inet_parse:ipv4strict_address(IPStr) of - {ok, IP} -> - IP; - _ when Hostname -> - case inet_gethost_native:gethostbyname(IPStr) of - {ok, _} -> - IPStr; - _ -> - {error, {bad_ip, IPStr}} - end; - _ -> - {error, {bad_ip, IPStr}} - end - end. - -%% helper functions - extract_rt_fs_send_recv_kbps(Most) -> RTSendKbps = sum_rt_send_kbps(Most), RTRecvKbps = sum_rt_recv_kbps(Most), FSSendKbps = sum_fs_send_kbps(Most), FSRecvKbps = sum_fs_recv_kbps(Most), [{realtime_send_kbps, RTSendKbps}, {realtime_recv_kbps, RTRecvKbps}, - {fullsync_send_kbps, FSSendKbps}, {fullsync_recv_kbps, FSRecvKbps}]. - -print_ip_and_maybe_port({IP, Port}) when is_tuple(IP) -> - [inet_parse:ntoa(IP), $:, integer_to_list(Port)]; -print_ip_and_maybe_port({Host, Port}) when is_list(Host) -> - [Host, $:, integer_to_list(Port)]; -print_ip_and_maybe_port(IP) when is_tuple(IP) -> - inet_parse:ntoa(IP); -print_ip_and_maybe_port(Host) when is_list(Host) -> - Host. 
- -format_counter_stats([]) -> ok; -format_counter_stats([{K,V}|T]) when is_list(K) -> - io:format("~s: ~p~n", [K,V]), - format_counter_stats(T); -%format_counter_stats([{K,V}|T]) when K == fullsync_coordinator -> -% io:format("V = ~p",[V]), -% case V of -% [] -> io:format("~s: {}~n",[K]); -% Val -> io:format("~s: ~s",[K,Val]) -% end, -% format_counter_stats(T); -format_counter_stats([{K,V}|T]) when K == client_rx_kbps; + {fullsync_send_kbps, FSSendKbps}, {fullsync_recv_kbps, FSRecvKbps}]. + +%% Filters and formats stats for output. +format_counter_stat({K,V}) when K == client_rx_kbps; K == client_tx_kbps; K == server_rx_kbps; K == server_tx_kbps -> - io:format("~s: ~w~n", [K,V]), - format_counter_stats(T); -format_counter_stats([{K,V}|T]) -> - io:format("~p: ~p~n", [K,V]), - format_counter_stats(T); -format_counter_stats([{_K,_IPAddr,_V}|T]) -> - %% Don't include per-IP stats in this output - %% io:format("~p(~p): ~p~n", [K,IPAddr,V]), - format_counter_stats(T). + {true, io_lib:format("~s: ~w~n", [K,V])}; +format_counter_stat({K,V}) -> + {true, io_lib:format("~p: ~p~n", [K,V])}; +format_counter_stat(_) -> + %% NB: this covers the {_K,_IPAddr,_V} clause in the previous + %% version. + false. + +%%----------------------- +%% Command: modes show +%%----------------------- + +modes_show([], []) -> + CurrentModes = get_modes(), + [clique_status:text(io_lib:format("Currently enabled replication modes: ~s~n", + [modes_to_string(CurrentModes)]))]; +modes_show(_, _) -> + usage. 
+ +%%----------------------- +%% Command: modes set [v2=on|off] [v3=on|off] +%%----------------------- + +modes_set([_|_]=InModes, []) -> + InvalidModes = [ I || {I,_} <- InModes, + not lists:keymember(I, 1, ?REPL_MODES) ], + if InvalidModes == [] -> + NewModes = [ M || {M, true} <- InModes ], + ?LOG_USER_CMD("Set replication mode(s) to ~p",[NewModes]), + set_modes(NewModes), + text_out("Set enabled replication modes to: ~s",[modes_to_string(NewModes)]); + true -> + error_out("Invalid modes requested: ~p~n", [InvalidModes]) + end; +modes_set(_,_) -> + usage. -make_listener(NodeName, IP, Port) -> - #repl_listener{nodename=list_to_atom(NodeName), - listen_addr={IP, list_to_integer(Port)}}. +modes_to_string(Modes) -> + string:join([ mode_to_string(Mode) || Mode <- Modes ], ", "). -make_nat_listener(NodeName, IP, Port, PublicIP, PublicPort) -> - #nat_listener{nodename=list_to_atom(NodeName), - listen_addr={IP, list_to_integer(Port)}, - nat_addr={PublicIP, list_to_integer(PublicPort)}}. +mode_to_string(mode_repl12) -> "v2"; +mode_to_string(mode_repl13) -> "v3". +set_modes(Modes) -> + Ring = get_ring(), + NewRing = riak_repl_ring:set_modes(Ring, Modes), + ok = maybe_set_ring(Ring, NewRing). -make_site(SiteName, IP, Port) -> - #repl_site{name=SiteName, addrs=[{IP, list_to_integer(Port)}]}. +get_modes() -> + Ring = get_ring(), + riak_repl_ring:get_modes(Ring). 
+%% helper functions maybe_set_ring(_R, _R) -> ok; maybe_set_ring(_R1, R2) -> RC = riak_repl_ring:get_repl_config(R2), @@ -759,24 +312,22 @@ get_config() -> undefined -> []; Repl -> - case dict:find(sites, Repl) of - error -> - []; - {ok, Sites} -> - lists:flatten([format_site(S) || S <- Sites]) - end ++ - case dict:find(listeners, Repl) of - error -> - []; - {ok, Listeners} -> - lists:flatten([format_listener(L) || L <- Listeners]) - end ++ - case dict:find(natlisteners, Repl) of - error -> - []; - {ok, NatListeners} -> - lists:flatten([format_nat_listener(L) || L <- NatListeners]) - end + lists:foldr( + fun({Key, Formatter}, Acc) -> + format_config(Repl, Key, Formatter) ++ Acc + end, + [], + [{sites, fun format_site/1}, + {listeners, fun format_listener/1}, + {natlisteners, fun format_nat_listener/1}]) + end. + +format_config(Repl, Key, Formatter) -> + case dict:find(Key, Repl) of + error -> + []; + {ok, List} -> + lists:flatmap(Formatter, List) end. format_site(S) -> @@ -795,36 +346,36 @@ format_listener(L) -> format_nat_listener(L) -> [{"natlistener_" ++ atom_to_list(L#nat_listener.nodename), format_ip(L#nat_listener.listen_addr) ++ "->" ++ - format_ip(L#nat_listener.nat_addr)}]. + format_ip(L#nat_listener.nat_addr)}]. 
leader_stats() -> case erlang:whereis(riak_repl_leader_gs) of Pid when is_pid(Pid) -> LeaderNode = riak_repl_leader:leader_node(), LocalStats = - try - LocalProcInfo = erlang:process_info(whereis(riak_repl_leader_gs), - [message_queue_len, heap_size]), - [{"local_leader_" ++ atom_to_list(K), V} || {K,V} <- LocalProcInfo] - catch _:_ -> - [] - end, + try + LocalProcInfo = erlang:process_info(whereis(riak_repl_leader_gs), + [message_queue_len, heap_size]), + [{"local_leader_" ++ atom_to_list(K), V} || {K,V} <- LocalProcInfo] + catch _:_ -> + [] + end, RemoteStats = - try - LeaderPid = rpc:call(LeaderNode, erlang, whereis, - [riak_repl_leader_gs]), - LeaderStats = rpc:call(LeaderNode, erlang, process_info, - [LeaderPid, [message_queue_len, - total_heap_size, - heap_size, - stack_size, - reductions, - garbage_collection]]), - [{"leader_" ++ atom_to_list(K), V} || {K,V} <- LeaderStats] - catch - _:_ -> - [] - end, + try + LeaderPid = rpc:call(LeaderNode, erlang, whereis, + [riak_repl_leader_gs]), + LeaderStats = rpc:call(LeaderNode, erlang, process_info, + [LeaderPid, [message_queue_len, + total_heap_size, + heap_size, + stack_size, + reductions, + garbage_collection]]), + [{"leader_" ++ atom_to_list(K), V} || {K,V} <- LeaderStats] + catch + _:_ -> + [] + end, [{leader, LeaderNode}] ++ RemoteStats ++ LocalStats; _ -> [] end. @@ -833,13 +384,13 @@ client_stats() -> case erlang:whereis(riak_repl_leader_gs) of Pid when is_pid(Pid) -> %% NOTE: rpc:multicall to all clients removed - riak_repl_console:client_stats_rpc(); + ?MODULE:client_stats_rpc(); _ -> [] end. client_stats_rpc() -> RT2 = [rt2_sink_stats(P) || P <- riak_repl2_rt:get_sink_pids()] ++ - [fs2_sink_stats(P) || P <- riak_repl2_fssink_sup:started()], + [fs2_sink_stats(P) || P <- riak_repl2_fssink_sup:started()], Pids = [P || {_,P,_,_} <- supervisor:which_children(riak_repl_client_sup), P /= undefined], [{client_stats, [client_stats(P) || P <- Pids]}, {sinks, RT2}]. 
@@ -847,7 +398,7 @@ server_stats() -> case erlang:whereis(riak_repl_leader_gs) of Pid when is_pid(Pid) -> RT2 = [rt2_source_stats(P) || {_R,P} <- - riak_repl2_rtsource_conn_sup:enabled()], + riak_repl2_rtsource_conn_sup:enabled()], LeaderNode = riak_repl_leader:leader_node(), case LeaderNode of undefined -> @@ -860,6 +411,30 @@ server_stats() -> _ -> [] end. +rtq_stats() -> + case erlang:whereis(riak_repl2_rtq) of + Pid when is_pid(Pid) -> + [{realtime_queue_stats, riak_repl2_rtq:status()}]; + _ -> [] + end. + +cluster_mgr_stats() -> + case erlang:whereis(riak_repl_leader_gs) of + Pid when is_pid(Pid) -> + ConnectedClusters = case riak_core_cluster_mgr:get_known_clusters() of + {ok, Clusters} -> + [erlang:list_to_binary(Cluster) || Cluster <- + Clusters]; + Error -> Error + end, + [{cluster_name, + erlang:list_to_binary(riak_core_connection:symbolic_clustername())}, + {cluster_leader, riak_core_cluster_mgr:get_leader()}, + {connected_clusters, ConnectedClusters}]; + _ -> [] + end. + + server_stats_rpc() -> [server_stats(P) || P <- riak_repl_listener_sup:server_pids()]. @@ -918,7 +493,7 @@ rt2_source_stats(Pid) -> end, FormattedPid = riak_repl_util:safe_pid_to_list(Pid), {source_stats, [{pid,FormattedPid}, erlang:process_info(Pid, message_queue_len), - {rt_source_connected_to, State}]}. + {rt_source_connected_to, State}]}. rt2_sink_stats(Pid) -> Timeout = app_helper:get_env(riak_repl, status_timeout, 5000), @@ -931,18 +506,18 @@ rt2_sink_stats(Pid) -> %%{Pid, erlang:process_info(Pid, message_queue_len), State}. FormattedPid = riak_repl_util:safe_pid_to_list(Pid), {sink_stats, [{pid,FormattedPid}, erlang:process_info(Pid, message_queue_len), - {rt_sink_connected_to, State}]}. + {rt_sink_connected_to, State}]}. 
fs2_sink_stats(Pid) -> Timeout = app_helper:get_env(riak_repl, status_timeout, 5000), State = try - %% even though it's named legacy_status, it's BNW code - riak_repl2_fssink:legacy_status(Pid, Timeout) - catch - _:_ -> - too_busy - end, - %% {Pid, erlang:process_info(Pid, message_queue_len), State}. + %% even though it's named legacy_status, it's BNW code + riak_repl2_fssink:legacy_status(Pid, Timeout) + catch + _:_ -> + too_busy + end, + %% {Pid, erlang:process_info(Pid, message_queue_len), State}. {sink_stats, [{pid,riak_repl_util:safe_pid_to_list(Pid)}, erlang:process_info(Pid, message_queue_len), {fs_connected_to, State}]}. @@ -957,14 +532,14 @@ sum_rt_kbps(Stats, KbpsDirection) -> Sinks = proplists:get_value(sinks, Stats, []), Sources = proplists:get_value(sources, Stats, []), Kbpss = lists:foldl(fun({StatKind, SinkProps}, Acc) -> - Path1 = case StatKind of - sink_stats -> rt_sink_connected_to; - source_stats -> rt_source_connected_to; - _Else -> not_found - end, - KbpsStr = proplists_get([Path1, socket, KbpsDirection], SinkProps, "[]"), - get_first_kbsp(KbpsStr) + Acc - end, 0, Sinks ++ Sources), + Path1 = case StatKind of + sink_stats -> rt_sink_connected_to; + source_stats -> rt_source_connected_to; + _Else -> not_found + end, + KbpsStr = proplists_get([Path1, socket, KbpsDirection], SinkProps, "[]"), + get_first_kbsp(KbpsStr) + Acc + end, 0, Sinks ++ Sources), Kbpss. 
sum_fs_send_kbps(Stats) -> @@ -976,40 +551,40 @@ sum_fs_recv_kbps(Stats) -> sum_fs_kbps(Stats, Direction) -> Coordinators = proplists:get_value(fullsync_coordinator, Stats), CoordFoldFun = fun({_SinkName, FSCoordStats}, Acc) -> - CoordKbpsStr = proplists_get([socket, Direction], FSCoordStats, "[]"), - CoordKbps = get_first_kbsp(CoordKbpsStr), - CoordSourceKpbs = sum_fs_source_kbps(FSCoordStats, Direction), - SinkKbps = sum_fs_sink_kbps(Stats, Direction), - Acc + CoordKbps + CoordSourceKpbs + SinkKbps - end, + CoordKbpsStr = proplists_get([socket, Direction], FSCoordStats, "[]"), + CoordKbps = get_first_kbsp(CoordKbpsStr), + CoordSourceKpbs = sum_fs_source_kbps(FSCoordStats, Direction), + SinkKbps = sum_fs_sink_kbps(Stats, Direction), + Acc + CoordKbps + CoordSourceKpbs + SinkKbps + end, CoordSrvs = proplists:get_value(fullsync_coordinator_srv, Stats), CoordSrvsFoldFun = fun({_IPPort, SrvStats}, Acc) -> - KbpsStr = proplists_get([socket, Direction], SrvStats, "[]"), - Kbps = get_first_kbsp(KbpsStr), - Kbps + Acc - end, + KbpsStr = proplists_get([socket, Direction], SrvStats, "[]"), + Kbps = get_first_kbsp(KbpsStr), + Kbps + Acc + end, lists:foldl(CoordFoldFun, 0, Coordinators) + lists:foldl(CoordSrvsFoldFun, 0, CoordSrvs). sum_fs_source_kbps(CoordStats, Direction) -> Running = proplists:get_value(running_stats, CoordStats, []), FoldFun = fun({_Pid, Stats}, Acc) -> - KbpsStr = proplists_get([socket, Direction], Stats, "[]"), - Kbps = get_first_kbsp(KbpsStr), - Acc + Kbps - end, + KbpsStr = proplists_get([socket, Direction], Stats, "[]"), + Kbps = get_first_kbsp(KbpsStr), + Acc + Kbps + end, lists:foldl(FoldFun, 0, Running). 
sum_fs_sink_kbps(Stats, Direction) -> Sinks = proplists:get_value(sinks, Stats, []), FoldFun = fun({sink_stats, SinkStats}, Acc) -> - case proplists_get([fs_connected_to, socket, Direction], SinkStats) of - undefined -> - Acc; - KbpsStr -> - Kbps = get_first_kbsp(KbpsStr), - Acc + Kbps - end - end, + case proplists_get([fs_connected_to, socket, Direction], SinkStats) of + undefined -> + Acc; + KbpsStr -> + Kbps = get_first_kbsp(KbpsStr), + Acc + Kbps + end + end, lists:foldl(FoldFun, 0, Sinks). proplists_get(Path, Props) -> @@ -1047,3 +622,33 @@ simple_parse(Str) -> {ok, AbsForm} = erl_parse:parse_exprs(Tokens), {value, Value, _Bs} = erl_eval:exprs(AbsForm, erl_eval:new_bindings()), Value. + + +%% @doc Registers a warning about using a deprecated form of a +%% command. +upgrade_warning(Args, Fmt, FArgs) -> + put(upgrade_warning, {string:join(Args, " "), io_lib:format(Fmt, FArgs)}). + +-spec output(clique_status:status()) -> clique_status:status(). +output(CmdOut) -> + case get(upgrade_warning) of + undefined -> CmdOut; + {Arguments, Message} -> + erase(upgrade_warning), + error_msg("The command form `~s` is deprecated. ~s~n", [Arguments, Message]) ++ CmdOut + end. + +error_out(Fmt, Args) -> + output(error_msg(Fmt, Args)). + +error_msg(Fmt, Args) -> + [clique_status:alert(text_msg(Fmt, Args))]. + +text_out(Str) -> + text_out(Str, []). + +text_out(Str, Args) -> + output(text_msg(Str, Args)). + +text_msg(Fmt, Args) -> + [clique_status:text(io_lib:format(Fmt, Args))]. diff --git a/src/riak_repl_console12.erl b/src/riak_repl_console12.erl new file mode 100644 index 00000000..b02defe8 --- /dev/null +++ b/src/riak_repl_console12.erl @@ -0,0 +1,207 @@ +%% @doc Console commands for "Version 2" replication, aka +%% 'mode_repl12'. +-module(riak_repl_console12). +-include("riak_repl.hrl"). + +%% Points where riak_repl_console calls in. +-export([register_usage/0, dispatch/1, commands_usage/0]). 
+ +%% Commands +-export([add_listener/1, del_listener/1, add_nat_listener/1, + add_site/1, del_site/1, + start_fullsync/1, cancel_fullsync/1, + pause_fullsync/1, resume_fullsync/1]). + +-import(riak_repl_console, + [register_usage/2, get_ring/0, + maybe_set_ring/2]). + +-define(USAGE, + [{"add-listener", "add-listener "}, + {"add-nat-listener", "add-nat-listener "}, + {"del-listener", "del-listener "}, + {"add-site", "add-site "}, + {"del-site", "del-site "}, + {"start-fullsync", "start-fullsync"}, + {"cancel-fullsync", "cancel-fullsync"}, + {"pause-fullsync", "pause-fullsync"}, + {"resume-fullsync", "resume-fullsync"} + ]). + +%% @doc Attempts to dispatch a legacy command, returning `nomatch' if +%% the command did not match by name or number of arguments. If the +%% last thing on the line is `-h' or `--help', `nomatch' is returned +%% so that usage can be printed. +-spec dispatch([string()]) -> ok | error | nomatch. +dispatch([]) -> + nomatch; +dispatch(Cmd) -> + case lists:last(Cmd) of + "--help" -> nomatch; + "-h" -> nomatch; + _ -> dispatch_internal(Cmd) + end. + +dispatch_internal(["add-listener"|[_,_,_]=Params]) -> + add_listener(Params); +dispatch_internal(["add-nat-listener"|[_,_,_,_,_]=Params]) -> + add_nat_listener(Params); +dispatch_internal(["del-listener"|[_,_,_]=Params]) -> + del_listener(Params); +dispatch_internal(["add-site"|[_,_,_]=Params]) -> + add_site(Params); +dispatch_internal(["del-site"|[_]=Params]) -> + del_site(Params); +dispatch_internal(["start-fullsync"]) -> + start_fullsync([]); +dispatch_internal(["cancel-fullsync"]) -> + cancel_fullsync([]); +dispatch_internal(["pause-fullsync"]) -> + pause_fullsync([]); +dispatch_internal(["resume-fullsync"]) -> + resume_fullsync([]); +dispatch_internal(_) -> nomatch. + +%% @doc List of implemented commands for this module, for printing out +%% at the top-level command. +-spec commands_usage() -> string(). 
+commands_usage() -> + " Version 2 Commands:\n" + " add-listener Add a sink listener\n" + " add-nat-listener Add a sink listener with NAT\n" + " add-site Add a sink site\n" + " cancel-fullsync Cancel running fullsync replication\n" + " del-listener Delete a sink listener\n" + " del-site Delete a sink site\n" + " pause-fullsync Pause running fullsync replication\n" + " resume-fullsync Resume paused fullsync replication\n" + " start-fullsync Start fullsync replication". + +%% @doc Registers usage output with Clique. +register_usage() -> + _ = [ true = register_usage([Cmd], [UsageStr, "\n\n", ?V2REPLDEP, "\n\n"]) || + {Cmd, UsageStr} <- ?USAGE ], + ok. + +add_listener(Params) -> + warn_v2_repl(), + Ring = get_ring(), + case add_listener_internal(Ring,Params) of + {ok, NewRing} -> + ok = maybe_set_ring(Ring, NewRing); + error -> error + end. + +warn_v2_repl() -> + io:format(?V2REPLDEP++"~n~n", []), + lager:warning(?V2REPLDEP, []). + +add_nat_listener(Params) -> + lager:warning(?V2REPLDEP, []), + Ring = get_ring(), + case add_nat_listener_internal(Ring, Params) of + {ok, NewRing} -> + ok = maybe_set_ring(Ring, NewRing); + error -> error + end. + +add_listener_internal(Ring, [NodeName, IP, Port]) -> + Listener = make_listener(NodeName, IP, Port), + case lists:member(Listener#repl_listener.nodename, riak_core_ring:all_members(Ring)) of + true -> + case catch rpc:call(Listener#repl_listener.nodename, + riak_repl_util, valid_host_ip, [IP]) of + true -> + NewRing = riak_repl_ring:add_listener(Ring, Listener), + {ok,NewRing}; + false -> + io:format("~p is not a valid IP address for ~p\n", + [IP, Listener#repl_listener.nodename]), + error; + Error -> + io:format("Node ~p must be available to add listener: ~p\n", + [Listener#repl_listener.nodename, Error]), + error + end; + false -> + io:format("~p is not a member of the cluster\n", [Listener#repl_listener.nodename]), + error + end. 
+ +add_nat_listener_internal(Ring, [NodeName, IP, Port, PublicIP, PublicPort]) -> + case add_listener_internal(Ring, [NodeName, IP, Port]) of + {ok,NewRing} -> + case inet_parse:address(PublicIP) of + {ok, _} -> + NatListener = make_nat_listener(NodeName, IP, Port, PublicIP, PublicPort), + NewRing2 = riak_repl_ring:add_nat_listener(NewRing, NatListener), + {ok, NewRing2}; + {error, IPParseError} -> + io:format("Invalid NAT IP address: ~p~n", [IPParseError]), + error + end; + error -> + io:format("Error adding nat address. ~n"), + error + end. + +del_listener([NodeName, IP, Port]) -> + lager:warning(?V2REPLDEP, []), + Ring = get_ring(), + Listener = make_listener(NodeName, IP, Port), + NewRing0 = riak_repl_ring:del_listener(Ring, Listener), + NewRing = riak_repl_ring:del_nat_listener(NewRing0, Listener), + ok = maybe_set_ring(Ring, NewRing). + +add_site([IP, Port, SiteName]) -> + lager:warning(?V2REPLDEP, []), + Ring = get_ring(), + Site = make_site(SiteName, IP, Port), + NewRing = riak_repl_ring:add_site(Ring, Site), + ok = maybe_set_ring(Ring, NewRing). + +del_site([SiteName]) -> + lager:warning(?V2REPLDEP, []), + Ring = get_ring(), + NewRing = riak_repl_ring:del_site(Ring, SiteName), + ok = maybe_set_ring(Ring, NewRing). + +start_fullsync([]) -> + lager:warning(?V2REPLDEP, []), + _ = [riak_repl_tcp_server:start_fullsync(Pid) || + Pid <- riak_repl_listener_sup:server_pids()], + io:format("Fullsync started~n"), + ok. + +cancel_fullsync([]) -> + lager:warning(?V2REPLDEP, []), + _ = [riak_repl_tcp_server:cancel_fullsync(Pid) || + Pid <- riak_repl_listener_sup:server_pids()], + io:format("Fullsync canceled~n"), + ok. + +pause_fullsync([]) -> + lager:warning(?V2REPLDEP, []), + _ = [riak_repl_tcp_server:pause_fullsync(Pid) || + Pid <- riak_repl_listener_sup:server_pids()], + io:format("Fullsync paused~n"), + ok. 
+ +resume_fullsync([]) -> + lager:warning(?V2REPLDEP, []), + _ = [riak_repl_tcp_server:resume_fullsync(Pid) || + Pid <- riak_repl_listener_sup:server_pids()], + io:format("Fullsync resumed~n"), + ok. + +make_listener(NodeName, IP, Port) -> + #repl_listener{nodename=list_to_atom(NodeName), + listen_addr={IP, list_to_integer(Port)}}. + +make_nat_listener(NodeName, IP, Port, PublicIP, PublicPort) -> + #nat_listener{nodename=list_to_atom(NodeName), + listen_addr={IP, list_to_integer(Port)}, + nat_addr={PublicIP, list_to_integer(PublicPort)}}. + +make_site(SiteName, IP, Port) -> + #repl_site{name=SiteName, addrs=[{IP, list_to_integer(Port)}]}. diff --git a/src/riak_repl_console13.erl b/src/riak_repl_console13.erl new file mode 100644 index 00000000..a8ea2573 --- /dev/null +++ b/src/riak_repl_console13.erl @@ -0,0 +1,1059 @@ +%% @doc Console commands for "Version 3" replication, aka +%% 'mode_repl13'. +-module(riak_repl_console13). +-include("riak_repl.hrl"). +-export([register_cli/0, commands_usage/0, upgrade/1]). + +-import(clique_status, [text/1, table/1]). + +-export([cluster_mgr_stats/0]). + +-import(riak_repl_console, + [register_command/4, + script_name/0, + get_ring/0, + output/1, + text_out/1, + text_out/2, + error_out/2, + upgrade_warning/3]). + +%%----------------------- +%% Interface +%%----------------------- + +-spec register_cli() -> ok. +register_cli() -> + ok = register_commands(), + ok = register_usage(), + ok = register_configs(), + ok. 
+ +register_commands() -> + true = register_command(["clusterstats"], [], + [{host, [{longname, "host"}, + {datatype, ip}]}, + {protocol, [{longname, "protocol"}, + {datatype, atom}]}], + fun clusterstats/2), + true = register_command(["clustername", "show"], + [],[], + fun clustername_show/2), + true = register_command(["clustername", "set"], + [{name, [{longname, "name"}, + {datatype, string}]}], + [], + fun clustername_set/2), + true = register_command(["clusters"], [], [], fun clusters/2), + true = register_command(["connections"], [], [], fun connections/2), + true = register_command(["connect"], [{address, [{datatype, ip}]}], [], + fun connect/2), + %% TODO: disconnect should allow a clustername instead of IP, but + %% the "list of datatypes" thing is not part of + %% cuttlefish_datatypes, but of cuttlefish_generator. + true = register_command(["disconnect"], [{remote, [{datatype, ip}]}], [], + fun disconnect/2), + true = register_command(["realtime", "enable"], + [{remote, [{datatype, string}]}], + [], + fun realtime_enable/2), + true = register_command(["realtime", "disable"], + [{remote, [{datatype, string}]}], + [], + fun realtime_disable/2), + true = register_command(["realtime", "start"], + [{remote, [{datatype, string}]}], + [{all, [{longname, "all"}, + {shortname, "a"}]}], + fun realtime_start/2), + true = register_command(["realtime", "stop"], + [{remote, [{datatype, string}]}], + [{all, [{longname, "all"}, + {shortname, "a"}]}], + fun realtime_stop/2), + true = register_command(["realtime", "cascades", "enable"], + [],[], + fun realtime_cascades_enable/2), + true = register_command(["realtime", "cascades", "disable"], + [],[], + fun realtime_cascades_disable/2), + true = register_command(["realtime", "cascades", "show"], + [],[], + fun realtime_cascades_show/2), + true = register_command(["fullsync", "enable"], + [{remote, [{datatype, string}]}], + [], + fun fullsync_enable/2), + true = register_command(["fullsync", "disable"], + [{remote, 
[{datatype, string}]}], + [], + fun fullsync_disable/2), + true = register_command(["fullsync", "start"], + [{remote, [{datatype, string}]}], + [{all, [{longname, "all"}, + {shortname, "a"}]}], + fun fullsync_start/2), + true = register_command(["fullsync", "stop"], + [{remote, [{datatype, string}]}], + [{all, [{longname, "all"}, + {shortname, "a"}]}], + fun fullsync_stop/2), + true = register_command(["proxy-get", "enable"], + [{remote, [{datatype, string}]}], + [], fun proxy_get_enable/2), + true = register_command(["proxy-get", "disable"], + [{remote, [{datatype, string}]}], + [], fun proxy_get_disable/2), + true = register_command(["proxy-get", "redirect", "cluster-id"], + [], + [], fun proxy_get_redirect_cluster_id/2), + true = register_command(["proxy-get", "redirect", "show"], + [{from, [{datatype, string}]}], + [], fun proxy_get_redirect_show/2), + true = register_command(["proxy-get", "redirect", "add"], + [{from, [{datatype, string}]}, + {to, [{datatype, string}]}], + [], fun proxy_get_redirect_add/2), + true = register_command(["proxy-get", "redirect", "delete"], + [{from, [{datatype, string}]}, + {to, [{datatype, string}]}], + [], fun proxy_get_redirect_delete/2), + true = register_command(["nat-map", "show"], + [], [], + fun nat_map_show/2), + %% TODO: nat-map add|delete should allow a hostname instead of IP, + %% but the "list of datatypes" thing is not part of + %% cuttlefish_datatypes, but of cuttlefish_generator. + true = register_command(["nat-map", "add"], + [{external, [{datatype, ip}]}, + {internal, [{datatype, string}]}], + [], + fun nat_map_add/2), + true = register_command(["nat-map", "delete"], + [{external, [{datatype, ip}]}, + {internal, [{datatype, string}]}], + [], + fun nat_map_delete/2), + ok. + + +register_usage(Cmd, Usage) -> + riak_repl_console:register_usage(Cmd, Usage). 
+ +register_usage() -> + true = register_usage(["clusterstats"], + "clusterstats [ --protocol=PROTO | --host=IP:PORT ]\n\n" + " Displays cluster statistics, optionally filtered by a protocol or host connection.\n\n" + " Options:\n" + " --protocol=PROTO Filters to a protocol where PROTO is one of:\n" + " rt_repl, proxy_get, identity\n" + " --host=IP:PORT Filters to a specific host, identified by IP and PORT"), + true = register_usage(["clustername"], + "clustername ( show | set name=NAME )\n\n" + " Shows or sets the symbolic clustername."), + true = register_usage(["clusters"], + "clusters\n\n" + " Displays information about known clusters."), + true = register_usage(["connections"], + "connections\n\n" + " Displays a list of current replication connections."), + true = register_usage(["connect"], + "connect address=IP:PORT\n\n" + " Connects to a remote cluster."), + true = register_usage(["disconnect"], + "disconnect remote=(IP:PORT | NAME)\n\n" + " Disconnects from a connected remote cluster."), + true = register_usage(["realtime"], realtime_usage()), + true = register_usage(["realtime", "enable"], realtime_enable_disable_usage()), + true = register_usage(["realtime", "disable"], realtime_enable_disable_usage()), + true = register_usage(["realtime", "start"], realtime_start_stop_usage()), + true = register_usage(["realtime", "stop"], realtime_start_stop_usage()), + true = register_usage(["realtime", "cascades"], realtime_cascades_usage()), + true = register_usage(["fullsync"], fullsync_usage()), + true = register_usage(["fullsync", "enable"], fullsync_enable_disable_usage()), + true = register_usage(["fullsync", "disable"], fullsync_enable_disable_usage()), + true = register_usage(["fullsync", "start"], fullsync_start_stop_usage()), + true = register_usage(["fullsync", "stop"], fullsync_start_stop_usage()), + true = register_usage(["proxy-get"], proxy_get_usage()), + true = register_usage(["proxy-get", "enable"], proxy_get_enable_disable_usage()), + true = 
register_usage(["proxy-get", "disable"], proxy_get_enable_disable_usage()), + true = register_usage(["proxy-get", "redirect"], proxy_get_redirect_usage()), + true = register_usage(["proxy-get", "redirect", "show"], fun proxy_get_redirect_show_usage/0), + true = register_usage(["proxy-get", "redirect", "add"], fun proxy_get_redirect_add_delete_usage/0), + true = register_usage(["proxy-get", "redirect", "delete"], fun proxy_get_redirect_add_delete_usage/0), + true = register_usage(["proxy-get", "redirect", "cluster-id"], proxy_get_redirect_usage()), + true = register_usage(["nat-map"], nat_map_usage()), + true = register_usage(["nat-map", "add"], nat_map_add_del_usage()), + true = register_usage(["nat-map", "delete"], nat_map_add_del_usage()), + ok. + + +register_configs() -> + %% "mdc.fullsync.source.max_workers_per_node" + %% "mdc.fullsync.source.max_workers_per_cluster" + %% "mdc.fullsync.sink.max_workers_per_node" + Keys = ["mdc.fullsync.source.max_workers_per_node", + "mdc.fullsync.source.max_workers_per_cluster", + "mdc.fullsync.sink.max_workers_per_node"], + [true, true, true] = [ clique:register_config(cuttlefish_variable:tokenize(Key), + fun set_fullsync_limit/3) || Key <- Keys ], + ok = clique:register_config_whitelist(Keys), + ok. + +-spec commands_usage() -> string(). +commands_usage() -> + " Version 3 Commands:\n" + " clustername Show or set the cluster name\n" + " clusterstats Display cluster stats\n" + " clusters Display known clusters\n" + " connect Connect to a remote cluster\n" + " connections Display a list of connections\n" + " disconnect Disconnect from a remote cluster\n" + " fullsync Manipulate fullsync replication\n" + " nat-map Manipulate NAT mappings\n" + " proxy-get Manipulate proxy-get\n" + " realtime Manipulate realtime replication". + +realtime_usage() -> + "realtime [ ...]\n\n" + " Manipulate realtime replication. 
Realtime replication streams\n" + " incoming writes on the source cluster to the sink cluster(s).\n\n" + " Sub-commands:\n" + " enable Enable realtime replication\n" + " disable Disable realtime replication\n" + " start Start realtime replication\n" + " stop Stop realtime replication\n" + " cascades Manipulate cascading realtime replication". + +realtime_cascades_usage() -> + "realtime cascades SUBCOMMAND\n\n" + " Manipulate cascading realtime replication. When this cluster is a\n" + " sink and is receiving realtime replication, it can propagate\n" + " incoming writes to any clusters for which it is a source and\n" + " realtime replication is enabled.\n\n" + " Sub-commands:\n" + " enable Enable cascading realtime replication\n" + " disable Disable cascading realtime replication\n" + " show Show the current cascading realtime replication setting". + +realtime_enable_disable_usage() -> + "realtime ( enable | disable ) remote=CLUSTERNAME\n\n" + " Enable or disable realtime replication to CLUSTERNAME.". + +realtime_start_stop_usage() -> + "realtime ( start | stop ) ( remote=CLUSTERNAME | --all )\n\n" + " Start or stop realtime replication. When 'remote' is given, only\n" + " the specified sink CLUSTERNAME will be affected. When --all is given,\n" + " all realtime replication to all sinks will be started or stopped.". + +fullsync_usage() -> + "fullsync SUBCOMMAND ...\n\n" + " Manipulate fullsync replication. Fullsync replication compares data\n" + " on the source and the sink and then sends detected differences to\n" + " the sink cluster.\n\n" + " Sub-commands:\n" + " enable Enable fullsync replication\n" + " disable Disable fullsync replication\n" + " start Start fullsync replication\n" + " stop Stop fullsync replication\n". + +fullsync_enable_disable_usage() -> + "fullsync ( enable | disable ) remote=CLUSTERNAME\n\n" + " Enable or disable fullsync replication to CLUSTERNAME.". 
+ +fullsync_start_stop_usage() -> + "fullsync ( start | stop ) ( remote=CLUSTERNAME | --all )\n\n" + " Start or stop fullsync replication. When 'remote' is given, only\n" + " the specified sink CLUSTERNAME will be affected. When --all is given,\n" + " all realtime replication to all sinks will be started or stopped.". + +proxy_get_usage() -> + "proxy-get SUBCOMMAND ...\n\n" + " Manipulate proxy-get functionality. Proxy-get allows sink clusters\n" + " to actively fetch remote objects over a realtime replication\n" + " connection. Currently, this is only used by Riak CS.\n\n" + " Sub-commands:\n" + " enable Enable proxy-get on the source\n" + " disable Disable proxy-get on the source\n" + " redirect Manipulation proxy-get redirection". + +proxy_get_enable_disable_usage() -> + "proxy-get ( enable | disable ) remote=CLUSTERNAME\n\n" + " Enables or disables proxy-get requests from sink CLUSTERNAME to this\n" + " source cluster.". + +proxy_get_redirect_usage() -> + "proxy-get redirect SUBCOMMAND ...\n\n" + " Manipulate proxy-get redirection functionality. Redirection allows\n" + " existing proxy-get connections to be redirected to new source\n" + " clusters so that the original source cluster can be decommissioned.\n\n" + " Sub-commands:\n" + " add Add a proxy-get redirection\n" + " delete Delete an existing proxy-get redirection\n" + " show Show a proxy-get redirection\n" + " cluster-id Display the local cluster's identifier". + +proxy_get_redirect_show_usage() -> + "proxy-get redirect show from=SOURCE\n\n" + " Show an existing proxy-get redirection. SOURCE must correspond to\n" + " the result from the `" ++ script_name() ++ "proxy-get redirect cluster-id` command.". + +proxy_get_redirect_add_delete_usage() -> + "proxy-get redirect ( add | delete ) from=SOURCE to=DESTINATION\n\n" + " Add or delete a proxy-get redirection. 
Arguments SOURCE and\n" + " DESTINATION must correspond to the result from the `" ++ script_name() ++ "\n" + " proxy-get redirect cluster-id` command.". + +nat_map_usage() -> + "nat-map SUBCOMMAND\n\n" + " Manipulate NAT mappings. NAT mappings allow replication connections\n" + " to traverse firewalls between private networks on previously\n" + " configured ports.\n\n" + " Sub-commands:\n" + " add Add a NAT mapping\n" + " delete Delete a NAT mapping\n" + " show Display the NAT mapping table". + +nat_map_add_del_usage() -> + "nat-map ( add | delete ) external=EXTERNAL_IF internal=INTERNAL_IP\n\n" + " Add or delete a NAT mapping from the given external IP to the given internal" + " IP. An optional external port can be supplied.". + +upgrade(["clustername", Cmd|_]=Args) when Cmd == "show"; Cmd == "set" -> + %% Don't upgrade a call that includes a command + Args; +upgrade(["clustername", Arg]=Args) -> + upgrade_warning(Args, "Use `clustername set name=~s`", [Arg]), + ["clustername", "set", "name="++Arg]; +upgrade(["clusterstats", [$-|_]|_]=Args) -> + %% Don't upgrade a call that includes a flag + Args; +upgrade(["clusterstats", Arg]=Args) -> + case string:words(Arg, $:) of + 1 -> + upgrade_warning(Args, "Use `clusterstats --protocol ~s`", [Arg]), + ["clusterstats", "--protocol", Arg]; + 2 -> + upgrade_warning(Args, "Use `clusterstats --host ~s`", [Arg]), + ["clusterstats", "--host", Arg]; + _ -> Args + end; +upgrade(["connect", Arg]=Args) -> + case string:words(Arg, $=) of + 2 -> Args; + 1 -> + upgrade_warning(Args, "Use `connect address=~s`", [Arg]), + ["connect", "address="++Arg]; + _ -> Args + end; +upgrade(["disconnect", Arg]=Args) -> + case string:words(Arg, $=) of + 2 -> Args; + 1 -> + upgrade_warning(Args, "Use `disconnect remote=~s`", [Arg]), + ["disconnect", "remote="++Arg] + end; +upgrade(["realtime", Command, [$-|_]|_Rest]=Args) when Command == "enable"; + Command == "disable"; + Command == "start"; + Command == "stop" -> + Args; +upgrade(["realtime", 
Command, Arg]=Args) when Command == "enable"; + Command == "disable"; + Command == "start"; + Command == "stop" -> + case string:words(Arg, $=) of + 2 -> Args; + 1 -> + upgrade_warning(Args, "Use `realtime ~s remote=~s`", [Command, Arg]), + ["realtime", Command, "remote="++Arg] + end; +upgrade(["realtime", Command]=Args) when Command == "start"; + Command == "stop" -> + upgrade_warning(Args, "Use `realtime ~s --all`", [Command]), + ["realtime", Command, "--all"]; +upgrade(["realtime", "cascades", "always"]=Args) -> + upgrade_warning(Args, "Use `realtime cascades enable`", []), + ["realtime", "cascades", "enable"]; +upgrade(["realtime", "cascades", "never"]=Args) -> + upgrade_warning(Args, "Use `realtime cascades disable`", []), + ["realtime", "cascades", "disable"]; +upgrade(["fullsync", Command, "--all"|_Rest]=Args) when Command == "enable"; + Command == "disable"; + Command == "start"; + Command == "stop" -> + Args; +upgrade(["fullsync", Command, Arg]=Args) when Command == "enable"; + Command == "disable"; + Command == "start"; + Command == "stop" -> + case string:words(Arg, $=) of + 2 -> Args; + 1 -> + upgrade_warning(Args, "Use `fullsync ~s remote=~s`", [Command, Arg]), + ["fullsync", Command, "remote="++Arg] + end; +upgrade(["fullsync", Command]=Args) when Command == "start"; + Command == "stop" -> + upgrade_warning(Args, "Use `fullsync ~s --all`", [Command]), + ["fullsync", Command, "--all"]; +upgrade(["fullsync", Key]=Args) when Key == "max_fssource_node"; + Key == "max_fssource_cluster"; + Key == "max_fssink_node" -> + TKey = config_key_translation(Key), + upgrade_warning(Args, "Use `show ~s`", [TKey]), + ["show", TKey]; +upgrade(["fullsync", Key, Value]=Args) when Key == "max_fssource_node"; + Key == "max_fssource_cluster"; + Key == "max_fssink_node" -> + TKey = config_key_translation(Key), + upgrade_warning(Args, "Use `set ~s`", [TKey]), + ["set", TKey++"="++Value]; +upgrade(["nat-map", Command, External0, Internal0]=Args0) when Command == "add"; + 
Command == "delete" -> + + {External, EChanged} = case string:words(External0, $=) of + 2 -> {External0, false}; + _ -> {"external="++External0, true} + end, + {Internal, IChanged} = case string:words(Internal0, $=) of + 2 -> {Internal0, false}; + _ -> {"internal="++Internal0, true} + end, + Args = [Command, External, Internal], + if EChanged orelse IChanged -> + upgrade_warning(Args0, "Use `nat-map ~s ~s ~s`", Args), + ok; + true -> + ok + end, + ["nat-map"|Args]; +upgrade(["nat-map", "del"|Rest]) -> + %% TODO: should we include this warning or just silently pass through? + %% upgrade_warning(Args, "Use `nat-map delete ~s`", [string:join(" ", Rest)]), + upgrade(["nat-map", "delete"|Rest]); +upgrade(Args) -> + Args. + +config_key_translation("max_fssource_node") -> "mdc.fullsync.source.max_workers_per_node"; +config_key_translation("max_fssource_cluster") -> "mdc.fullsync.source.max_workers_per_cluster"; +config_key_translation("max_fssink_node") -> "mdc.fullsync.sink.max_workers_per_node". + +%%----------------------- +%% Command: clusterstats +%%----------------------- +%% Show cluster stats for this node +clusterstats(_, Flags) -> + try + CMStats = cluster_mgr_stats(), + CConnStats = case Flags of + [] -> + riak_core_connection_mgr_stats:get_consolidated_stats(); + [{host, {IP, Port}}] when is_list(IP), is_integer(Port) -> + riak_core_connection_mgr_stats:get_stats_by_ip({IP,Port}); + [{protocol, ProtocolId}] when is_atom(ProtocolId) -> + riak_core_connection_mgr_stats:get_stats_by_protocol(ProtocolId); + _ -> + throw(badflags) + end, + %% TODO: make this output better + text_out("~p~n", [CMStats ++ CConnStats]) + catch + throw:badflags -> usage + end. + +%% rtq_stats() -> +%% case erlang:whereis(riak_repl2_rtq) of +%% Pid when is_pid(Pid) -> +%% [{realtime_queue_stats, riak_repl2_rtq:status()}]; +%% _ -> [] +%% end. 
+ +cluster_mgr_stats() -> + case erlang:whereis(riak_repl_leader_gs) of + Pid when is_pid(Pid) -> + ConnectedClusters = case riak_core_cluster_mgr:get_known_clusters() of + {ok, Clusters} -> + [erlang:list_to_binary(Cluster) || Cluster <- + Clusters]; + Error -> Error + end, + [{cluster_name, + erlang:list_to_binary(riak_core_connection:symbolic_clustername())}, + {cluster_leader, riak_core_cluster_mgr:get_leader()}, + {connected_clusters, ConnectedClusters}]; + _ -> [] + end. + +%% clusterstats([Arg]) -> +%% NWords = string:words(Arg, $:), +%% case NWords of +%% 1 -> +%% %% assume protocol-id +%% ProtocolId = list_to_atom(Arg), +%% CConnStats = riak_core_connection_mgr_stats:get_stats_by_protocol(ProtocolId), +%% CMStats = cluster_mgr_stats(), +%% Stats = CMStats ++ CConnStats, +%% io:format("~p~n", [Stats]); +%% 2 -> +%% Address = Arg, +%% IP = string:sub_word(Address, 1, $:), +%% PortStr = string:sub_word(Address, 2, $:), +%% {Port,_Rest} = string:to_integer(PortStr), +%% CConnStats = riak_core_connection_mgr_stats:get_stats_by_ip({IP,Port}), +%% CMStats = cluster_mgr_stats(), +%% Stats = CMStats ++ CConnStats, +%% io:format("~p~n", [Stats]); +%% _ -> +%% {error, {badarg, Arg}} +%% end. + +%%----------------------- +%% Command: clustername show +%%----------------------- +clustername_show([], []) -> + text_out("Cluster name: ~s~n", [riak_core_connection:symbolic_clustername()]); +clustername_show(_,_) -> + usage. + +%%----------------------- +%% Command: clustername set +%%----------------------- +clustername_set([{name, ClusterName}], []) -> + riak_core_ring_manager:ring_trans(fun riak_core_connection:set_symbolic_clustername/2, + ClusterName), + text_out("Cluster name was set to: ~s", [ClusterName]); +clustername_set(_,_) -> + usage. 
+ +%%----------------------- +%% Command: clusters +%%----------------------- +clusters([],[]) -> + case riak_core_cluster_mgr:get_known_clusters() of + {ok, []} -> + text_out("There are no known remote clusters."); + {ok, Clusters} -> + text_out([begin + {ok,Members} = riak_core_cluster_mgr:get_ipaddrs_of_cluster(ClusterName), + IPs = [string_of_ipaddr(Addr) || Addr <- Members], + io_lib:format("~s: ~p~n", [ClusterName, IPs]) + end || ClusterName <- Clusters]) + end; +clusters(_,_) -> + usage. + +%%----------------------- +%% Command: connections +%%----------------------- +connections([], []) -> + %% get cluster manager's outbound connections to other "remote" clusters, + %% which for now, are all the "sinks". + case riak_core_cluster_mgr:get_connections() of + {ok, []} -> + text_out("There are no connected sink clusters."); + {ok, Conns} -> + Rows = [format_cluster_conn(Conn) || Conn <- Conns], + output([table(Rows)]) + end. + +string_of_ipaddr({IP, Port}) -> + lists:flatten(io_lib:format("~s:~p", [IP, Port])). + +choose_best_addr({cluster_by_addr, {IP,Port}}, _ClientAddr) -> + string_of_ipaddr({IP,Port}); +choose_best_addr({cluster_by_name, _}, ClientAddr) -> + string_of_ipaddr(ClientAddr). + +string_of_remote({cluster_by_addr, {IP,Port}}) -> + string_of_ipaddr({IP,Port}); +string_of_remote({cluster_by_name, ClusterName}) -> + ClusterName. + +%% Format info about this sink into a clique table row. +%% Remote :: {ip,port} | ClusterName +format_cluster_conn({Remote,Pid}) -> + {ClusterName, MemberList, Status} = get_cluster_conn_status(Remote, Pid), + [{"Connection", string_of_remote(Remote)}, + {"Cluster Name", ClusterName}, + {"Ctrl-Pid", io_lib:format("~p", [Pid])}, + {"Members", format_cluster_conn_members(MemberList)}, + {"Status", format_cluster_conn_status(Status)}]. + +get_cluster_conn_status(Remote, Pid) -> + %% try to get status from Pid of cluster control channel. 
if we + %% haven't connected successfully yet, it will time out, which we + %% will fail fast for since it's a local process, not a remote + %% one. + try riak_core_cluster_conn:status(Pid, 2) of + {Pid, status, {ClientAddr, _Transport, Name, Members}} -> + CAddr = choose_best_addr(Remote, ClientAddr), + {Name, Members, {via, CAddr}}; + {_StateName, SRemote} -> + {"", [], {connecting, SRemote}} + catch + 'EXIT':{timeout, _} -> + {"", [], timeout} + end. + +format_cluster_conn_status({via, CAddr}) -> io_lib:format("via ~s", [CAddr]); +format_cluster_conn_status({connecting, SRemote}) -> io_lib:format("connecting to ~s", [string_of_remote(SRemote)]); +format_cluster_conn_status(timeout) -> "timed out". + +format_cluster_conn_members(Members) -> + string:join([ string_of_ipaddr(Addr) || Addr <- Members ], ","). + +%%----------------------- +%% Command: connect +%%----------------------- +connect([{address, {IP, Port}}], []) -> + ?LOG_USER_CMD("Connect to cluster at ~p:~p", [IP, Port]), + case riak_core_connection:symbolic_clustername() of + "undefined" -> + %% TODO: This should return an error, not a bare status, + %% but we still want to be able to print to stderr. This + %% will require a clique enhancement. + error_out("Error: Unable to establish connections until local cluster is named.~n" + "First use ~s clustername set name=NAME", [script_name()]); + _Name -> + riak_core_cluster_mgr:add_remote_cluster({IP, Port}), + text_out("Connecting to remote cluster at ~p:~p.", [IP, Port]) + end; +connect(_, _) -> + usage. 
+ + +%%----------------------- +%% Command: disconnect +%%----------------------- +disconnect([{remote, {IP, Port}}], []) -> + ?LOG_USER_CMD("Disconnect from cluster at ~p:~p", [IP, Port]), + riak_core_cluster_mgr:remove_remote_cluster({IP, Port}), + text_out("Disconnecting from cluster at ~p:~p~n", [IP, Port]); +disconnect([{remote, Name}], []) -> + ?LOG_USER_CMD("Disconnect from cluster ~p", [Name]), + riak_core_cluster_mgr:remove_remote_cluster(Name), + text_out("Disconnecting from cluster ~p~n", [Name]); +disconnect(_, _) -> + usage. + + +%%-------------------------- +%% Command: realtime enable +%%-------------------------- + +realtime_enable([{remote, Remote}], []) -> + ?LOG_USER_CMD("Enable Realtime Replication to cluster ~p", [Remote]), + case riak_repl2_rt:enable(Remote) of + not_changed -> + error_out("Realtime replication to cluster ~p already enabled!~n", [Remote]); + {not_changed, _} -> + error_out("Realtime replication to cluster ~p already enabled!~n", [Remote]); + ok -> + text_out("Realtime replication to cluster ~p enabled.~n", [Remote]) + end; +realtime_enable(_, _) -> + usage. + +%%-------------------------- +%% Command: realtime disable +%%-------------------------- +realtime_disable([{remote, Remote}], []) -> + ?LOG_USER_CMD("Disable Realtime Replication to cluster ~p", [Remote]), + case riak_repl2_rt:disable(Remote) of + not_changed -> + error_out("Realtime replication to cluster ~p already disabled!~n", [Remote]); + {not_changed, _} -> + error_out("Realtime replication to cluster ~p already disabled!~n", [Remote]); + ok -> + text_out("Realtime replication to cluster ~p disabled.~n", [Remote]) + end; +realtime_disable(_, _) -> + usage. 
+ +%%-------------------------- +%% Command: realtime start +%%-------------------------- +realtime_start([{remote, Remote}], []) -> + ?LOG_USER_CMD("Start Realtime Replication to cluster ~p", [Remote]), + case riak_repl2_rt:start(Remote) of + not_changed -> + error_out("Realtime replication to cluster ~p is already started or not enabled!~n", [Remote]); + {not_changed, _} -> + error_out("Realtime replication to cluster ~p is already started or not enabled!~n", [Remote]); + ok -> + text_out("Realtime replication to cluster ~p started.~n", [Remote]) + end; +realtime_start([], [{all, _}]) -> + ?LOG_USER_CMD("Start Realtime Replication to all connected clusters", []), + case riak_repl2_rt:enabled() of + [] -> + error_out("No remote clusters have realtime replication enabled.", []); + Remotes -> + lists:flatten([ realtime_start([{remote, Remote}], []) || Remote <- Remotes ]) + end; +realtime_start(_, _) -> + usage. + +%%-------------------------- +%% Command: realtime stop +%%-------------------------- +realtime_stop([{remote, Remote}], []) -> + ?LOG_USER_CMD("Stop Realtime Replication to cluster ~p", [Remote]), + case riak_repl2_rt:stop(Remote) of + not_changed -> + error_out("Realtime replication to cluster ~p is already stopped or not enabled!~n", [Remote]); + {not_changed, _} -> + error_out("Realtime replication to cluster ~p is already stopped or not enabled!~n", [Remote]); + ok -> + text_out("Realtime replication to cluster ~p stopped.~n", [Remote]) + end; +realtime_stop([], [{all, _}]) -> + ?LOG_USER_CMD("Stop Realtime Replication to all connected clusters", []), + case riak_repl2_rt:enabled() of + [] -> + error_out("No remote clusters have realtime replication enabled.", []); + Remotes -> + lists:flatten([ realtime_stop([{remote, Remote}], []) || Remote <- Remotes ]) + end; +realtime_stop(_, _) -> + usage. 
+ +%%-------------------------- +%% Command: realtime cascades enable +%%-------------------------- +realtime_cascades_enable([], []) -> + ?LOG_USER_CMD("Enable Realtime Replication cascading", []), + riak_core_ring_manager:ring_trans(fun riak_repl_ring:rt_cascades_trans/2, + always), + text_out("Realtime cascades enabled.~n"); +realtime_cascades_enable(_,_) -> + usage. + +%%-------------------------- +%% Command: realtime cascades disable +%%-------------------------- +realtime_cascades_disable([], []) -> + ?LOG_USER_CMD("Disable Realtime Replication cascading", []), + riak_core_ring_manager:ring_trans(fun riak_repl_ring:rt_cascades_trans/2, + never), + text_out("Realtime cascades disabled.~n"); +realtime_cascades_disable(_,_) -> + usage. + +%%-------------------------- +%% Command: realtime cascades show +%%-------------------------- +realtime_cascades_show([], []) -> + case app_helper:get_env(riak_repl, realtime_cascades, always) of + always -> + text_out("Realtime cascades are enabled.~n"); + never -> + text_out("Realtime cascades are disabled.~n") + end; +realtime_cascades_show(_, _) -> + usage. + + +%%-------------------------- +%% Command: fullsync enable +%%-------------------------- +fullsync_enable([{remote, Remote}], []) -> + Leader = riak_core_cluster_mgr:get_leader(), + ?LOG_USER_CMD("Enable Fullsync Replication to cluster ~p", [Remote]), + riak_core_ring_manager:ring_trans(fun riak_repl_ring:fs_enable_trans/2, Remote), + _ = riak_repl2_fscoordinator_sup:start_coord(Leader, Remote), + text_out("Fullsync replication to cluster ~p enabled.", [Remote]); +fullsync_enable(_, _) -> + usage. 
+ +%%-------------------------- +%% Command: fullsync disable +%%-------------------------- + +fullsync_disable([{remote, Remote}], []) -> + Leader = riak_core_cluster_mgr:get_leader(), + ?LOG_USER_CMD("Disable Fullsync Replication to cluster ~p", [Remote]), + riak_core_ring_manager:ring_trans(fun + riak_repl_ring:fs_disable_trans/2, Remote), + _ = riak_repl2_fscoordinator_sup:stop_coord(Leader, Remote), + text_out("Fullsync replication to cluster ~p disabled.", [Remote]); +fullsync_disable(_, _) -> + usage. + + +%%-------------------------- +%% Command: fullsync start +%%-------------------------- + +fullsync_start([{remote, Remote}], []) -> + Leader = riak_core_cluster_mgr:get_leader(), + ?LOG_USER_CMD("Start Fullsync Replication to cluster ~p", [Remote]), + Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), + case proplists:get_value(Remote, Fullsyncs) of + undefined -> + %% io:format("Fullsync not enabled for cluster ~p~n", [Remote]), + %% io:format("Use 'fullsync enable ~p' before start~n", [Remote]), + %% {error, not_enabled}; + error_out("Fullsync not enabled for cluster ~p~n" + "Use 'fullsync enable ~p' before start~n", [Remote, Remote]); + Pid -> + riak_repl2_fscoordinator:start_fullsync(Pid), + text_out("Fullsync replication to cluster ~p started.", [Remote]) + end; +fullsync_start([], [{all,_}]) -> + Leader = riak_core_cluster_mgr:get_leader(), + Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), + ?LOG_USER_CMD("Start Fullsync Replication to all connected clusters",[]), + _ = [riak_repl2_fscoordinator:start_fullsync(Pid) || {_, Pid} <- + Fullsyncs], + text_out("Fullsync replication started to all connected clusters."); +fullsync_start(_, _) -> + usage. 
+ +%%-------------------------- +%% Command: fullsync stop +%%-------------------------- + +fullsync_stop([{remote, Remote}], []) -> + Leader = riak_core_cluster_mgr:get_leader(), + ?LOG_USER_CMD("Stop Fullsync Replication to cluster ~p", [Remote]), + Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), + case proplists:get_value(Remote, Fullsyncs) of + undefined -> + %% Fullsync is not enabled, but carry on quietly. + error_out("Fullsync is not enabled for cluster ~p.", [Remote]); + Pid -> + riak_repl2_fscoordinator:stop_fullsync(Pid), + text_out("Fullsync stopped for cluster ~p.", [Remote]) + end; +fullsync_stop([], [{all,_}]) -> + Leader = riak_core_cluster_mgr:get_leader(), + Fullsyncs = riak_repl2_fscoordinator_sup:started(Leader), + ?LOG_USER_CMD("Stop Fullsync Replication to all connected clusters",[]), + _ = [riak_repl2_fscoordinator:stop_fullsync(Pid) || {_, Pid} <- + Fullsyncs], + text_out("Fullsync replication stopped to all connected clusters."); +fullsync_stop(_, _) -> + usage. + +%%-------------------------- +%% Command: proxy-get enable +%%-------------------------- + +proxy_get_enable([{remote, Remote}], []) -> + ?LOG_USER_CMD("Enable Riak CS Proxy GET block provider for ~p",[Remote]), + riak_core_ring_manager:ring_trans(fun + riak_repl_ring:pg_enable_trans/2, Remote), + text_out("Proxy-get to cluster ~s has been enabled.", [Remote]); +proxy_get_enable(_, _) -> + usage. + + +%%-------------------------- +%% Command: proxy-get disable +%%-------------------------- + +proxy_get_disable([{remote, Remote}], []) -> + ?LOG_USER_CMD("Disable Riak CS Proxy GET block provider for ~p",[Remote]), + riak_core_ring_manager:ring_trans(fun + riak_repl_ring:pg_disable_trans/2, Remote), + text_out("Proxy-get to cluster ~s has been disabled.", [Remote]); +proxy_get_disable(_, _) -> + usage. 
+ +%%-------------------------- +%% Command: proxy-get redirect cluster-id +%%-------------------------- + +proxy_get_redirect_cluster_id([], []) -> + {ok, Ring} = riak_core_ring_manager:get_my_ring(), + ClusterId = lists:flatten( + io_lib:format("~p", [riak_core_ring:cluster_name(Ring)])), + text_out("Local cluster id: ~p", [ClusterId]); +proxy_get_redirect_cluster_id(_, _) -> + usage. + +%%-------------------------- +%% Command: proxy-get redirect show +%%-------------------------- + +proxy_get_redirect_show([{from, FromClusterId}], []) -> + case riak_core_metadata:get({<<"replication">>, <<"cluster-mapping">>}, FromClusterId) of + undefined -> + text_out("No mapping for ~p~n", [FromClusterId]); + ToClusterId -> + text_out("Cluster id ~p redirecting to cluster id ~p~n", [FromClusterId, ToClusterId]) + end; +proxy_get_redirect_show(_, _) -> + usage. + +%%-------------------------- +%% Command: proxy-get redirect add +%%-------------------------- + +proxy_get_redirect_add([{to, _}=To, {from, _}=From], []) -> + proxy_get_redirect_add([From, To], []); +proxy_get_redirect_add([{from, FromClusterId}, {to, ToClusterId}], []) -> + lager:info("Redirecting cluster id: ~p to ~p", [FromClusterId, ToClusterId]), + riak_core_metadata:put({<<"replication">>, <<"cluster-mapping">>}, + FromClusterId, ToClusterId), + text_out("Redirected proxy-get from cluster ~s to cluster ~s~n", + [FromClusterId, ToClusterId]); +proxy_get_redirect_add(_, _) -> + usage. + + +%%-------------------------- +%% Command: proxy-get redirect delete +%%-------------------------- + +proxy_get_redirect_delete([{from, FromClusterId}], []) -> + lager:info("Deleting redirect to ~p", [FromClusterId]), + riak_core_metadata:delete({<<"replication">>, <<"cluster-mapping">>}, FromClusterId), + text_out("Deleted proxy-get redirect from cluster ~s~n", [FromClusterId]); +proxy_get_redirect_delete(_, _) -> + usage. 
+ +%%-------------------------- +%% Command: nat-map show +%%-------------------------- +nat_map_show([], []) -> + Ring = get_ring(), + case riak_repl_ring:get_nat_map(Ring) of + [] -> + text_out("No NAT mappings registered.\n"); + Pairs -> + %% Headers = [{internal, "Internal"}, {external, "External"}], + Rows = [ format_nat_map(Int, Ext) || {Int, Ext} <- Pairs ], + output([text("NAT mappings:\n"), table(Rows)]) + end; +nat_map_show(_,_) -> + usage. + +format_nat_map(Int, Ext) -> + [{"Internal", io_lib:format("~s", [print_ip_and_maybe_port(Int)])}, + {"External", io_lib:format("~s", [print_ip_and_maybe_port(Ext)])}]. + +print_ip_and_maybe_port({IP, Port}) when is_tuple(IP) -> + [inet_parse:ntoa(IP), $:, integer_to_list(Port)]; +print_ip_and_maybe_port({Host, Port}) when is_list(Host) -> + [Host, $:, integer_to_list(Port)]; +print_ip_and_maybe_port(IP) when is_tuple(IP) -> + inet_parse:ntoa(IP); +print_ip_and_maybe_port(Host) when is_list(Host) -> + Host. + +%%-------------------------- +%% Command: nat-map add external=IP:PORT internal=IP +%%-------------------------- +nat_map_add([{external, Ext0}, {internal, Int0}], []) -> + %% We rely on cuttlefish to parse the IP for the most + %% part. However, it leaves addreses as strings so we still need + %% to try parsing them. + Ext = parse_ip(Ext0, false), + Int = parse_ip(Int0, true), + case collect_bad_ips([Ext, Int]) of + [] -> + ?LOG_USER_CMD("Add a NAT map from External IP ~p to Internal IP ~p", [Ext, Int]), + riak_core_ring_manager:ring_trans(fun riak_repl_ring:add_nat_map/2, {Ext, Int}), + text_out("Added a NAT map from External IP ~p to Internal IP ~p~n", [Ext, Int]); + Errors -> + error_out("Invalid IPs given: ~s~n", [string:join(Errors, ", ")]) + end; +nat_map_add([{internal,_}, {external,_}]=Args, []) -> + nat_map_add(lists:sort(Args), []); +nat_map_add(_,_) -> + usage. 
+ +parse_ip({Addr, Port}, HostnameAllowed) when is_list(Addr), is_integer(Port) -> + case parse_ip(Addr, HostnameAllowed) of + {error,_}=E -> E; + IP -> + {IP, Port} + end; +parse_ip(Addr, true) when is_list(Addr) -> + case parse_ip(Addr, false) of + {error,_}=E -> + case inet_gethost_native:gethostbyname(Addr) of + {ok, HostAddr} -> HostAddr; + _ -> E + end; + IP -> IP + end; +parse_ip(Addr, false) when is_list(Addr) -> + case inet_parse:ipv4strict_address(Addr) of + {ok, IP} -> IP; + _ -> + {error, {bad_ip, Addr}} + end. + +collect_bad_ips(List) -> + [ case BadIP of + {IP, Port} -> string:join([IP, Port], ":"); + _ -> BadIP + end || {error, {bad_ip, BadIP}} <- List]. + +%%-------------------------- +%% Command: nat-map delete external=IP:PORT internal=IP +%%-------------------------- + +nat_map_delete([{external, Ext0}, {internal, Int0}], []) -> + %% We rely on cuttlefish to parse the IP for the most + %% part. However, it leaves addreses as strings so we still need + %% to try parsing them. + %% We rely on cuttlefish to parse the IP for the most + %% part. However, it leaves addreses as strings so we still need + %% to try parsing them. + Ext = parse_ip(Ext0, false), + Int = parse_ip(Int0, true), + case collect_bad_ips([Ext, Int]) of + [] -> + ?LOG_USER_CMD("Delete a NAT map from External IP ~p to Internal IP ~p", [Ext, Int]), + riak_core_ring_manager:ring_trans(fun riak_repl_ring:del_nat_map/2, {Ext, Int}), + text_out("Deleted a NAT map from External IP ~p to Internal IP ~p~n", [Ext, Int]); + Errors -> + error_out("Invalid IPs given: ~s~n", [string:join(Errors, ", ")]) + end; +nat_map_delete([{internal,_}, {external,_}]=Args, []) -> + nat_map_delete(lists:sort(Args), []); +nat_map_delete(_,_) -> + usage. 
+ + +%%-------------------------- +%% Command: set FULLSYNC_CONFIG_KEY=VALUE +%%-------------------------- + +%% For each of these "max" parameter changes, we need to make an rpc +%% multi-call to every node so that all nodes have the new value in +%% their application environment. That way, whoever becomes the +%% fullsync coordinator will have the correct values. TODO: what +%% happens when a machine bounces and becomes leader? It won't know +%% the new value. Seems like we need a central place to hold these +%% configuration values. +set_fullsync_limit(["mdc", "fullsync"|Key]=Config, Value, _Flags) -> + %% NB: All config settings are done cluster-wide, there's not + %% flags for specific nodes like in handoff. + AppEnvKey = max_fs_config_key(Key), + Message = max_fs_message(AppEnvKey), + riak_core_util:rpc_every_member(lager, log, + [notice, [{pid, self()}], + "[user] Locally set max number of fullsync workers ~s to ~p", + [Message, Value]], + ?CONSOLE_RPC_TIMEOUT), + riak_core_util:rpc_every_member(application, set_env, + [riak_repl, AppEnvKey, Value], + ?CONSOLE_RPC_TIMEOUT), + Output = text_out("Set max number of fullsync workers ~s to ~p~n", [Message, Value]), + clique:print(Output, ["set", cuttlefish_variable:format(Config)]). + +max_fs_message(max_fssource_node) -> "per source node"; +max_fs_message(max_fssource_cluster) -> "for source cluster"; +max_fs_message(max_fssink_node) -> "per sink node". + + +max_fs_config_key(["source", "max_workers_per_node"]) -> max_fssource_node; +max_fs_config_key(["source", "max_workers_per_cluster"]) -> max_fssource_cluster; +max_fs_config_key(["sink", "max_workers_per_node"]) -> max_fssink_node. + diff --git a/test/riak_repl_lager_test_backend.erl b/test/riak_repl_lager_test_backend.erl new file mode 100644 index 00000000..8f4247bb --- /dev/null +++ b/test/riak_repl_lager_test_backend.erl @@ -0,0 +1,130 @@ +-module(riak_repl_lager_test_backend). + +-behavior(gen_event). 
+ +%% gen_event callbacks +-export([init/1, + handle_call/2, + handle_event/2, + handle_info/2, + terminate/2, + code_change/3]). +-export([get_logs/0, + bounce/0, + bounce/1]). + +%% holds the log messages for retreival on terminate +-record(state, {level :: {mask, integer()}, + verbose :: boolean(), + log = [] :: list()}). + +-include_lib("lager/include/lager.hrl"). + +-spec get_logs() -> [iolist()] | {error, term()}. +get_logs() -> + gen_event:call(lager_event, ?MODULE, get_logs, infinity). + +bounce() -> + bounce(error). + +bounce(Level) -> + application:stop(lager), + lager:start(), + gen_event:add_handler(lager_event, ?MODULE, [error, false]), + lager:set_loglevel(?MODULE, Level), + ok. + +-spec(init(integer()|atom()|[term()]) -> {ok, #state{}} | {error, atom()}). +%% @private +%% @doc Initializes the event handler +init(Level) when is_atom(Level) -> + case lists:member(Level, ?LEVELS) of + true -> + {ok, #state{level=lager_util:level_to_num(Level), verbose=false}}; + _ -> + {error, bad_log_level} + end; +init([Level, Verbose]) -> + case lists:member(Level, ?LEVELS) of + true -> + {ok, #state{level=lager_util:level_to_num(Level), verbose=Verbose}}; + _ -> + {error, bad_log_level} + end. + +-spec(handle_event(tuple(), #state{}) -> {ok, #state{}}). +%% @private +%% @doc handles the event, adding the log message to the gen_event's state. +%% this function attempts to handle logging events in both the simple tuple +%% and new record (introduced after lager 1.2.1) formats. 
+handle_event({log, Dest, Level, {Date, Time}, [LevelStr, Location, Message]}, %% lager 1.2.1 + #state{level=L, verbose=Verbose, log = Logs} = State) when Level > L -> + case lists:member(riak_test_lager_backend, Dest) of + true -> + Log = case Verbose of + true -> + [Date, " ", Time, " ", LevelStr, Location, Message]; + _ -> + [Time, " ", LevelStr, Message] + end, + {ok, State#state{log=[Log|Logs]}}; + false -> + {ok, State} + end; +handle_event({log, Level, {Date, Time}, [LevelStr, Location, Message]}, %% lager 1.2.1 + #state{level=LogLevel, verbose=Verbose, log = Logs} = State) when Level =< LogLevel -> + Log = case Verbose of + true -> + [Date, " ", Time, " ", LevelStr, Location, Message]; + _ -> + [Time, " ", LevelStr, Message] + end, + {ok, State#state{log=[Log|Logs]}}; +handle_event({log, {lager_msg, Dest, _Meta, Level, DateTime, _Timestamp, Message}}, + State) -> %% lager 2.0.0 + case lager_util:level_to_num(Level) of + L when L =< State#state.level -> + handle_event({log, L, DateTime, + [["[",atom_to_list(Level),"] "], " ", Message]}, + State); + L -> + handle_event({log, Dest, L, DateTime, + [["[",atom_to_list(Level),"] "], " ", Message]}, + State) + end; +handle_event(Event, State) -> + {ok, State#state{log = [Event|State#state.log]}}. + +-spec(handle_call(any(), #state{}) -> {ok, any(), #state{}}). +%% @private +%% @doc gets and sets loglevel. This is part of the lager backend api. +handle_call(get_loglevel, #state{level=Level} = State) -> + {ok, Level, State}; +handle_call({set_loglevel, Level}, State) -> + case lists:member(Level, ?LEVELS) of + true -> + {ok, ok, State#state{level=lager_util:level_to_num(Level)}}; + _ -> + {ok, {error, bad_log_level}, State} + end; +handle_call(get_logs, #state{log = Logs} = State) -> + {ok, lists:reverse(Logs), State}; +handle_call(_, State) -> + {ok, ok, State}. + +-spec(handle_info(any(), #state{}) -> {ok, #state{}}). +%% @private +%% @doc gen_event callback, does nothing. +handle_info(_, State) -> + {ok, State}. 
+ +-spec(code_change(any(), #state{}, any()) -> {ok, #state{}}). +%% @private +%% @doc gen_event callback, does nothing. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-spec(terminate(any(), #state{}) -> {ok, list()}). +%% @doc gen_event callback, does nothing. +terminate(_Reason, #state{log=Logs}) -> + {ok, lists:reverse(Logs)}. diff --git a/test/riak_repl_schema_tests.erl b/test/riak_repl_schema_tests.erl index c12aa5a6..9ab2778b 100644 --- a/test/riak_repl_schema_tests.erl +++ b/test/riak_repl_schema_tests.erl @@ -18,11 +18,14 @@ basic_schema_test() -> cuttlefish_unit:assert_config(Config, "riak_repl.max_fssource_node", 1), cuttlefish_unit:assert_config(Config, "riak_repl.max_fssink_node", 1), cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_on_connect", true), - cuttlefish_unit:assert_not_configured(Config, "riak_repl.fullsync_interval"), + cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_interval", 360), cuttlefish_unit:assert_config(Config, "riak_repl.rtq_max_bytes", 104857600), cuttlefish_unit:assert_config(Config, "riak_repl.proxy_get", disabled), cuttlefish_unit:assert_config(Config, "riak_repl.rt_heartbeat_interval", 15), cuttlefish_unit:assert_config(Config, "riak_repl.rt_heartbeat_timeout", 15), + cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_use_background_manager", false), + cuttlefish_unit:assert_not_configured(Config, "riak_repl.fullsync_stat_refresh_interval"), + cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_strategy", keylist), ok. 
override_schema_test() -> @@ -31,16 +34,20 @@ override_schema_test() -> Conf = [ {["mdc", "data_root"], "/some/repl/place"}, {["mdc", "cluster_manager"], {"4.3.2.1", 4321}}, - {["mdc", "max_fssource_cluster"], 10}, - {["mdc", "max_fssource_node"], 2}, - {["mdc", "max_fssink_node"], 4}, - {["mdc", "fullsync_on_connect"], false}, - {["mdc", "fullsync_interval", "cluster1"], "15m"}, - {["mdc", "fullsync_interval", "cluster2"], "1h"}, - {["mdc", "rtq_max_bytes"], "50MB"}, + {["mdc", "fullsync", "strategy"], aae}, + {["mdc", "fullsync", "source", "max_workers_per_cluster"], 10}, + {["mdc", "fullsync", "source", "max_workers_per_node"], 2}, + {["mdc", "fullsync", "sink", "max_workers_per_node"], 4}, + {["mdc", "fullsync", "start_on_connect"], off}, + {["mdc", "fullsync", "interval"], per_sink}, + {["mdc", "fullsync", "interval", "cluster1"], "15m"}, + {["mdc", "fullsync", "interval", "cluster2"], "1h"}, + {["mdc", "realtime", "queue_max_bytes"], "50MB"}, {["mdc", "proxy_get"], on}, - {["mdc", "realtime", "heartbeat_interval"], "15m"}, - {["mdc", "realtime", "heartbeat_timeout"], "15d"} + {["mdc", "realtime", "heartbeat", "interval"], "15m"}, + {["mdc", "realtime", "heartbeat", "timeout"], "15d"}, + {["mdc", "fullsync", "background_manager"], on}, + {["mdc", "fullsync", "source", "metrics_refresh_interval"], "30s"} ], %% The defaults are defined in ../priv/riak_repl.schema. @@ -60,8 +67,54 @@ override_schema_test() -> cuttlefish_unit:assert_config(Config, "riak_repl.proxy_get", enabled), cuttlefish_unit:assert_config(Config, "riak_repl.rt_heartbeat_interval", 900), cuttlefish_unit:assert_config(Config, "riak_repl.rt_heartbeat_timeout", 1296000), + cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_use_background_manager", true), + cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_stat_refresh_interval", 30000), + cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_strategy", aae), ok. 
+heartbeat_interval_test() -> + Conf = [{["mdc", "realtime", "heartbeat"], false}, + {["mdc", "realtime", "heartbeat", "interval"], 300}], + Config = cuttlefish_unit:generate_templated_config(["../priv/riak_repl.schema"], Conf, context()), + cuttlefish_unit:assert_config(Config, "riak_repl.rt_heartbeat_interval", undefined). + +fullsync_interval_test_() -> + [ + {"interval=never but sink intervals set results in warning", + fun() -> + riak_repl_lager_test_backend:bounce(warning), + Conf = [{["mdc", "fullsync", "interval"], never}, + {["mdc", "fullsync", "interval", "cluster1"], 360}], + Config = cuttlefish_unit:generate_templated_config(["../priv/riak_repl.schema"], Conf, context()), + cuttlefish_unit:assert_config(Config, "riak_repl.fullsync_interval", disabled), + Logs = riak_repl_lager_test_backend:get_logs(), + ?assertMatch([_|_], Logs), + [ ?assertEqual("mdc.fullsync.interval is set to never, sink specific intervals are ignored", + Message) || [_Time, " ", ["[","warning","]"], Message] <- Logs ] + end}, + {"interval=per_sink but no sink intervals is invalid", + fun() -> + Conf = [{["mdc", "fullsync", "interval"], per_sink}], + Config = cuttlefish_unit:generate_templated_config(["../priv/riak_repl.schema"], Conf, context()), + cuttlefish_unit:assert_error_message(Config, + "Translation for 'riak_repl.fullsync_interval' found invalid configuration: " + "Cannot set mdc.fullsync.interval = per_sink and" + " omit sink-specific intervals, set sink-specific" + " intervals or use 'never'") + end}, + {"interval=Time and per-sink intervals is invalid", + fun() -> + Conf = [{["mdc", "fullsync", "interval"], 60}, + {["mdc", "fullsync", "interval", "cluster1"], 360}], + Config = cuttlefish_unit:generate_templated_config(["../priv/riak_repl.schema"], Conf, context()), + cuttlefish_unit:assert_error_message(Config, + "Translation for 'riak_repl.fullsync_interval' found invalid configuration: " + "Cannot set both mdc.fullsync.interval and sink-specific intervals") + end} + ]. 
+ + + %% this context() represents the substitution variables that rebar %% will use during the build process. riak_jmx's schema file is %% written with some {{mustache_vars}} for substitution during diff --git a/tools.mk b/tools.mk index eecfbe4f..ab056496 100644 --- a/tools.mk +++ b/tools.mk @@ -1,8 +1,49 @@ +# ------------------------------------------------------------------- +# +# Copyright (c) 2014 Basho Technologies, Inc. +# +# This file is provided to you under the Apache License, +# Version 2.0 (the "License"); you may not use this file +# except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ------------------------------------------------------------------- + +# ------------------------------------------------------------------- +# NOTE: This file is is from https://github.com/basho/tools.mk. +# It should not be edited in a project. It should simply be updated +# wholesale when a new version of tools.mk is released. 
+# ------------------------------------------------------------------- + REBAR ?= ./rebar +REVISION ?= $(shell git rev-parse --short HEAD) +PROJECT ?= $(shell basename `find src -name "*.app.src"` .app.src) + +.PHONY: compile-no-deps test docs xref dialyzer-run dialyzer-quick dialyzer \ + cleanplt upload-docs + +compile-no-deps: + ${REBAR} compile skip_deps=true test: compile ${REBAR} eunit skip_deps=true +upload-docs: docs + @if [ -z "${BUCKET}" -o -z "${PROJECT}" -o -z "${REVISION}" ]; then \ + echo "Set BUCKET, PROJECT, and REVISION env vars to upload docs"; \ + exit 1; fi + @cd doc; s3cmd put -P * "s3://${BUCKET}/${PROJECT}/${REVISION}/" > /dev/null + @echo "Docs built at: http://${BUCKET}.s3-website-us-east-1.amazonaws.com/${PROJECT}/${REVISION}" + docs: ${REBAR} doc skip_deps=true @@ -31,8 +72,31 @@ ${LOCAL_PLT}: compile fi \ fi -dialyzer: ${PLT} ${LOCAL_PLT} +dialyzer-run: @echo "==> $(shell basename $(shell pwd)) (dialyzer)" +# The bulk of the code below deals with the dialyzer.ignore-warnings file +# which contains strings to ignore if output by dialyzer. +# Typically the strings include line numbers. Using them exactly is hard +# to maintain as the code changes. This approach instead ignores the line +# numbers, but takes into account the number of times a string is listed +# for a given file. So if one string is listed once, for example, and it +# appears twice in the warnings, the user is alerted. It is possible but +# unlikely that this approach could mask a warning if one ignored warning +# is removed and two warnings of the same kind appear in the file, for +# example. But it is a trade-off that seems worth it. 
+# Details of the cryptic commands: +# - Remove line numbers from dialyzer.ignore-warnings +# - Pre-pend duplicate count to each warning with sort | uniq -c +# - Remove annoying white space around duplicate count +# - Save in dialyer.ignore-warnings.tmp +# - Do the same to dialyzer_warnings +# - Remove matches from dialyzer.ignore-warnings.tmp from output +# - Remove duplicate count +# - Escape regex special chars to use lines as regex patterns +# - Add pattern to match any line number (file.erl:\d+:) +# - Anchor to match the entire line (^entire line$) +# - Save in dialyzer_unhandled_warnings +# - Output matches for those patterns found in the original warnings @if [ -f $(LOCAL_PLT) ]; then \ PLTS="$(PLT) $(LOCAL_PLT)"; \ else \ @@ -44,13 +108,37 @@ dialyzer: ${PLT} ${LOCAL_PLT} exit 1; \ fi; \ dialyzer $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin > dialyzer_warnings ; \ - egrep -v "^[[:space:]]*(done|Checking|Proceeding|Compiling)" dialyzer_warnings | grep -F -f dialyzer.ignore-warnings -v > dialyzer_unhandled_warnings ; \ - cat dialyzer_unhandled_warnings ; \ - [ $$(cat dialyzer_unhandled_warnings | wc -l) -eq 0 ] ; \ + cat dialyzer.ignore-warnings \ + | sed -E 's/^([^:]+:)[^:]+:/\1/' \ + | sort \ + | uniq -c \ + | sed -E '/.*\.erl: /!s/^[[:space:]]*[0-9]+[[:space:]]*//' \ + > dialyzer.ignore-warnings.tmp ; \ + egrep -v "^[[:space:]]*(done|Checking|Proceeding|Compiling)" dialyzer_warnings \ + | sed -E 's/^([^:]+:)[^:]+:/\1/' \ + | sort \ + | uniq -c \ + | sed -E '/.*\.erl: /!s/^[[:space:]]*[0-9]+[[:space:]]*//' \ + | grep -F -f dialyzer.ignore-warnings.tmp -v \ + | sed -E 's/^[[:space:]]*[0-9]+[[:space:]]*//' \ + | sed -E 's/([]\^:+?|()*.$${}\[])/\\\1/g' \ + | sed -E 's/(\\\.erl\\\:)/\1\\d+:/g' \ + | sed -E 's/^(.*)$$/^\1$$/g' \ + > dialyzer_unhandled_warnings ; \ + rm dialyzer.ignore-warnings.tmp; \ + if [ $$(cat dialyzer_unhandled_warnings | wc -l) -gt 0 ]; then \ + egrep -f dialyzer_unhandled_warnings dialyzer_warnings ; \ + found_warnings=1; \ + fi; \ + [ 
"$$found_warnings" != 1 ] ; \ else \ dialyzer $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin; \ fi +dialyzer-quick: compile-no-deps dialyzer-run + +dialyzer: ${PLT} ${LOCAL_PLT} dialyzer-run + cleanplt: @echo @echo "Are you sure? It takes several minutes to re-build." @@ -59,4 +147,3 @@ cleanplt: sleep 5 rm $(PLT) rm $(LOCAL_PLT) -