Skip to content

Commit

Permalink
feat(scyllaclient): set transfers in rcloneMoveOrCopyDir
Browse files Browse the repository at this point in the history
This commit requires changes in the usage of RcloneMoveDir in backup pkg.
  • Loading branch information
Michal-Leszczynski committed Oct 11, 2024
1 parent 15b86ec commit 1fc71e9
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
11 changes: 6 additions & 5 deletions pkg/scyllaclient/client_rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ func (c *Client) rcloneMoveOrCopyFile(ctx context.Context, host, dstRemotePath,
// Returns ID of the asynchronous job.
// Remote path format is "name:bucket/path".
// If specified, a suffix will be added to otherwise overwritten or deleted files.
func (c *Client) RcloneMoveDir(ctx context.Context, host, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, dstRemotePath, srcRemotePath, true, suffix)
func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, true, suffix)
}

// RcloneCopyDir copies contents of the directory pointed by srcRemotePath to
Expand All @@ -243,11 +243,11 @@ func (c *Client) RcloneMoveDir(ctx context.Context, host, dstRemotePath, srcRemo
// Returns ID of the asynchronous job.
// Remote path format is "name:bucket/path".
// If specified, a suffix will be added to otherwise overwritten or deleted files.
func (c *Client) RcloneCopyDir(ctx context.Context, host, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, dstRemotePath, srcRemotePath, false, suffix)
func (c *Client) RcloneCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) {
return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, false, suffix)
}

func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) {
func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) {
dstFs, dstRemote, err := rcloneSplitRemotePath(dstRemotePath)
if err != nil {
return 0, err
Expand All @@ -262,6 +262,7 @@ func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host, dstRemotePath, s
SrcFs: srcFs,
SrcRemote: srcRemote,
Suffix: suffix,
Transfers: int64(transfers),
}

var jobID int64
Expand Down
20 changes: 10 additions & 10 deletions pkg/scyllaclient/client_rclone_agent_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestRcloneDeletePathsInBatchesAgentIntegration(t *testing.T) {
t.Fatalf("Create files on Scylla node, err = {%s}, stdOut={%s}, stdErr={%s}", err, stdOut, stdErr)
}
}
id, err := client.RcloneCopyDir(ctx, testHost, remotePath(dirName), "data:"+dirName, "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(dirName), "data:"+dirName, "")
if err != nil {
t.Fatal(errors.Wrap(err, "copy created files to backup location"))
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestRcloneSkippingFilesAgentIntegration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
id, err := client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp/copy", "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -223,7 +223,7 @@ func TestRcloneSkippingFilesAgentIntegration(t *testing.T) {
}
}

id, err = client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp/copy", "")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestRcloneStoppingTransferIntegration(t *testing.T) {
}
}()

id, err := client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) {
}()

Print("When: first batch upload")
id, err := client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -383,7 +383,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) {
}

Print("When: second batch upload")
id, err = client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -408,7 +408,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) {
}

Print("When: third batch upload")
id, err = client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "")
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) {

Print("Copy src into dst")

id, err := client.RcloneCopyDir(ctx, testHost, dstPath, srcPath, "")
id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) {

Print("Copy src into dst after file modification")

id, err = client.RcloneCopyDir(ctx, testHost, dstPath, srcPath, firstSuffix)
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, firstSuffix)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -583,7 +583,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) {

Print("Copy src into dst after another file modification")

id, err = client.RcloneCopyDir(ctx, testHost, dstPath, srcPath, secondSuffix)
id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, secondSuffix)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scyllaclient/client_rclone_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestRcloneLocalToS3CopyDirIntegration(t *testing.T) {
ctx := context.Background()

copyDir := func(dir string) (*scyllaclient.RcloneJobInfo, error) {
id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, remotePath("/copy"), "rclonetest:"+dir, "")
id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, remotePath("/copy"), "rclonetest:"+dir, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestRcloneS3ToLocalCopyDirIntegration(t *testing.T) {
defer closeServer()
ctx := context.Background()

id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, "rclonetest:foo", remotePath("/copy"), "")
id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, "rclonetest:foo", remotePath("/copy"), "")
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/service/backup/worker_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDi
retries = 10
)
for i := 0; i < retries; i++ {
if err := w.uploadDataDir(ctx, dataDst, dataSrc, d); err != nil {
if err := w.uploadDataDir(ctx, h, dataDst, dataSrc, d); err != nil {
if errors.Is(err, errJobNotFound) {
continue
}
Expand All @@ -184,9 +184,9 @@ func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDi
return nil
}

func (w *worker) uploadDataDir(ctx context.Context, dst, src string, d snapshotDir) error {
func (w *worker) uploadDataDir(ctx context.Context, hi hostInfo, dst, src string, d snapshotDir) error {
// Ensure file versioning during upload
id, err := w.Client.RcloneMoveDir(ctx, d.Host, dst, src, VersionedFileExt(w.SnapshotTag))
id, err := w.Client.RcloneMoveDir(ctx, d.Host, hi.Transfers, dst, src, VersionedFileExt(w.SnapshotTag))
if err != nil {
return err
}
Expand Down

0 comments on commit 1fc71e9

Please sign in to comment.