Skip to content

Commit

Permalink
add auto start from snapshot (#3193)
Browse files Browse the repository at this point in the history
Co-authored-by: KamiD <[email protected]>
  • Loading branch information
FineKe and KamiD authored Jul 11, 2023
1 parent 9483c69 commit c45db7d
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 1 deletion.
3 changes: 3 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"fmt"
"github.com/okex/exchain/libs/cosmos-sdk/client/flags"
"io"
"os"
"runtime/debug"
Expand Down Expand Up @@ -958,6 +959,8 @@ func NewAccNonceHandler(ak auth.AccountKeeper) sdk.AccNonceHandler {
}

func PreRun(ctx *server.Context, cmd *cobra.Command) error {
prepareSnapshotDataIfNeed(viper.GetString(server.FlagStartFromSnapshot), viper.GetString(flags.FlagHome), ctx.Logger)

// check start flag conflicts
err := sanity.CheckStart()
if err != nil {
Expand Down
199 changes: 199 additions & 0 deletions app/start_from_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package app

import (
"archive/tar"
"bytes"
"fmt"
"github.com/klauspost/pgzip"
"github.com/okex/exchain/libs/cosmos-sdk/types/errors"
"github.com/okex/exchain/libs/tendermint/libs/log"
"io"
"io/ioutil"
"net/url"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"
)

func prepareSnapshotDataIfNeed(snapshotURL string, home string, logger log.Logger) {
if snapshotURL == "" {
return
}

snapshotHome := filepath.Join(home, ".download_snapshots")

// check whether the snapshot file has been downloaded
byteData, err := os.ReadFile(filepath.Join(snapshotHome, ".record"))
if err == nil && strings.Contains(string(byteData), snapshotURL) {
return
}

if _, err := url.Parse(snapshotURL); err != nil {
panic(errors.Wrap(err, "invalid snapshot URL"))
}

// download snapshot
snapshotFile, err := downloadSnapshot(snapshotURL, snapshotHome, logger)
if err != nil {
panic(err)
}

// uncompress snapshot
logger.Info("start to uncompress snapshot")
if err := extractTarGz(snapshotFile, snapshotHome); err != nil {
panic(err)
}

// delete damaged data
logger.Info("start to delete damaged data")
if err := os.RemoveAll(filepath.Join(home, "data")); err != nil {
panic(err)
}

// move snapshot data
logger.Info("start to move snapshot data")
if err := moveDir(filepath.Join(snapshotHome, "data"), filepath.Join(home, "data")); err != nil {
panic(err)
}

os.Remove(snapshotFile)

os.WriteFile(filepath.Join(snapshotHome, ".record"), []byte(snapshotURL+"\n"), 0644)

logger.Info("snapshot data is ready, start node soon!")
}

func downloadSnapshot(url, outputPath string, logger log.Logger) (string, error) {
// create dir
_, err := os.Stat(outputPath)
if err != nil {
os.MkdirAll(outputPath, 0755)
}

fileName := url[strings.LastIndex(url, "/")+1:]
targetFile := filepath.Join(outputPath, fileName)

// check file exists
if _, err := os.Stat(targetFile); err == nil {
os.Remove(targetFile)
}

var stdoutProcessStatus bytes.Buffer

axel := exec.Command("axel", "-n", fmt.Sprintf("%d", runtime.NumCPU()), "-o", targetFile, url)
axel.Stdout = io.MultiWriter(ioutil.Discard, &stdoutProcessStatus)
done := make(chan struct{})
defer close(done)

// print download detail
go func() {
tick := time.NewTicker(time.Millisecond * 50)
defer tick.Stop()
for {
select {
case <-done:
return
case <-tick.C:
bts := make([]byte, stdoutProcessStatus.Len())
stdoutProcessStatus.Read(bts)
logger.Info(string(bts))
}
}
}()

// run and wait
err = axel.Run()
if err != nil {
return "", err
}

return targetFile, nil
}

func extractTarGz(tarGzFile, destinationDir string) error {
// open .tar.gz
file, err := os.Open(tarGzFile)
if err != nil {
return err
}
defer file.Close()

// use gzip.Reader
gzReader, err := pgzip.NewReaderN(file, 1<<22, runtime.NumCPU())
if err != nil {
return err
}
defer gzReader.Close()

// create tar.Reader
tarReader := tar.NewReader(gzReader)

// uncompress file back to back
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
if header == nil {
continue
}
target := filepath.Join(destinationDir, header.Name)

switch header.Typeflag {
case tar.TypeDir:
err = os.MkdirAll(target, 0755)
if err != nil {
return err
}
case tar.TypeReg:
parent := filepath.Dir(target)
err = os.MkdirAll(parent, 0755)
if err != nil {
return err
}

file, err := os.Create(target)
if err != nil {
return err
}
defer file.Close()

_, err = io.Copy(file, tarReader)
if err != nil {
return err
}
}
}

return nil
}

func moveDir(sourceDir, destinationDir string) error {
sourceInfo, err := os.Stat(sourceDir)
if err != nil {
return err
}

if !sourceInfo.IsDir() {
return fmt.Errorf("%s isn't dir", sourceDir)
}

_, err = os.Stat(destinationDir)
if err == nil {
return fmt.Errorf("dest dir %s exists", destinationDir)
}

// move
err = os.Rename(sourceDir, destinationDir)
if err != nil {
return err
}

return nil
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/jmhodges/levigo v1.0.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada
github.com/libp2p/go-buffer-pool v0.1.0
github.com/magiconair/properties v1.8.6
github.com/mattn/go-isatty v0.0.14
Expand Down Expand Up @@ -136,6 +137,8 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jonboulle/clockwork v0.2.0 // indirect
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,13 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/kkdai/bstream v1.0.0/go.mod h1:FDnDOHt5Yx4p3FaHcioFT0QjDOtgUpvjeZqAs+NVZZA=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 h1:KAZ1BW2TCmT6PRihDPpocIy1QTtsAsrx6TneU/4+CMg=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada h1:3L+neHp83cTjegPdCiOxVOJtRIy7/8RldvMTsyPYH10=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/reedsolomon v1.9.2/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
3 changes: 2 additions & 1 deletion libs/cosmos-sdk/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ const (

FlagFastSyncGap = "fastsync-gap"

FlagEventBlockTime = "event-block-time"
FlagEventBlockTime = "event-block-time"
FlagStartFromSnapshot = "start-from-snapshot"
)

// StartCmd runs the service passed in, either stand-alone or in-process with
Expand Down
3 changes: 3 additions & 0 deletions libs/cosmos-sdk/server/start_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command {
viper.BindPFlag(FlagEvmImportMode, cmd.Flags().Lookup(FlagEvmImportMode))
viper.BindPFlag(FlagEvmImportPath, cmd.Flags().Lookup(FlagEvmImportPath))
viper.BindPFlag(FlagGoroutineNum, cmd.Flags().Lookup(FlagGoroutineNum))
viper.BindPFlag(FlagStartFromSnapshot, cmd.Flags().Lookup(FlagStartFromSnapshot))

cmd.Flags().Int(state.FlagDeliverTxsExecMode, 0, "Execution mode for deliver txs, (0:serial[default], 1:deprecated, 2:parallel)")
cmd.Flags().Bool(state.FlagEnableConcurrency, false, "Enable concurrency for deliver txs")
Expand Down Expand Up @@ -275,6 +276,8 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command {
cmd.Flags().Int64(FlagCommitGapHeight, 100, "Block interval to commit cached data into db, affects iavl & mpt")

cmd.Flags().Int64(FlagFastSyncGap, 20, "Block height interval to switch fast-sync mode")
cmd.Flags().String(FlagStartFromSnapshot, "", "Snapshot URL which uses to start node")
cmd.Flags().MarkHidden(FlagStartFromSnapshot)

return cmd
}
Expand Down

0 comments on commit c45db7d

Please sign in to comment.