Skip to content

Commit

Permalink
tt replicaset upgrade: support remote replicaset
Browse files Browse the repository at this point in the history
This patch adds the ability to update the schema
on a remote replicaset.

Closes #968
  • Loading branch information
mandesero committed Nov 21, 2024
1 parent c5c65e1 commit d6d911d
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 20 deletions.
18 changes: 16 additions & 2 deletions cli/cmd/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ var (
// newUpgradeCmd creates a "replicaset upgrade" command.
func newUpgradeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade (<APP_NAME>) [flags]",
Use: "upgrade (<APP_NAME> | <URI>) [flags]\n\n" +
replicasetUriHelp,
DisableFlagsInUseLine: true,
Short: "Upgrade tarantool cluster",
Long: "Upgrade tarantool cluster.\n\n" +
Expand All @@ -79,6 +80,7 @@ func newUpgradeCmd() *cobra.Command {
"timeout for waiting the LSN synchronization (in seconds)")

addOrchestratorFlags(cmd)
addTarantoolConnectFlags(cmd)
return cmd
}

Expand Down Expand Up @@ -530,6 +532,18 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
if ctx.IsInstanceConnect {
defer ctx.Conn.Close()
}

connectCtx := connect.ConnectCtx{
Username: replicasetUser,
Password: replicasetPassword,
SslKeyFile: replicasetSslKeyFile,
SslCertFile: replicasetSslCertFile,
SslCaFile: replicasetSslCaFile,
SslCiphers: replicasetSslCiphers,
}
var connOpts connector.ConnectOpts
connOpts, _, _ = resolveConnectOpts(cmdCtx, cliOpts, &connectCtx, args)

return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{
IsApplication: ctx.IsApplication,
RunningCtx: ctx.RunningCtx,
Expand All @@ -538,7 +552,7 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
}, replicasetcmd.UpgradeOpts{
ChosenReplicasetAliases: chosenReplicasetAliases,
LsnTimeout: lsnTimeout,
})
}, connOpts)
}

// internalReplicasetPromoteModule is a "promote" command for the replicaset module.
Expand Down
56 changes: 39 additions & 17 deletions cli/replicaset/cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func filterReplicasetsByAliases(replicasets replicaset.Replicasets,
return chosenReplicasets, nil
}

// Upgrade upgrades tarantool schema.
func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error {
func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts, connOpts connector.ConnectOpts) error {
replicasets, err := getReplicasets(discoveryCtx)
if err != nil {
return err
Expand All @@ -75,12 +74,13 @@ func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error {
return err
}

return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout)
return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout, connOpts)
}

func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int) error {
func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int,
connOpts connector.ConnectOpts) error {
for _, replicaset := range replicasets {
err := upgradeReplicaset(replicaset, lsnTimeout)
err := upgradeReplicaset(replicaset, lsnTimeout, connOpts)
if err != nil {
fmt.Printf("• %s: error\n", replicaset.Alias)
return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err)
Expand All @@ -99,7 +99,8 @@ func closeConnectors(master *instanceMeta, replicas []instanceMeta) {
}
}

func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) {
func getInstanceConnector(instance replicaset.Instance,
connOpts connector.ConnectOpts) (connector.Connector, error) {
run := instance.InstanceCtx
fullInstanceName := running.GetAppInstanceName(run)
if fullInstanceName == "" {
Expand All @@ -116,33 +117,53 @@ func getInstanceConnector(instance replicaset.Instance) (connector.Connector, er
})

if err != nil {
return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+
": %w", fullInstanceName, err)
fErr := err
conn, err = connector.Connect(connector.ConnectOpts{
Network: connOpts.Network,
Address: instance.URI,
Username: connOpts.Username,
Password: connOpts.Password,
Ssl: connOpts.Ssl,
})
if err != nil {
return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+
"and uri: %w %w", fullInstanceName, err, fErr)
}
}
return conn, nil
}

