Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tt replicaset upgrade: support remote replica set #1030

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions cli/cmd/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ var (
// newUpgradeCmd creates a "replicaset upgrade" command.
func newUpgradeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade (<APP_NAME>) [flags]",
Use: "upgrade (<APP_NAME> | <URI>) [flags]\n\n" +
replicasetUriHelp,
DisableFlagsInUseLine: true,
Short: "Upgrade tarantool cluster",
Long: "Upgrade tarantool cluster.\n\n" +
Expand All @@ -79,6 +80,7 @@ func newUpgradeCmd() *cobra.Command {
"timeout for waiting the LSN synchronization (in seconds)")

addOrchestratorFlags(cmd)
addTarantoolConnectFlags(cmd)
return cmd
}

Expand Down Expand Up @@ -530,6 +532,18 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
if ctx.IsInstanceConnect {
defer ctx.Conn.Close()
}

connectCtx := connect.ConnectCtx{
Username: replicasetUser,
Password: replicasetPassword,
SslKeyFile: replicasetSslKeyFile,
SslCertFile: replicasetSslCertFile,
SslCaFile: replicasetSslCaFile,
SslCiphers: replicasetSslCiphers,
}
var connOpts connector.ConnectOpts
connOpts, _, _ = resolveConnectOpts(cmdCtx, cliOpts, &connectCtx, args)

return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{
IsApplication: ctx.IsApplication,
RunningCtx: ctx.RunningCtx,
Expand All @@ -538,7 +552,7 @@ func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) e
}, replicasetcmd.UpgradeOpts{
ChosenReplicasetAliases: chosenReplicasetAliases,
LsnTimeout: lsnTimeout,
})
}, connOpts)
}

// internalReplicasetPromoteModule is a "promote" command for the replicaset module.
Expand Down
55 changes: 39 additions & 16 deletions cli/replicaset/cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func filterReplicasetsByAliases(replicasets replicaset.Replicasets,
}

// Upgrade upgrades tarantool schema.
func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error {
func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts, connOpts connector.ConnectOpts) error {
replicasets, err := getReplicasets(discoveryCtx)
if err != nil {
return err
Expand All @@ -75,12 +75,13 @@ func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error {
return err
}

return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout)
return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout, connOpts)
}

func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int) error {
func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int,
connOpts connector.ConnectOpts) error {
for _, replicaset := range replicasets {
err := upgradeReplicaset(replicaset, lsnTimeout)
err := upgradeReplicaset(replicaset, lsnTimeout, connOpts)
if err != nil {
fmt.Printf("• %s: error\n", replicaset.Alias)
return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err)
Expand All @@ -99,7 +100,8 @@ func closeConnectors(master *instanceMeta, replicas []instanceMeta) {
}
}

func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) {
func getInstanceConnector(instance replicaset.Instance,
connOpts connector.ConnectOpts) (connector.Connector, error) {
run := instance.InstanceCtx
fullInstanceName := running.GetAppInstanceName(run)
if fullInstanceName == "" {
Expand All @@ -116,33 +118,53 @@ func getInstanceConnector(instance replicaset.Instance) (connector.Connector, er
})

if err != nil {
return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+
": %w", fullInstanceName, err)
fErr := err
conn, err = connector.Connect(connector.ConnectOpts{
Network: connOpts.Network,
Address: instance.URI,
Username: connOpts.Username,
Password: connOpts.Password,
Ssl: connOpts.Ssl,
})
if err != nil {
return nil, fmt.Errorf("instance %s failed to connect via UNIX socket "+
"and uri: %w %w", fullInstanceName, err, fErr)
}
}
return conn, nil
}

func collectRWROInfo(replset replicaset.Replicaset) (*instanceMeta, []instanceMeta,
func collectRWROInfo(replset replicaset.Replicaset,
connOpts connector.ConnectOpts) (*instanceMeta, []instanceMeta,
error) {
var master *instanceMeta = nil
var replicas []instanceMeta
for _, instance := range replset.Instances {
run := instance.InstanceCtx
fullInstanceName := running.GetAppInstanceName(run)
conn, err := getInstanceConnector(instance)
conn, err := getInstanceConnector(instance, connOpts)

if err != nil {
return nil, nil, err
}

isRW := false

if instance.Mode == replicaset.ModeUnknown {
closeConnectors(master, replicas)
return nil, nil, fmt.Errorf(
"can't determine RO/RW mode on instance: %s", fullInstanceName)
// Discovery cannot determine all instance modes of the remote replica set.
Copy link
Contributor

@oleg-jukovec oleg-jukovec Nov 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In such cases, we need to update the discovery code rather than create workarounds and spread them across the code.

res, err := conn.Eval(
"return (type(box.cfg) == 'function') or box.info.ro",
[]any{}, connector.RequestOpts{})
if err != nil || len(res) == 0 {
return nil, nil, fmt.Errorf(
"can't determine RO/RW mode on instance: %s",
fullInstanceName)
}
isRW = !res[0].(bool)
} else {
isRW = instance.Mode.String() == "rw"
}

isRW := instance.Mode.String() == "rw"

if isRW && master != nil {
closeConnectors(master, replicas)
return nil, nil, fmt.Errorf("%s and %s are both masters",
Expand Down Expand Up @@ -231,8 +253,9 @@ func snapshot(instance *instanceMeta) error {
return nil
}

func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int) error {
master, replicas, err := collectRWROInfo(replicaset)
func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int,
connOpts connector.ConnectOpts) error {
master, replicas, err := collectRWROInfo(replicaset, connOpts)
if err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions test/integration/replicaset/small_cluster_app/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
credentials:
users:
client:
password: 'secret'
roles: [super]
guest:
roles: [super]

groups:
group-001:
replicasets:
replicaset-001:
instances:
storage-master:
iproto:
listen:
- uri: '127.0.0.1:3301'
database:
mode: rw
storage-replica:
iproto:
listen:
- uri: '127.0.0.1:3302'
database:
mode: ro
4 changes: 4 additions & 0 deletions test/integration/replicaset/small_cluster_app/instances.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
storage-master:

storage-replica:

54 changes: 53 additions & 1 deletion test/integration/replicaset/test_replicaset_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
import shutil
import subprocess
import tempfile
import time

import pytest
from replicaset_helpers import stop_application
from vshard_cluster import VshardCluster

from utils import get_tarantool_version, run_command_and_get_output, wait_file
from utils import (control_socket, get_tarantool_version,
run_command_and_get_output, run_path, wait_file)

tarantool_major_version, _ = get_tarantool_version()

Expand Down Expand Up @@ -192,3 +194,53 @@ def test_upgrade_downgraded_cluster_replicasets(tt_cmd, tmp_path):

finally:
app.stop()


def start_application(cmd, workdir, app_name, instances):
    """Run a start command and check that every instance is reported as starting.

    Args:
        cmd: Command line (list of arguments) to execute.
        workdir: Directory the command is executed in.
        app_name: Application name expected in the start messages.
        instances: Instance names expected in the start messages.

    Raises:
        AssertionError: If the output lacks the start message for an instance.
    """
    # Using Popen as a context manager closes the stdout pipe and waits for
    # the child on exit, so neither a file descriptor nor an unreaped
    # process is left behind after the output has been read.
    with subprocess.Popen(
        cmd,
        cwd=workdir,
        stderr=subprocess.STDOUT,
        stdout=subprocess.PIPE,
        text=True
    ) as instance_process:
        start_output = instance_process.stdout.read()

    for inst in instances:
        assert f"Starting an instance [{app_name}:{inst}]" in start_output


@pytest.mark.skipif(tarantool_major_version < 3,
                    reason="skip cluster instances test for Tarantool < 3")
def test_upgrade_remote_replicasets(tt_cmd, tmpdir_with_cfg):
    """Check that `tt replicaset upgrade` accepts a URI instead of an app name.

    Starts the `small_cluster_app` application, waits for both instances to
    create their pid file and control socket, then runs the upgrade against
    the master's URI and expects a zero exit code and "ok" in the output.
    The application is stopped in any case.
    """
    tmpdir = tmpdir_with_cfg
    app_name = "small_cluster_app"
    app_path = os.path.join(tmpdir, app_name)
    shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)

    run_dir = os.path.join(tmpdir, app_name, run_path)
    instances = ['storage-master', 'storage-replica']

    try:
        # Start the application instances.
        start_cmd = [tt_cmd, "start", app_name]
        start_application(start_cmd, tmpdir, app_name, instances)

        # Check status: every instance must have a pid file and a control socket.
        for inst in instances:
            file = wait_file(os.path.join(run_dir, inst), 'tarantool.pid', [])
            assert file != ""
            file = wait_file(os.path.join(run_dir, inst), control_socket, [])
            assert file != ""

        # The app is ready, but it is necessary to wait for some time
        # to be able to establish a connection.
        time.sleep(3)

        # Connect as the `client` user declared in the application's
        # config.yaml, using the listen address of `storage-master`.
        uri = "tcp://client:secret@127.0.0.1:3301"
        upgrade_cmd = [tt_cmd, "replicaset", "upgrade", uri, "-t=15"]
        rc, out = run_command_and_get_output(upgrade_cmd, cwd=tmpdir)
        assert rc == 0
        assert "ok" in out

    finally:
        stop_cmd = [tt_cmd, "stop", app_name, "-y"]
        stop_rc, stop_out = run_command_and_get_output(stop_cmd, cwd=tmpdir)
        assert stop_rc == 0
Loading