func collectRWROInfo(replset replicaset.Replicaset) (*instanceMeta, []instanceMeta,
func collectRWROInfo(replset replicaset.Replicaset,
connOpts connector.ConnectOpts) (*instanceMeta, []instanceMeta,
error) {
var master *instanceMeta = nil
var replicas []instanceMeta
for _, instance := range replset.Instances {
run := instance.InstanceCtx
fullInstanceName := running.GetAppInstanceName(run)
conn, err := getInstanceConnector(instance)
conn, err := getInstanceConnector(instance, connOpts)

if err != nil {
return nil, nil, err
}

isRW := false

if instance.Mode == replicaset.ModeUnknown {
closeConnectors(master, replicas)
return nil, nil, fmt.Errorf(
"can't determine RO/RW mode on instance: %s", fullInstanceName)
// Discovery cannot determine all instance modes of the remote replica set.
res, err := conn.Eval(
"return (type(box.cfg) == 'function') or box.info.ro",
[]any{}, connector.RequestOpts{})
if err != nil || len(res) == 0 {
return nil, nil, fmt.Errorf(
"can't determine RO/RW mode on instance: %s",
fullInstanceName)
}
isRW = !res[0].(bool)
} else {
isRW = instance.Mode.String() == "rw"
}

isRW := instance.Mode.String() == "rw"

if isRW && master != nil {
closeConnectors(master, replicas)
return nil, nil, fmt.Errorf("%s and %s are both masters",
Expand Down Expand Up @@ -231,8 +252,9 @@ func snapshot(instance *instanceMeta) error {
return nil
}

func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int) error {
master, replicas, err := collectRWROInfo(replicaset)
func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int,
connOpts connector.ConnectOpts) error {
master, replicas, err := collectRWROInfo(replicaset, connOpts)
if err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions test/integration/replicaset/small_cluster_app/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
credentials:
users:
client:
password: 'secret'
roles: [super]
guest:
roles: [super]

groups:
group-001:
replicasets:
replicaset-001:
instances:
storage-master:
iproto:
listen:
- uri: '127.0.0.1:3301'
database:
mode: rw
storage-replica:
iproto:
listen:
- uri: '127.0.0.1:3302'
database:
mode: ro
4 changes: 4 additions & 0 deletions test/integration/replicaset/small_cluster_app/instances.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
storage-master:

storage-replica:

54 changes: 53 additions & 1 deletion test/integration/replicaset/test_replicaset_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import shutil
import subprocess
import tempfile
import time

import pytest
from replicaset_helpers import stop_application
from vshard_cluster import VshardCluster

from utils import get_tarantool_version, run_command_and_get_output, wait_file
from utils import (control_socket, get_tarantool_version,
run_command_and_get_output, run_path, wait_file)

tarantool_major_version, _ = get_tarantool_version()

Expand Down Expand Up @@ -192,3 +194,53 @@ def test_upgrade_downgraded_cluster_replicasets(tt_cmd, tmp_path):

finally:
app.stop()


def start_application(cmd, workdir, app_name, instances):
instance_process = subprocess.Popen(
cmd,
cwd=workdir,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
text=True
)
start_output = instance_process.stdout.read()
for inst in instances:
assert f"Starting an instance [{app_name}:{inst}]" in start_output


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip cluster instances test for Tarantool < 3")
def test_upgrade_remote_replicasets(tt_cmd, tmpdir_with_cfg):
tmpdir = tmpdir_with_cfg
app_name = "small_cluster_app"
app_path = os.path.join(tmpdir, app_name)
shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)

run_dir = os.path.join(tmpdir, app_name, run_path)
instances = ['storage-master', 'storage-replica']

try:
# Start an instance.
start_cmd = [tt_cmd, "start", app_name]
start_application(start_cmd, tmpdir, app_name, instances)

# Check status.
for inst in instances:
file = wait_file(os.path.join(run_dir, inst), 'tarantool.pid', [])
assert file != ""
file = wait_file(os.path.join(run_dir, inst), control_socket, [])
assert file != ""
# App is ready, but It is necessary to wait for some time
# to be able to establish a connection.
time.sleep(3)
uri = "tcp://client:[email protected]:3301"
upgrade_cmd = [tt_cmd, "replicaset", "upgrade", uri, "-t=15"]
rc, out = run_command_and_get_output(upgrade_cmd, cwd=tmpdir)
assert rc == 0
assert "ok" in out

finally:
stop_cmd = [tt_cmd, "stop", app_name, "-y"]
stop_rc, stop_out = run_command_and_get_output(stop_cmd, cwd=tmpdir)
assert stop_rc == 0

0 comments on commit d6d911d

Please sign in to comment.