From 04564fb1a3b4f2765ac8e049a80490007f10e43b Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Tue, 12 Jul 2022 14:53:13 +0900 Subject: [PATCH 01/27] Make errors.Unwrap,errors.Is compatible with standard errors package --- common/errors/errors.go | 45 +++++++++++++++++++++--------------- common/errors/errors_test.go | 12 +++++----- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/common/errors/errors.go b/common/errors/errors.go index 4deccec3e..16e62897f 100644 --- a/common/errors/errors.go +++ b/common/errors/errors.go @@ -2,6 +2,7 @@ package errors import ( "fmt" + "reflect" "github.com/pkg/errors" ) @@ -311,22 +312,37 @@ type Unwrapper interface { Unwrap() error } +func Unwrap(err error) error { + switch obj := err.(type) { + case interface{ Unwrap() error }: + return obj.Unwrap() + case interface{ Cause() error }: + return obj.Cause() + default: + return nil + } +} + // Is checks whether err is caused by the target. func Is(err, target error) bool { - cause := FindCause(err, func(err error) bool { + if target == nil { return err == target - }) - return cause != nil + } + isComparable := reflect.TypeOf(target).Comparable() + for { + if isComparable && err == target { + return true + } + if x, ok := err.(interface{ Is(error) bool }); ok && x.Is(target) { + return true + } + if err = Unwrap(err); err == nil { + return false + } + } } func FindCause(err error, cb func(err error) bool) error { - type causer interface { - Cause() error - } - - type unwrapper interface { - Unwrap() error - } for { if err == nil { return nil @@ -334,14 +350,7 @@ func FindCause(err error, cb func(err error) bool) error { if cb(err) { return err } - switch obj := err.(type) { - case causer: - err = obj.Cause() - case unwrapper: - err = obj.Unwrap() - default: - return nil - } + err = Unwrap(err) } } diff --git a/common/errors/errors_test.go b/common/errors/errors_test.go index dc0e74c34..fd4ee710e 100644 --- a/common/errors/errors_test.go +++ b/common/errors/errors_test.go @@ -136,18 +136,18 @@ func TestIs(t *testing.T) { e := Errorc(IllegalArgumentError, "IllegalArgument") e2 := Wrap(e, "MyTest") - if Is(e, e2) { + if Is(e, e2) || errors.Is(e, e2) { t.Error("Fail to check !Is(origin, Wrap(origin)) is FALSE") } - if !Is(e2, e) { - t.Error("Fail to check Is(Wrap(origin) origin) is TRUE") + if !Is(e2, e) || !errors.Is(e2, e) { + t.Error("Fail to check Is(Wrap(origin), origin) is TRUE") } e3 := Wrapc(e, UnsupportedError, "MyTest2") - if Is(e, e3) { + if Is(e, e3) || errors.Is(e, e3) { t.Error("Fail to check !Is(origin, Wrapc(origin)) is FALSE") } - if !Is(e3, e) { - t.Error("Fail to check Is(Wrapc(origin) origin) is TRUE") + if !Is(e3, e) || !errors.Is(e3, e) { + t.Error("Fail to check Is(Wrapc(origin), origin) is TRUE") } } From c60bfdff3ac6deb9db02879aa7581b6d7a015920 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Tue, 12 Jul 2022 16:44:23 +0900 Subject: [PATCH 02/27] Reduce number of GetSnapshot() for WorldState --- service/manager.go | 5 +++-- service/transaction/genesis_v3.go | 2 +- service/transaction/transaction_v2.go | 2 +- service/transaction/transactionhandler.go | 16 +++++++++++----- service/transition_pe.go | 2 +- service/transition_se.go | 2 +- test/transaction.go | 8 ++++---- 7 files changed, 22 insertions(+), 15 deletions(-) diff --git a/service/manager.go b/service/manager.go index f603106b4..2faea6454 100644 --- a/service/manager.go +++ b/service/manager.go @@ -693,7 +693,8 @@ func (m *manager) ExecuteTransaction(result []byte, vh []byte, js []byte, bi mod defer txh.Dispose() var wc state.WorldContext - if wss, err := m.trc.GetWorldSnapshot(result, vh); err == nil { + wss, err := m.trc.GetWorldSnapshot(result, vh) + if err == nil { ws, err := state.WorldStateFromSnapshot(wss) if err != nil { return nil, err @@ -713,7 +714,7 @@ func (m *manager) ExecuteTransaction(result []byte, vh []byte, js []byte, bi mod }) ctx.UpdateSystemInfo() - return txh.Execute(ctx, true) + return txh.Execute(ctx, wss, true) } func (m *manager) AddSyncRequest(id db.BucketID, key []byte) error { diff --git a/service/transaction/genesis_v3.go b/service/transaction/genesis_v3.go index df77d590b..412e1d231 100644 --- a/service/transaction/genesis_v3.go +++ b/service/transaction/genesis_v3.go @@ -238,7 +238,7 @@ func (g *genesisV3) Prepare(ctx contract.Context) (state.WorldContext, error) { return ctx.GetFuture(lq), nil } -func (g *genesisV3) Execute(ctx contract.Context, estimate bool) (txresult.Receipt, error) { +func (g *genesisV3) Execute(ctx contract.Context, wcs state.WorldSnapshot, estimate bool) (txresult.Receipt, error) { cc := contract.NewCallContext(ctx, ctx.GetStepLimit(state.StepLimitTypeInvoke), false) defer cc.Dispose() diff --git a/service/transaction/transaction_v2.go b/service/transaction/transaction_v2.go index 23285162c..e665a69d0 100644 --- a/service/transaction/transaction_v2.go +++ b/service/transaction/transaction_v2.go @@ -142,7 +142,7 @@ func (tx *transactionV2) Prepare(ctx contract.Context) (state.WorldContext, erro return ctx.GetFuture(lq), nil } -func (tx *transactionV2) Execute(ctx contract.Context, estimate bool) (txresult.Receipt, error) { +func (tx *transactionV2) Execute(ctx contract.Context, wcs state.WorldSnapshot, estimate bool) (txresult.Receipt, error) { r := txresult.NewReceipt(ctx.Database(), ctx.Revision(), tx.To()) trans := new(big.Int).Add(&tx.Value.Int, version2FixedFee) as1 := ctx.GetAccountState(tx.From().ID()) diff --git a/service/transaction/transactionhandler.go b/service/transaction/transactionhandler.go index 4a9c8bbd4..7a5d17148 100644 --- a/service/transaction/transactionhandler.go +++ b/service/transaction/transactionhandler.go @@ -16,7 +16,10 @@ import ( type Handler interface { Prepare(ctx contract.Context) (state.WorldContext, error) - Execute(ctx contract.Context, estimate bool) (txresult.Receipt, error) + // Execute executes transaction in the Handler. + // wcs is a snapshot of the current state. + // estimate would be true if it's executed for estimating steps. + Execute(ctx contract.Context, wcs state.WorldSnapshot, estimate bool) (txresult.Receipt, error) Dispose() } @@ -157,10 +160,7 @@ func (th *transactionHandler) DoExecute(cc contract.CallContext, estimate, isPat return status, addr, nil } -func (th *transactionHandler) Execute(ctx contract.Context, estimate bool) (txresult.Receipt, error) { - // Make a copy of initial state - wcs := ctx.GetSnapshot() - +func (th *transactionHandler) Execute(ctx contract.Context, wcs state.WorldSnapshot, estimate bool) (txresult.Receipt, error) { isPatch := th.group == module.TransactionGroupPatch limit := th.stepLimit if invokeLimit := ctx.GetStepLimit(state.StepLimitTypeInvoke); isPatch || estimate || limit.Cmp(invokeLimit) > 0 { @@ -212,6 +212,12 @@ func (th *transactionHandler) Execute(ctx contract.Context, estimate bool) (txre stepToPay, redeemed, old) } } + if stepPrice == nil { + logger.Debugf("MKSONG StepPrice is NIL") + } + if stepToPay == nil { + logger.Debugf("MKSONG StepToPay is NIL") + } fee := new(big.Int).Mul(stepToPay, stepPrice) as := ctx.GetAccountState(th.from.ID()) diff --git a/service/transition_pe.go b/service/transition_pe.go index 8508042c1..d6768ec9c 100644 --- a/service/transition_pe.go +++ b/service/transition_pe.go @@ -92,7 +92,7 @@ func (t *transition) executeTxsConcurrent(level int, l module.TransactionList, c From: txo.From(), }) ctx.UpdateSystemInfo() - rct, err := txh.Execute(ctx, false) + rct, err := txh.Execute(ctx, wvss, false) txh.Dispose() if err == nil { err = t.plt.OnTransactionEnd(ctx, t.log, rct) diff --git a/service/transition_se.go b/service/transition_se.go index 36e501a73..c2c1d62e1 100644 --- a/service/transition_se.go +++ b/service/transition_se.go @@ -52,7 +52,7 @@ func (t *transition) executeTxsSequential(l module.TransactionList, ctx contract return err } ctx.UpdateSystemInfo() - rct, err := txh.Execute(ctx, false) + rct, err := txh.Execute(ctx, wcs, false) txh.Dispose() if err == nil { err = t.plt.OnTransactionEnd(ctx, t.log, rct) diff --git a/test/transaction.go b/test/transaction.go index 0b831f9eb..3a23468e4 100644 --- a/test/transaction.go +++ b/test/transaction.go @@ -113,7 +113,7 @@ func (t *Transaction) Prepare(ctx contract.Context) (state.WorldContext, error) return ctx.GetFuture(lq), nil } -func (t *Transaction) Execute(ctx contract.Context, estimate bool) (txresult.Receipt, error) { +func (t *Transaction) Execute(ctx contract.Context, wcs state.WorldSnapshot, estimate bool) (txresult.Receipt, error) { if t.json.Validators != nil { var vl []module.Validator for _, addr := range t.json.Validators { @@ -186,7 +186,7 @@ func (t *Transaction) Version() int { func (t *Transaction) ToJSON(version module.JSONVersion) (interface{}, error) { res := map[string]interface{}{ "timestamp": &t.json.TimeStamp, - "type": "test", + "type": "test", } if t.json.Validators != nil { res["validators"] = t.json.Validators @@ -255,7 +255,7 @@ func (t *Transaction) ClearCache() { func checkJSONTX(tx map[string]interface{}) bool { val, ok := tx["type"] - return ok && val=="test" + return ok && val == "test" } func parseJSONTX(js []byte, raw bool) (transaction.Transaction, error) { @@ -271,7 +271,7 @@ var once sync.Once func RegisterTransactionFactory() { once.Do(func() { transaction.RegisterFactory(&transaction.Factory{ - Priority: 5, + Priority: 5, CheckJSON: checkJSONTX, ParseJSON: parseJSONTX, }) From bd72ba3f6e5ab78a8e27282801f0766df765d57e Mon Sep 17 00:00:00 2001 From: Hyunil Jung Date: Thu, 14 Jul 2022 17:45:03 +0900 Subject: [PATCH 03/27] Call Regulator.OnPropose if cs enters/passes propose step There are cases cs skips propose step and those case. Even in the case, call Regulator.OnPropose for more correct proposal interval and timing regulation. BDT-230 --- consensus/consensus.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consensus/consensus.go b/consensus/consensus.go index 1d841f33b..12b7e0af2 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -288,6 +288,11 @@ func (cs *consensus) resetForNewRound(round int32) { func (cs *consensus) resetForNewStep(step step) { cs.endStep() + if cs.step < stepPropose && step > stepPropose { + now := time.Now() + cs.nextProposeTime = now + cs.c.Regulator().OnPropose(now) + } cs.beginStep(step) } From 691c166c972915d107badcaf7dd8ee11266a03c6 Mon Sep 17 00:00:00 2001 From: Jaechang Namgoong Date: Thu, 21 Jul 2022 15:15:04 +0900 Subject: [PATCH 04/27] Propagate error when it failed to process genesis template --- chain/gs/genesis.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chain/gs/genesis.go b/chain/gs/genesis.go index 1d896594b..be2692851 100644 --- a/chain/gs/genesis.go +++ b/chain/gs/genesis.go @@ -348,9 +348,11 @@ func WriteFromPath(w io.Writer, p string) error { writer: gsw, path: genesisDir, }, genesisObj) + if err != nil { + return errors.Wrap(err, "Fail to process content") + } // write genesis data at last - genesis, err = json.Marshal(genesisObj) if err != nil { return errors.Wrap(err, "Fail to marshal JSON") From 309855d9ad76a777280238182cd6622e93b4bbdd Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Fri, 22 Jul 2022 13:23:14 +0900 Subject: [PATCH 05/27] Supress logs in rocksdb implementation --- common/db/rocks_db.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/db/rocks_db.go b/common/db/rocks_db.go index 5d6493bb0..4dd1e6b15 100644 --- a/common/db/rocks_db.go +++ b/common/db/rocks_db.go @@ -1,3 +1,4 @@ +//go:build rocksdb // +build rocksdb /* @@ -75,7 +76,7 @@ func NewRocksDB(name string, dir string) (*RocksDB, error) { if cfs := C.rocksdb_list_column_families(opts, cName, &cfsLen, &cErr); cErr != nil { errMsg := C.GoString(cErr) C.rocksdb_free(unsafe.Pointer(cErr)) - log.Infoln("fail to rocksdb_list_column_families", errMsg) + log.Traceln("fail to rocksdb_list_column_families", errMsg) //ignore and try open cErr = nil @@ -88,7 +89,7 @@ func NewRocksDB(name string, dir string) (*RocksDB, error) { } } else { numOfCfs := int(cfsLen) - log.Infoln("rocksdb_list_column_families returns num:", numOfCfs) + log.Traceln("rocksdb_list_column_families returns num:", numOfCfs) cfOpts := make([]*C.rocksdb_options_t, numOfCfs) for i := 0; i < numOfCfs; i++ { From a4b9c21bdf656618c517d86205de52692815b433 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Fri, 22 Jul 2022 13:23:40 +0900 Subject: [PATCH 06/27] Fix problem, failing without missing directory in backup --- chain/taskbackup.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/chain/taskbackup.go b/chain/taskbackup.go index f3079f806..87efde050 100644 --- a/chain/taskbackup.go +++ b/chain/taskbackup.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io" + "io/fs" "io/ioutil" "os" "path" @@ -138,7 +139,9 @@ func (t *taskBackup) Start() (ret error) { func zipWrite(writer *zip.Writer, p, n string, on func(int64) error) error { p2 := path.Join(p, n) st, err := os.Stat(p2) - if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil + } else if err != nil { return errors.Wrap(err, "writeToZip: FAIL on os.State") } if st.Mode().IsRegular() { @@ -190,7 +193,9 @@ func zipWrite(writer *zip.Writer, p, n string, on func(int64) error) error { func countFiles(p string) (int, error) { st, err := os.Stat(p) - if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return 0, nil + } else if err != nil { return 0, err } if !st.IsDir() { From f9755ffc7b1e6aab8b52a85d9e39b4acd445ddaf Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Fri, 22 Jul 2022 13:33:31 +0900 Subject: [PATCH 07/27] Rebuild build-deps on change of go-deps or rocksdb-deps --- docker/build-deps/update.sh | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/docker/build-deps/update.sh b/docker/build-deps/update.sh index de308dd48..d0bf9c16b 100755 --- a/docker/build-deps/update.sh +++ b/docker/build-deps/update.sh @@ -28,22 +28,28 @@ get_label() { echo $LABEL } -get_dockerfile() { - local TARGET=$1 - case $TARGET in - build) - echo Dockerfile - ;; - *) - echo ${TARGET}.Dockerfile - ;; +dockerfile_for() { + local TARGET="$1" + shift 1 + local PREFIX="" + if [ "$#" -gt 0 ] ; then + PREFIX="$1/docker/build-deps/" + shift 1 + fi + case ${TARGET} in + build) + echo "${PREFIX}Dockerfile" + ;; + *) + echo "${PREFIX}${TARGET}.Dockerfile" + ;; esac } get_hash_of_dir() { local TARGET=$1 local SRC_DIR=$2 - local DOCKERFILE=${SRC_DIR}/docker/build-deps/$(get_dockerfile ${TARGET}) + local DOCKERFILE=$(dockerfile_for ${TARGET} ${SRC_DIR}) local SUM local HASH_OF_DIR case $TARGET in @@ -72,6 +78,9 @@ get_hash_of_dir() { ;; build) SUM=$(get_hash_of_files \ + "${SRC_DIR}/go.mod" "${SRC_DIR}/go.sum" \ + "$(dockerfile_for go ${SRC_DIR})" \ + "$(dockerfile_for rocksdb ${SRC_DIR})" \ "${DOCKERFILE}") \ HASH_OF_DIR="go${GOLANG_VERSION}-rocksdb${ROCKSDB_VERSION}-alpine${ALPINE_VERSION}-${SUM}" ;; @@ -184,7 +193,7 @@ update_image() { cd ${BUILD_DIR} extra_files cp ${TARGET} ${SRC_DIR} - local DOCKERFILE=$(get_dockerfile ${TARGET}) + local DOCKERFILE=$(dockerfile_for ${TARGET}) echo "Building image ${TARGET_IMAGE} for ${HASH_OF_DIR}" docker build \ --build-arg ${LABEL}=${HASH_OF_DIR} \ From a7a9325ea721efcaca740a851eed65a4bdc1ae1c Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Thu, 28 Jul 2022 11:59:57 +0900 Subject: [PATCH 08/27] Fix problems in JSON RPC error messages --- server/jsonrpc/error.go | 6 ++--- server/jsonrpc/error_test.go | 46 ++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) create mode 100644 server/jsonrpc/error_test.go diff --git a/server/jsonrpc/error.go b/server/jsonrpc/error.go index aa56dc04c..f4ae05798 100644 --- a/server/jsonrpc/error.go +++ b/server/jsonrpc/error.go @@ -74,11 +74,11 @@ func (c ErrorCode) String() string { return "SystemTimeout" default: switch { - case c >= ErrorCodeServer && c < ErrorCodeServer+1000: + case c < ErrorCodeServer && c > ErrorCodeServer-1000: return fmt.Sprintf("ServerError(%d)", c) - case c >= ErrorCodeSystem && c < ErrorCodeSystem+1000: + case c < ErrorCodeSystem && c > ErrorCodeSystem-1000: return fmt.Sprintf("SystemError(%d)", c) - case c >= ErrorCodeScore && c < ErrorCodeScore+1000: + case c <= ErrorCodeScore && c > ErrorCodeScore-1000: return fmt.Sprintf("SCOREError(%d)", c) } return fmt.Sprintf("UnknownError(%d)", c) diff --git a/server/jsonrpc/error_test.go b/server/jsonrpc/error_test.go new file mode 100644 index 000000000..128788187 --- /dev/null +++ b/server/jsonrpc/error_test.go @@ -0,0 +1,46 @@ +/* + * Copyright 2022 ICON Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jsonrpc + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestErrorCode_String(t *testing.T) { + tests := []struct { + name string + c ErrorCode + want string + }{ + {"ServerError", ErrorCodeServer, "ServerError"}, + {"ServerError(001)", ErrorCodeServer - 1, "ServerError(-32001)"}, + {"ServerError(999)", ErrorCodeServer - 999, "ServerError(-32999)"}, + {"SystemError", ErrorCodeSystem, "SystemError"}, + {"SystemError(010)", ErrorCodeSystem - 10, "SystemError(-31010)"}, + {"SystemError(999)", ErrorCodeSystem - 999, "SystemError(-31999)"}, + {"SCOREError(0)", ErrorCodeScore, "SCOREError(-30000)"}, + {"SCOREError(1)", ErrorCodeScore - 1, "SCOREError(-30001)"}, + {"SCOREError(999)", ErrorCodeScore - 999, "SCOREError(-30999)"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, tt.c.String(), "String()") + }) + } +} From c20000d1d3e3ead162d4f71187cf0de074dce34a Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Thu, 28 Jul 2022 12:00:04 +0900 Subject: [PATCH 09/27] Consider pruned height for JSON RPC APIs --- server/v3/api_v3.go | 72 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/server/v3/api_v3.go b/server/v3/api_v3.go index 88161f537..fedcaa50b 100644 --- a/server/v3/api_v3.go +++ b/server/v3/api_v3.go @@ -73,6 +73,18 @@ func fillTransactions(blockJson interface{}, b module.Block, v module.JSONVersio return nil } +func checkBaseHeight(c module.Chain, height int64) error { + if height < 0 { + return errors.NotFoundError.Errorf("NegativeHeight(height=%d)", height) + } + base := c.GenesisStorage().Height() + if height < base { + return errors.NotFoundError.Errorf( + "PrunedBlock(height=%d,base=%d)", height, base) + } + return nil +} + func getLastBlock(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, error) { debug := ctx.IncludeDebug() var param struct{} @@ -123,6 +135,10 @@ func getBlockByHeight(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{} return nil, jsonrpc.ErrorCodeServer.Wrap(err, debug) } + if err := checkBaseHeight(chain, height); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } + bm := chain.BlockManager() if bm == nil { return nil, jsonrpc.ErrorCodeServer.New("Stopped") @@ -170,6 +186,9 @@ func getBlockByHash(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, } else if err != nil { return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } + if err := checkBaseHeight(chain, block.Height()); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } blockJson, err := block.ToJSON(module.JSONVersion3) if err != nil { @@ -201,7 +220,13 @@ func call(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, error) { return nil, jsonrpc.ErrorCodeServer.New("Stopped") } - block, err := getBlock(bm, param.Height) + block, err := getBlock(chain, bm, param.Height) + if err != nil { + if errors.NotFoundError.Equals(err) { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } + return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) + } bi := common.NewBlockInfo(block.Height(), block.Timestamp()) result, err := sm.Call(block.Result(), block.NextValidators(), params.RawMessage(), bi) if err != nil { @@ -217,11 +242,14 @@ func call(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, error) { } } -func getBlock(bm module.BlockManager, height jsonrpc.HexInt) (block module.Block, err error) { +func getBlock(chain module.Chain, bm module.BlockManager, height jsonrpc.HexInt) (block module.Block, err error) { if height == "" { block, err = bm.GetLastBlock() } else { h, _ := height.Int64() + if err := checkBaseHeight(chain, h); err != nil { + return nil, err + } block, err = bm.GetBlockByHeight(h) } return @@ -246,7 +274,7 @@ func getBalance(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, erro } var balance common.HexInt - block, err := getBlock(bm, param.Height) + block, err := getBlock(chain, bm, param.Height) if err != nil { return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } @@ -273,7 +301,7 @@ func getScoreApi(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, err if bm == nil || sm == nil { return nil, jsonrpc.ErrorCodeServer.New("Stopped") } - b, err := getBlock(bm, param.Height) + b, err := getBlock(chain, bm, param.Height) if err != nil { return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } @@ -313,7 +341,7 @@ func getTotalSupply(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, return nil, jsonrpc.ErrorCodeServer.New("Stopped") } - b, err := getBlock(bm, height) + b, err := getBlock(chain, bm, height) if err != nil { return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } @@ -358,6 +386,9 @@ func getTransactionResult(ctx *jsonrpc.Context, params *jsonrpc.Params) (interfa } blk := txInfo.Block() + if err := checkBaseHeight(chain, blk.Height()); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } receipt, err := txInfo.GetReceipt() if block.ResultNotFinalizedError.Equals(err) { return nil, jsonrpc.ErrorCodeExecuting.New("Executing") @@ -512,6 +543,10 @@ func getBlockHeaderByHeight(ctx *jsonrpc.Context, params *jsonrpc.Params) (inter return nil, jsonrpc.ErrorCodeServer.Wrap(err, debug) } + if err := checkBaseHeight(chain, height); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } + bm := chain.BlockManager() if bm == nil { return nil, jsonrpc.ErrorCodeServer.New("Stopped") @@ -549,6 +584,10 @@ func getVotesByHeight(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{} return nil, jsonrpc.ErrorCodeServer.Wrap(err, debug) } + if err := checkBaseHeight(chain, height); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } + cs := chain.Consensus() votes, err := cs.GetVotesByHeight(height) @@ -592,6 +631,9 @@ func getProofForResult(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{ } else if err != nil { return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } + if err := checkBaseHeight(chain, block.Height()); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } blockResult := block.Result() receiptList, err := sm.ReceiptListFromResult(blockResult, module.TransactionGroupNormal) @@ -642,6 +684,9 @@ func getProofForEvents(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{ } else if err != nil { return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } + if err := checkBaseHeight(chain, block.Height()); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } blockResult := block.Result() receiptList, err := sm.ReceiptListFromResult(blockResult, module.TransactionGroupNormal) @@ -752,7 +797,7 @@ func sendTransactionAndWait(ctx *jsonrpc.Context, params *jsonrpc.Params) (inter return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } - return waitTransactionResultOnChannel(ctx, bm, hash, debug, timeout, maxLimit, fc) + return waitTransactionResultOnChannel(ctx, chain, bm, hash, debug, timeout, maxLimit, fc) } func waitTransactionResult(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, error) { @@ -796,13 +841,10 @@ func waitTransactionResult(ctx *jsonrpc.Context, params *jsonrpc.Params) (interf return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } - return waitTransactionResultOnChannel(ctx, bm, hash, debug, timeout, maxLimit, fc) + return waitTransactionResultOnChannel(ctx, chain, bm, hash, debug, timeout, maxLimit, fc) } -func waitTransactionResultOnChannel(ctx *jsonrpc.Context, bm module.BlockManager, - id []byte, debug bool, timeout time.Duration, maxLimit bool, - fc <-chan interface{}, -) (interface{}, error) { +func waitTransactionResultOnChannel(ctx *jsonrpc.Context, chain module.Chain, bm module.BlockManager, id []byte, debug bool, timeout time.Duration, maxLimit bool, fc <-chan interface{}) (interface{}, error) { tc := time.After(timeout) var err error @@ -843,12 +885,15 @@ func waitTransactionResultOnChannel(ctx *jsonrpc.Context, bm module.BlockManager return nil, nil } + blk := txInfo.Block() + if err := checkBaseHeight(chain, blk.Height()); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } res, err := receipt.ToJSON(module.JSONVersion3) if err != nil { return nil, jsonrpc.ErrorCodeSystem.Wrap(err, debug) } result := res.(map[string]interface{}) - blk := txInfo.Block() result["blockHash"] = "0x" + hex.EncodeToString(blk.ID()) result["blockHeight"] = "0x" + strconv.FormatInt(int64(blk.Height()), 16) result["txIndex"] = "0x" + strconv.FormatInt(int64(txInfo.Index()), 16) @@ -956,6 +1001,9 @@ func getTrace(ctx *jsonrpc.Context, params *jsonrpc.Params) (interface{}, error) } blk := txInfo.Block() + if err := checkBaseHeight(chain, blk.Height()); err != nil { + return nil, jsonrpc.ErrorCodeNotFound.Wrap(err, debug) + } _, err = txInfo.GetReceipt() if block.ResultNotFinalizedError.Equals(err) { return nil, jsonrpc.ErrorCodeExecuting.New("Executing") From f277fcb595cd6e8949c3af226b3011e72898f7bb Mon Sep 17 00:00:00 2001 From: Hyunil Jung Date: Tue, 12 Jul 2022 14:56:52 +0900 Subject: [PATCH 10/27] Feature reset with height and hash --- block/blockdatafactory.go | 57 ++++++++ block/manager.go | 24 ++-- block/unsafefinalize.go | 107 +++++++++++++++ chain/chain.go | 11 +- chain/taskpruning.go | 2 +- chain/taskreset.go | 223 ++++++++++++++++++++++++++++++- cmd/cli/admin.go | 17 ++- cmd/gochain/main.go | 22 +++ consensus/consensus.go | 4 +- consensus/fastsync/blockfetch.go | 76 +++++++++++ consensus/fastsync/client.go | 4 +- consensus/fastsync/manager.go | 29 +++- module/block.go | 6 +- module/chain.go | 8 +- node/node.go | 6 +- node/rest.go | 15 ++- service/manager.go | 4 +- test/chain.go | 2 +- 18 files changed, 579 insertions(+), 38 deletions(-) create mode 100644 block/blockdatafactory.go create mode 100644 block/unsafefinalize.go create mode 100644 consensus/fastsync/blockfetch.go diff --git a/block/blockdatafactory.go b/block/blockdatafactory.go new file mode 100644 index 000000000..37469bf70 --- /dev/null +++ b/block/blockdatafactory.go @@ -0,0 +1,57 @@ +/* + * Copyright 2022 ICON Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package block + +import ( + "io" + + "github.com/icon-project/goloop/chain/base" + "github.com/icon-project/goloop/common/errors" + "github.com/icon-project/goloop/module" +) + +type blockDataFactory struct { + sm module.ServiceManager + handlers handlerList +} + +func NewBlockDataFactory( + c module.Chain, + sm module.ServiceManager, + handlers []base.BlockHandler, +) (module.BlockDataFactory, error) { + if handlers == nil { + handlers = []base.BlockHandler{NewBlockV2Handler(c)} + } + + return &blockDataFactory{ + sm: sm, + handlers: handlers, + }, nil +} + +func (f *blockDataFactory) NewBlockDataFromReader(r io.Reader) (module.BlockData, error) { + v, r, err := PeekVersion(r) + if err != nil { + return nil, err + } + h, ok := f.handlers.forVersion(v) + if !ok { + return nil, errors.UnsupportedError.Errorf("unsupported block version %d", v) + } + return h.NewBlockDataFromReader(r) +} diff --git a/block/manager.go b/block/manager.go index 99eed3f4f..444ba3b46 100644 --- a/block/manager.go +++ b/block/manager.go @@ -1303,7 +1303,7 @@ func (m *manager) getTransactionLocator(id []byte) (*transactionLocator, error) loc := new(transactionLocator) err = tlb.Get(db.Raw(id), loc) if err != nil { - return nil, errors.ErrNotFound + return nil, errors.NotFoundError.Errorf("not found tx id=%x", id) } return loc, nil } @@ -1515,14 +1515,15 @@ func hasBits(v int, bits int) bool { return (v & bits) == bits } -func (m *manager) ExportGenesis(blk module.Block, gsw module.GenesisStorageWriter) error { +func (m *manager) ExportGenesis(blk module.Block, votes module.CommitVoteSet, gsw module.GenesisStorageWriter) error { height := blk.Height() - var votes module.CommitVoteSet - if nblk, err := m.GetBlockByHeight(height + 1); err != nil { - return errors.Wrapf(err, "fail to get next block(height=%d) for votes", height+1) - } else { - votes = nblk.Votes() + if votes == nil { + if nblk, err := m.GetBlockByHeight(height + 1); err != nil { + return errors.Wrapf(err, "fail to get next block(height=%d) for votes", height+1) + } else { + votes = nblk.Votes() + } } cid, err := m.sm.GetChainID(blk.Result()) @@ -1829,10 +1830,17 @@ func GetLastHeightOf(dbase db.Database) int64 { } func ResetDB(d db.Database, c codec.Codec, height int64) error { - bk, err := d.GetBucket(db.ChainProperty) + return SetLastHeight(d, c, height) +} + +func SetLastHeight(dbase db.Database, c codec.Codec, height int64) error { + bk, err := dbase.GetBucket(db.ChainProperty) if err != nil { return err } + if c == nil { + c = dbCodec + } err = bk.Set([]byte(keyLastBlockHeight), c.MustMarshalToBytes(height)) if err != nil { return err diff --git a/block/unsafefinalize.go b/block/unsafefinalize.go new file mode 100644 index 000000000..a06fdd8a7 --- /dev/null +++ b/block/unsafefinalize.go @@ -0,0 +1,107 @@ +/* + * Copyright 2022 ICON Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package block + +import ( + "github.com/icon-project/goloop/chain/base" + "github.com/icon-project/goloop/common" + "github.com/icon-project/goloop/common/db" + "github.com/icon-project/goloop/common/errors" + "github.com/icon-project/goloop/common/log" + "github.com/icon-project/goloop/module" + "github.com/icon-project/goloop/service" +) + +type finalizeRequest struct { + sm ServiceManager + syncTr module.Transition + dbase db.Database + resCh chan error + cancelCh <-chan struct{} +} + +func (r *finalizeRequest) finalize(blk module.BlockData) error { + ntr, err := r.sm.CreateTransition(r.syncTr, blk.NormalTransactions(), blk, nil, true) + if err != nil { + return nil + } + if err = service.FinalizeTransition(ntr, module.FinalizeNormalTransaction, false); err != nil { + return err + } + if err = service.FinalizeTransition(r.syncTr, module.FinalizePatchTransaction|module.FinalizeResult, false); err != nil { + return err + } + + if err = blk.(base.BlockVersionSpec).FinalizeHeader(r.dbase); err != nil { + return err + } + if err = WriteTransactionLocators(r.dbase, blk.Height(), blk.PatchTransactions(), blk.NormalTransactions()); err != nil { + return err + } + return nil +} + +func (r *finalizeRequest) OnValidate(t module.Transition, err error) { + if err != nil { + log.Warnf("unexpected error during forced finalization: %+v", err) + r.resCh <- err + } +} + +func (r *finalizeRequest) OnExecute(t module.Transition, err error) { + r.resCh <- err +} + +func UnsafeFinalize( + sm ServiceManager, + c module.Chain, + blk module.BlockData, + cancelCh <-chan struct{}, +) error { + initTr, err := sm.CreateInitialTransition(nil, nil) + if err != nil { + return err + } + bi := common.NewBlockInfo(blk.Height()-1, blk.Timestamp()-1) + tr, err := sm.CreateTransition(initTr, nil, bi, nil, true) + if err != nil { + return err + } + tr = sm.PatchTransition(tr, blk.PatchTransactions(), blk) + syncTr := sm.CreateSyncTransition(tr, blk.Result(), blk.NextValidatorsHash()) + r := &finalizeRequest{ + sm: sm, + syncTr: syncTr, + dbase: c.Database(), + resCh: make(chan error, 2), + cancelCh: cancelCh, + } + canceler, err := syncTr.Execute(r) + if err != nil { + return err + } + select { + case err := <-r.resCh: + if err != nil { + return err + } + return r.finalize(blk) + case <-r.cancelCh: + canceler() + return errors.Errorf("sync canceled height=%d hash=%x", blk.Height(), blk.Height()) + } +} diff --git a/chain/chain.go b/chain/chain.go index 900b14820..98128d3d4 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -507,7 +507,7 @@ func (c *singleChain) _runTask(task chainTask, wait bool) error { func (c *singleChain) _waitResultOf(task chainTask) error { result := task.Wait() - c.logger.Infof("DONE %s err=%v", task.String(), result) + c.logger.Infof("DONE %s err=%+v", task.String(), result) if result == nil { c._transitOrTerminate(Finished, nil, Started, Stopping) @@ -639,8 +639,13 @@ func (c *singleChain) Verify() error { return errors.UnsupportedError.New("UnsupportedFeatureVerify") } -func (c *singleChain) Reset() error { - task := newTaskReset(c) +func (c *singleChain) Reset(gs string, height int64, blockHash []byte) error { + if len(gs) == 0 { + chainDir := c.cfg.AbsBaseDir() + const chainGenesisZipFileName = "genesis.zip" + gs = path.Join(chainDir, chainGenesisZipFileName) + } + task := newTaskReset(c, gs, height, blockHash) return c._runTask(task, false) } diff --git a/chain/taskpruning.go b/chain/taskpruning.go index ffa9e65fe..2d458dbbb 100644 --- a/chain/taskpruning.go +++ b/chain/taskpruning.go @@ -118,7 +118,7 @@ func (t *taskPruning) _exportGenesis(blk module.Block, votes module.CommitVoteSe os.Remove(gsfile) } }() - if err := t.chain.bm.ExportGenesis(blk, gsw); err != nil { + if err := t.chain.bm.ExportGenesis(blk, votes, gsw); err != nil { return errors.Wrap(err, "fail on exporting genesis storage") } return nil diff --git a/chain/taskreset.go b/chain/taskreset.go index 5e7959aac..5767b336f 100644 --- a/chain/taskreset.go +++ b/chain/taskreset.go @@ -19,11 +19,24 @@ package chain import ( "os" "path" + + "github.com/icon-project/goloop/block" + "github.com/icon-project/goloop/chain/gs" + "github.com/icon-project/goloop/common/crypto" + "github.com/icon-project/goloop/common/errors" + "github.com/icon-project/goloop/consensus/fastsync" + "github.com/icon-project/goloop/module" + "github.com/icon-project/goloop/network" + "github.com/icon-project/goloop/service" ) type taskReset struct { - chain *singleChain - result resultStore + chain *singleChain + result resultStore + gsfile string + height int64 + blockHash []byte + cancelCh chan struct{} } var resetStates = map[State]string{ @@ -47,6 +60,18 @@ func (t *taskReset) DetailOf(s State) string { } func (t *taskReset) Start() error { + if t.height < 0 || t.height == 1 { + return errors.IllegalArgumentError.Errorf("InvalidHeight(height=%d)", t.height) + } + if len(t.blockHash) != 0 && len(t.blockHash) != crypto.HashLen { + return errors.IllegalArgumentError.Errorf("InvalidBlockHash(hash=%#x len=%d)", t.blockHash, len(t.blockHash)) + } + if t.height == 0 && len(t.blockHash) == crypto.HashLen { + return errors.IllegalArgumentError.Errorf("BlockHashForZeroHeight") + } + if t.height != 0 && len(t.blockHash) == 0 { + return errors.IllegalArgumentError.Errorf("NoBlockHash(height=%d)", t.height) + } go t.doReset() return nil } @@ -56,7 +81,7 @@ func (t *taskReset) doReset() { t.result.SetValue(err) } -func (t *taskReset) _reset() error { +func (t *taskReset) _cleanUp() error { c := t.chain chainDir := c.cfg.AbsBaseDir() @@ -81,20 +106,206 @@ func (t *taskReset) _reset() error { if err := os.RemoveAll(ContractDir); err != nil { return err } + return nil +} + +func (t *taskReset) _fetchBlock(fsm fastsync.Manager, h int64, hash []byte) (module.BlockData, module.CommitVoteSet, error) { + blk, voteBytes, err := fastsync.FetchBlockByHeightAndHash(fsm, h, hash, t.cancelCh) + if err != nil { + return nil, nil, err + } + votes := t.chain.CommitVoteSetDecoder()(voteBytes) + return blk, votes, nil +} + +func (t *taskReset) _prepareBlocks() (module.BlockData, module.CommitVoteSet, error) { + c := t.chain + defer c.releaseManagers() + + chainDir := c.cfg.AbsBaseDir() + + pr := network.PeerRoleFlag(c.cfg.Role) + c.nm = network.NewManager(c, c.nt, c.cfg.SeedAddr, pr.ToRoles()...) + + ContractDir := path.Join(chainDir, DefaultContractDir) + var err error + c.sm, err = service.NewManager(c, c.nm, c.pm, c.plt, ContractDir) + if err != nil { + return nil, nil, err + } + bdf, err := block.NewBlockDataFactory(c, c.sm, nil) + if err != nil { + return nil, nil, err + } + fsm, err := fastsync.NewManagerOnlyForClient(c.nm, bdf, c.logger) + if err != nil { + return nil, nil, err + } + + c.sm.Start() + + if err = c.nm.Start(); err != nil { + return nil, nil, err + } + + blk, votes, err := t._fetchBlock(fsm, t.height, t.blockHash) + if err != nil { + return nil, nil, err + } + pBlk, _, err := t._fetchBlock(fsm, t.height-1, blk.PrevID()) + if err != nil { + return nil, nil, err + } + ppBlk, _, err := t._fetchBlock(fsm, t.height-2, pBlk.PrevID()) + if err != nil { + return nil, nil, err + } + + if err = block.UnsafeFinalize(c.sm, c, ppBlk, t.cancelCh); err != nil { + return nil, nil, err + } + if err = block.UnsafeFinalize(c.sm, c, pBlk, t.cancelCh); err != nil { + return nil, nil, err + } + if err = block.UnsafeFinalize(c.sm, c, blk, t.cancelCh); err != nil { + return nil, nil, err + } + + if err = block.SetLastHeight(c.Database(), nil, t.height); err != nil { + return nil, nil, err + } + + return blk, votes, nil +} + +func (t *taskReset) _exportGenesis(blk module.Block, votes module.CommitVoteSet, gsfile string) (rerr error) { + _ = os.RemoveAll(gsfile) + fd, err := os.OpenFile(gsfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL|os.O_TRUNC, 0700) + if err != nil { + return err + } + gsw := gs.NewGenesisStorageWriter(fd) + defer func() { + _ = gsw.Close() + _ = fd.Close() + if rerr != nil { + _ = os.Remove(gsfile) + } + }() + if err := t.chain.bm.ExportGenesis(blk, votes, gsw); err != nil { + return errors.Wrap(err, "fail on exporting genesis storage") + } + return nil +} + +func (t *taskReset) _makePrunedGenesis(blkData module.BlockData, votes module.CommitVoteSet) (err error) { + c := t.chain + if err := c.prepareManagers(); err != nil { + return err + } + defer c.releaseManagers() + + blk, err := t.chain.bm.GetBlockByHeight(blkData.Height()) + if err != nil { + return err + } + if cid, err := c.sm.GetChainID(blk.Result()); err != nil { + return errors.InvalidStateError.New("No ChainID is recorded (require Revision 8)") + } else { + if cid != int64(c.CID()) { + return errors.InvalidStateError.Errorf("Invalid chain ID real=%d exp=%d", cid, c.CID()) + } + } + + gsTmp := t.gsfile + ".tmp" + if err := t._exportGenesis(blk, votes, gsTmp); err != nil { + return err + } + defer func() { + if err != nil { + _ = os.Remove(gsTmp) + } + }() + + _, err = os.Stat(t.gsfile) + if err == nil { + gsbk := t.gsfile + ".bk" + _ = os.RemoveAll(gsbk) + if err := os.Rename(t.gsfile, gsbk); err != nil { + return errors.UnknownError.Wrapf(err, "fail on backup %s to %s", + t.gsfile, gsbk) + } + defer func() { + if err != nil { + _ = os.RemoveAll(t.gsfile) + _ = os.Rename(gsbk, t.gsfile) + } else { + _ = os.RemoveAll(gsbk) + } + }() + } else if !errors.Is(err, os.ErrNotExist) { + return errors.UnknownError.Wrapf(err, "cannot stat %s", t.gsfile) + } + if err := os.Rename(gsTmp, t.gsfile); err != nil { + return errors.UnknownError.Errorf("fail to rename %s to %s", + gsTmp, t.gsfile) + } + + // replace genesis + fd, err := os.Open(t.gsfile) + if err != nil { + return errors.UnknownError.Wrapf(err, "fail to open file=%s", t.gsfile) + } + g, err := gs.NewFromFile(fd) + if err != nil { + return errors.UnknownError.Wrapf(err, "fail to parse gs=%s", t.gsfile) + } + + c.cfg.GenesisStorage = g + c.cfg.Genesis = g.Genesis() return nil } +func (t *taskReset) _reset() (ret error) { + err := t._cleanUp() + if err != nil { + return err + } + if t.height == 0 { + return nil + } + + // do clean up again on failure + defer func() { + if ret != nil { + _ = t._cleanUp() + } + }() + blk, votes, err := t._prepareBlocks() + if err != nil { + return err + } + return t._makePrunedGenesis(blk, votes) +} + func (t *taskReset) Stop() { - // do nothing (it's hard to stop ) + select { + case t.cancelCh <- struct{}{}: + default: + } } func (t *taskReset) Wait() error { return t.result.Wait() } -func newTaskReset(chain *singleChain) chainTask { +func newTaskReset(chain *singleChain, gsfile string, height int64, blockHash []byte) chainTask { return &taskReset{ - chain: chain, + chain: chain, + gsfile: gsfile, + height: height, + blockHash: blockHash, + cancelCh: make(chan struct{}), } } diff --git a/cmd/cli/admin.go b/cmd/cli/admin.go index 91c920463..bc475aeec 100644 --- a/cmd/cli/admin.go +++ b/cmd/cli/admin.go @@ -283,12 +283,6 @@ func NewChainCmd(parentCmd *cobra.Command, parentVc *viper.Viper) (*cobra.Comman Args: ArgsWithDefaultErrorFunc(cobra.ExactArgs(1)), RunE: opFunc("stop"), }, - &cobra.Command{ - Use: "reset CID", - Short: "Chain data reset", - Args: ArgsWithDefaultErrorFunc(cobra.ExactArgs(1)), - RunE: opFunc("reset"), - }, &cobra.Command{ Use: "verify CID", Short: "Chain data verify", @@ -296,6 +290,17 @@ func NewChainCmd(parentCmd *cobra.Command, parentVc *viper.Viper) (*cobra.Comman RunE: opFunc("verify"), }) + resetCmd := &cobra.Command{ + Use: "reset CID", + Short: "Chain data reset", + Args: ArgsWithDefaultErrorFunc(cobra.ExactArgs(1)), + RunE: opFunc("reset"), + } + rootCmd.AddCommand(resetCmd) + resetFlags := resetCmd.Flags() + resetFlags.Int64("height", 0, "Block Height") + resetFlags.String("block_hash", "", "Hash of the block at the given height") + importCmd := &cobra.Command{ Use: "import CID", Short: "Start to import legacy database", diff --git a/cmd/gochain/main.go b/cmd/gochain/main.go index b4f49928a..5714bcd0c 100644 --- a/cmd/gochain/main.go +++ b/cmd/gochain/main.go @@ -159,6 +159,16 @@ func main() { flag.Int64Var(&importMaxHeight, "import_max_height", 0, "Import max height") flag.StringVar(&importDataSource, "import_data_source", "datasource/", "Import data source") + resetCmd := &cobra.Command{ + Use: "reset CID", + Short: "Chain data reset", + Run: Execute, + } + resetFlags := resetCmd.Flags() + resetFlags.Int64("height", 0, "Block Height") + resetFlags.String("block_hash", "", "Hash of the block at the given height") + cmd.AddCommand(resetCmd) + cmd.Run = Execute cmd.Execute() } @@ -484,6 +494,18 @@ func Execute(cmd *cobra.Command, args []string) { if err != nil { log.Panicf("FAIL to import Chain err=%+v", err) } + } else if cmd.Name() == "reset" { + flags := cmd.Flags() + height, _ := flags.GetInt64("height") + blockHashStr, _ := flags.GetString("block_hash") + blockHash, err := hex.DecodeString(blockHashStr) + if err != nil { + log.Panicf("FAIL to decode blockHash hash=%s err=%+v", blockHashStr, err) + } + err = c.Reset("", height, blockHash) + if err != nil { + log.Panicf("FAIL to reset Chain err=%+v", err) + } } else { err = c.Start() if err != nil { diff --git a/consensus/consensus.go b/consensus/consensus.go index 12b7e0af2..d9a819290 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -1757,7 +1757,7 @@ func (cs *consensus) getVotesByHeight(height int64) (module.CommitVoteSet, error return nil, err } if c.commitVotes == nil { - return nil, errors.ErrNotFound + return nil, errors.NotFoundError.Errorf("not found vote height=%d", height) } return c.commitVotes, nil } @@ -1770,7 +1770,7 @@ func (cs *consensus) GetVotesByHeight(height int64) (module.CommitVoteSet, error func (cs *consensus) getCommit(h int64) (*commit, error) { if h > cs.height || (h == cs.height && cs.step < stepCommit) { - return nil, errors.ErrNotFound + return nil, errors.NotFoundError.Errorf("not found commit height=%d", h) } c := cs.commitCache.GetByHeight(h) diff --git a/consensus/fastsync/blockfetch.go b/consensus/fastsync/blockfetch.go new file mode 100644 index 000000000..ef14ad118 --- /dev/null +++ b/consensus/fastsync/blockfetch.go @@ -0,0 +1,76 @@ +/* + * Copyright 2022 ICON Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fastsync + +import ( + "bytes" + + "github.com/icon-project/goloop/common/errors" + "github.com/icon-project/goloop/module" +) + +type blockByHeightAndHashRequest struct { + fsm Manager + h int64 + hash []byte + brCh chan BlockResult + errCh chan error + cancelCh <-chan struct{} +} + +func (r *blockByHeightAndHashRequest) OnBlock(br BlockResult) { + if bytes.Equal(br.Block().Hash(), r.hash) { + br.Consume() + r.brCh <- br + } else { + br.Reject() + r.errCh <- errors.Errorf("unexpected hash height=%d expHash=%x actHash=%x", r.h, r.hash, br.Block().Hash()) + } +} + +func (r *blockByHeightAndHashRequest) OnEnd(err error) { + if err != nil { + r.errCh <- err + } +} + +func FetchBlockByHeightAndHash( + fsm Manager, h int64, hash []byte, cancelCh <-chan struct{}, +) (module.BlockData, []byte, error) { + r := &blockByHeightAndHashRequest{ + fsm: fsm, + h: h, + hash: hash, + brCh: make(chan BlockResult, 1), + errCh: make(chan error, 2), + cancelCh: cancelCh, + } + canceler, err := fsm.FetchBlocks(h, h, r) + if err != nil { + return nil, nil, err + } + + select { + case br := <-r.brCh: + return br.Block(), br.Votes(), nil + case err := <-r.errCh: + return nil, nil, err + case <-r.cancelCh: + canceler() + return nil, nil, errors.Errorf("blockRequestCanceled height=%d hash=%x", r.h, r.hash) + } +} diff --git a/consensus/fastsync/client.go b/consensus/fastsync/client.go index 058bd72ea..b5e1b2a41 100644 --- a/consensus/fastsync/client.go +++ b/consensus/fastsync/client.go @@ -23,7 +23,7 @@ type client struct { common.Mutex nm module.NetworkManager ph module.ProtocolHandler - bm module.BlockManager + bm module.BlockDataFactory log log.Logger fetchID uint16 @@ -135,7 +135,7 @@ type fetchRequest struct { } func newClient(nm module.NetworkManager, ph module.ProtocolHandler, - bm module.BlockManager, logger log.Logger) *client { + bm module.BlockDataFactory, logger log.Logger) *client { cl := &client{} cl.nm = nm cl.ph = ph diff --git a/consensus/fastsync/manager.go b/consensus/fastsync/manager.go index 17593dc9f..b3e0cbd69 100644 --- a/consensus/fastsync/manager.go +++ b/consensus/fastsync/manager.go @@ -56,7 +56,7 @@ func (m *manager) OnReceive(pi module.ProtocolInfo, b []byte, id module.PeerID) } func (m *manager) OnFailure(err error, pi module.ProtocolInfo, b []byte) { - panic("not implemented") + log.Warnf("OnFailure pi=%d: %+v", pi, err) } func (m *manager) OnJoin(id module.PeerID) { @@ -111,7 +111,7 @@ func NewManager( logger log.Logger, ) (Manager, error) { m := &manager{ - nm : nm, + nm: nm, } m.server = newServer(nm, nil, bm, bpp, logger) m.client = newClient(nm, nil, bm, logger) @@ -130,6 +130,31 @@ func NewManager( return m, nil } +func NewManagerOnlyForClient( + nm module.NetworkManager, + bdf module.BlockDataFactory, + logger log.Logger, +) (Manager, error) { + m := &manager{ + nm: nm, + } + m.server = newServer(nm, nil, nil, nil, logger) + m.client = newClient(nm, nil, bdf, logger) + + // lock to prevent enter server.onJoin / client.onJoin + m.server.Lock() + defer m.server.Unlock() + m.client.Lock() + defer m.client.Unlock() + ph, err := nm.RegisterReactorForStreams("fastsync", module.ProtoFastSync, m, protocols, configFastSyncPriority, module.NotRegisteredProtocolPolicyClose) + if err != nil { + return nil, err + } + m.server.ph = ph + m.client.ph = ph + return m, nil +} + type BlockProofProvider interface { GetBlockProof(h int64, opt int32) (proof []byte, err error) } diff --git a/module/block.go b/module/block.go index aa80c902f..2ad788fb4 100644 --- a/module/block.go +++ b/module/block.go @@ -60,6 +60,10 @@ const ( ImportByForce = 0x1 ) +type BlockDataFactory interface { + NewBlockDataFromReader(r io.Reader) (BlockData, error) +} + type BlockManager interface { GetBlockByHeight(height int64) (Block, error) GetLastBlock() (Block, error) @@ -114,7 +118,7 @@ type BlockManager interface { ExportBlocks(from, to int64, dst db.Database, on func(height int64) error) error // ExportGenesis exports genesis to the writer based on the block. - ExportGenesis(blk Block, writer GenesisStorageWriter) error + ExportGenesis(blk Block, votes CommitVoteSet, writer GenesisStorageWriter) error // GetGenesisVotes returns available votes from genesis storage. // They are available only when it starts from genesis. diff --git a/module/chain.go b/module/chain.go index 1b11efb7a..12378f119 100644 --- a/module/chain.go +++ b/module/chain.go @@ -55,7 +55,13 @@ type Chain interface { IsStarted() bool IsStopped() bool - Reset() error + // Reset resets chain. height must be 0 or greater than 1. + // If height == 0, blockHash shall be nil or zero length + // bytes and the function cleans up database and file systems for the chain. + // If height > 1, blockHash shall be the hash of correct block with the + // height and the function cleans up database and file systems for the chain + // and prepare pruned genesis block of the height. + Reset(gs string, height int64, blockHash []byte) error Verify() error MetricContext() context.Context diff --git a/node/node.go b/node/node.go index 7002a613f..b5213fe47 100644 --- a/node/node.go +++ b/node/node.go @@ -482,7 +482,7 @@ func (n *Node) StopChain(cid int) error { return c.Stop() } -func (n *Node) ResetChain(cid int) error { +func (n *Node) ResetChain(cid int, height int64, blockHash []byte) error { defer n.mtx.RUnlock() n.mtx.RLock() @@ -490,7 +490,9 @@ func (n *Node) ResetChain(cid int) error { if err != nil { return err } - return c.Reset() + chainDir := c.cfg.AbsBaseDir() + gs := path.Join(chainDir, ChainGenesisZipFileName) + return c.Reset(gs, height, blockHash) } func (n *Node) VerifyChain(cid int) error { diff --git a/node/rest.go b/node/rest.go index 421424949..31ca60536 100644 --- a/node/rest.go +++ b/node/rest.go @@ -99,6 +99,12 @@ type ChainConfig struct { ValidateTxOnSend bool `json:"validateTxOnSend,omitempty"` } +type ChainResetParam struct { + DBType string `json:"dbType,omitempty"` + Height int64 `json:"height,omitempty"` + BlockHash common.HexBytes `json:"blockHash,omitempty"` +} + type ChainImportParam struct { DBPath string `json:"dbPath"` Height int64 `json:"height"` @@ -355,7 +361,14 @@ func (r *Rest) StopChain(ctx echo.Context) error { func (r *Rest) ResetChain(ctx echo.Context) error { c := ctx.Get("chain").(*Chain) - if err := r.n.ResetChain(c.CID()); err != nil { + param := &ChainResetParam{} + if err := ctx.Bind(param); err != nil { + return echo.ErrBadRequest + } + if param.Height < 1 { + return echo.ErrBadRequest + } + if err := r.n.ResetChain(c.CID(), param.Height, param.BlockHash); err != nil { return err } return ctx.String(http.StatusOK, "OK") diff --git a/service/manager.go b/service/manager.go index 2faea6454..57d988f7c 100644 --- a/service/manager.go +++ b/service/manager.go @@ -549,7 +549,7 @@ func (m *manager) GetNetworkID(result []byte) (int64, error) { } nidVar := scoredb.NewVarDB(as, state.VarNetwork) if nidVar.Bytes() == nil { - return 0, errors.ErrNotFound + return 0, errors.NotFoundError.New("no network ID") } return nidVar.Int64(), nil } @@ -561,7 +561,7 @@ func (m *manager) GetChainID(result []byte) (int64, error) { } nidVar := scoredb.NewVarDB(as, state.VarChainID) if nidVar.Bytes() == nil { - return 0, errors.ErrNotFound + return 0, errors.NotFoundError.New("no chain ID") } return nidVar.Int64(), nil } diff --git a/test/chain.go b/test/chain.go index 57a93092e..f9bac0914 100644 --- a/test/chain.go +++ b/test/chain.go @@ -191,7 +191,7 @@ func (c *Chain) IsStopped() bool { panic("implement me") } -func (c *Chain) Reset() error { +func (c *Chain) Reset(gs string, height int64, blockHash []byte) error { panic("implement me") } From 97743e7912a071c430af10d423ef2e06d6f2b792 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Tue, 12 Jul 2022 15:18:31 +0900 Subject: [PATCH 11/27] Support noBuffer option for sync transition --- block/transition.go | 2 +- block/unsafefinalize.go | 2 +- chain/imports/servicemanager.go | 4 ++-- module/service.go | 2 +- service/manager.go | 13 ++----------- service/sync/manager.go | 3 ++- service/sync/sync_test.go | 6 +++--- service/sync/syncer.go | 18 ++++++++++++++---- service/transition.go | 5 +++-- test/servicemanager.go | 2 +- 10 files changed, 30 insertions(+), 27 deletions(-) diff --git a/block/transition.go b/block/transition.go index 25f9a4ced..3c5d9c3f7 100644 --- a/block/transition.go +++ b/block/transition.go @@ -265,7 +265,7 @@ func (ti *transitionImpl) propose(bi module.BlockInfo, csi module.ConsensusInfo, } func (ti *transitionImpl) sync(result []byte, vlHash []byte, cb transitionCallback) (*transition, error) { - cmtr := ti._chainContext.sm.CreateSyncTransition(ti._mtransition, result, vlHash) + cmtr := ti._chainContext.sm.CreateSyncTransition(ti._mtransition, result, vlHash, false) if cmtr == nil { return nil, errors.New("fail to createSyncTransition") } diff --git a/block/unsafefinalize.go b/block/unsafefinalize.go index a06fdd8a7..2d98aff3e 100644 --- a/block/unsafefinalize.go +++ b/block/unsafefinalize.go @@ -82,7 +82,7 @@ func UnsafeFinalize( return err } tr = sm.PatchTransition(tr, blk.PatchTransactions(), blk) - syncTr := sm.CreateSyncTransition(tr, blk.Result(), blk.NextValidatorsHash()) + syncTr := sm.CreateSyncTransition(tr, blk.Result(), blk.NextValidatorsHash(), true) r := &finalizeRequest{ sm: sm, syncTr: syncTr, diff --git a/chain/imports/servicemanager.go b/chain/imports/servicemanager.go index 669bec3d3..46cade0ea 100644 --- a/chain/imports/servicemanager.go +++ b/chain/imports/servicemanager.go @@ -176,8 +176,8 @@ func (m *managerForImport) PatchTransition( } } -func (m *managerForImport) CreateSyncTransition(transition module.Transition, result []byte, vlHash []byte) module.Transition { - otr := m.ServiceManager.CreateSyncTransition(unwrap(transition), result, vlHash) +func (m *managerForImport) CreateSyncTransition(transition module.Transition, result []byte, vlHash []byte, noBuffer bool) module.Transition { + otr := m.ServiceManager.CreateSyncTransition(unwrap(transition), result, vlHash, noBuffer) if otr == nil { return nil } diff --git a/module/service.go b/module/service.go index accfbdc3e..f1823ccf9 100644 --- a/module/service.go +++ b/module/service.go @@ -275,7 +275,7 @@ type TransitionManager interface { // bi is the block info of the block that contains the patches, // or nil if the patches are already prevalidated. PatchTransition(transition Transition, patches TransactionList, bi BlockInfo) Transition - CreateSyncTransition(transition Transition, result []byte, vlHash []byte) Transition + CreateSyncTransition(transition Transition, result []byte, vlHash []byte, noBuffer bool) Transition // Finalize finalizes data related to the transition. It usually stores // data to a persistent storage. opt indicates which data are finalized. // It should be called for every transition. diff --git a/service/manager.go b/service/manager.go index 57d988f7c..2172d2dc2 100644 --- a/service/manager.go +++ b/service/manager.go @@ -258,18 +258,9 @@ func (m *manager) PatchTransition(t module.Transition, patchTxList module.Transa return patchTransition(pt, bi, patchTxList) } -func (m *manager) CreateSyncTransition(t module.Transition, result []byte, vlHash []byte) module.Transition { +func (m *manager) CreateSyncTransition(t module.Transition, result []byte, vlHash []byte, noBuffer bool) module.Transition { m.log.Debugf("CreateSyncTransition result(%#x), vlHash(%#x)\n", result, vlHash) - tr, ok := t.(*transition) - if !ok { - m.log.Panicf("Illegal transition for CreateSyncTransition type=%T", t) - return nil - } - ntr := newTransition(tr.parent, tr.patchTransactions, tr.normalTransactions, tr.bi, tr.csi, true) - r, _ := newTransitionResultFromBytes(result) - ntr.syncer = m.syncer.NewSyncer(r.StateHash, - r.PatchReceiptHash, r.NormalReceiptHash, vlHash, r.ExtensionData) - return ntr + return NewSyncTransition(t, m.syncer, result, vlHash, noBuffer) } // Finalize finalizes data related to the transition. It usually stores diff --git a/service/sync/manager.go b/service/sync/manager.go index 0580a22a5..9a7c4fcb4 100644 --- a/service/sync/manager.go +++ b/service/sync/manager.go @@ -127,10 +127,11 @@ func (m *Manager) AddRequest(id db.BucketID, key []byte) error { return m.ds.AddRequest(id, key) } -func (m *Manager) NewSyncer(ah, prh, nrh, vh, ed []byte) Syncer { +func (m *Manager) NewSyncer(ah, prh, nrh, vh, ed []byte, noBuffer bool) Syncer { return newSyncer( m.db, m.client, m.pool, m.plt, ah, prh, nrh, vh, ed, m.log, + noBuffer, m.SetSyncHandler) } diff --git a/service/sync/sync_test.go b/service/sync/sync_test.go index b092422e8..0bdd16433 100644 --- a/service/sync/sync_test.go +++ b/service/sync/sync_test.go @@ -223,7 +223,7 @@ func TestSync_SimpleAccountSync(t *testing.T) { ws.GetSnapshot().Flush() vh := ws.GetValidatorState().GetSnapshot().Hash() - syncer1 := syncm2.NewSyncer(acHash, nil, nil, vh, nil) + syncer1 := syncm2.NewSyncer(acHash, nil, nil, vh, nil, false) r, _ := syncer1.ForceSync() logger.Printf("END\n") @@ -350,7 +350,7 @@ func TestSync_AccountSync(t *testing.T) { for i := 0; i < cSyncPeers; i++ { func(index int) { syncM[cPeers-cSyncPeers+index]. - NewSyncer(prevHash, nil, nil, nil, nil). + NewSyncer(prevHash, nil, nil, nil, nil, false). ForceSync() log.Printf("Finish (%d)\n", index) }(i) @@ -412,7 +412,7 @@ func testReceiptSyncByRev(t *testing.T, rev module.Revision) { nHash := normalReceiptsList.Hash() normalReceiptsList.Flush() - syncer := syncm2.NewSyncer(nil, pHash, nHash, nil, nil) + syncer := syncm2.NewSyncer(nil, pHash, nHash, nil, nil, false) syncer.ForceSync() syncer.Finalize() diff --git a/service/sync/syncer.go b/service/sync/syncer.go index d46dd4e7f..47d7eb95b 100644 --- a/service/sync/syncer.go +++ b/service/sync/syncer.go @@ -75,6 +75,7 @@ type syncer struct { client *client database db.Database plt Platform + noBuffer bool pool *peerPool vpool *peerPool @@ -470,6 +471,14 @@ func (s *syncer) onResult(status errCode, p *peer) { } } +func (s *syncer) newMerkleBuilder() merkle.Builder { + if s.noBuffer { + return merkle.NewBuilderWithRawDatabase(s.database) + } else { + return merkle.NewBuilder(s.database) + } +} + func (s *syncer) ForceSync() (*Result, error) { s.log.Debugln("ForceSync") startTime := time.Now() @@ -496,7 +505,7 @@ func (s *syncer) ForceSync() (*Result, error) { var ess state.ExtensionSnapshot if len(s.ed) > 0 { - eb := merkle.NewBuilder(s.database) + eb := s.newMerkleBuilder() s.builder[syncExtensionState.toIndex()] = eb s.reqValue[syncExtensionState.toIndex()] = make(map[string]bool) ess = s.plt.NewExtensionWithBuilder(eb, s.ed) @@ -505,7 +514,7 @@ func (s *syncer) ForceSync() (*Result, error) { s.Complete(syncExtensionState) } - builder := merkle.NewBuilder(s.database) + builder := s.newMerkleBuilder() s.builder[syncWorldState.toIndex()] = builder s.reqValue[syncWorldState.toIndex()] = make(map[string]bool) if wss, err := state.NewWorldSnapshotWithBuilder(builder, s.ah, s.vlh, ess); err == nil { @@ -517,7 +526,7 @@ func (s *syncer) ForceSync() (*Result, error) { rf := func(t syncType, rl *module.ReceiptList, rh []byte) { if len(rh) != 0 { - builder := merkle.NewBuilder(s.database) + builder := s.newMerkleBuilder() s.builder[t.toIndex()] = builder s.reqValue[t.toIndex()] = make(map[string]bool) *rl = txresult.NewReceiptListWithBuilder(builder, rh) @@ -561,7 +570,7 @@ func (s *syncer) Finalize() error { func newSyncer(database db.Database, c *client, p *peerPool, plt Platform, accountsHash, pReceiptsHash, nReceiptsHash, validatorListHash, extensionData []byte, - log log.Logger, cb func(syncer SyncerImpl, syncing bool)) *syncer { + log log.Logger, noBuffer bool, cb func(syncer SyncerImpl, syncing bool)) *syncer { log.Debugf("newSyncer ah(%#x), prh(%#x), nrh(%#x), vlh(%#x), ed(%#x)", accountsHash, pReceiptsHash, nReceiptsHash, validatorListHash, extensionData) @@ -570,6 +579,7 @@ func newSyncer(database db.Database, c *client, p *peerPool, plt Platform, pool: p, client: c, plt: plt, + noBuffer: noBuffer, vpool: newPeerPool(), ivpool: newPeerPool(), sentReq: make(map[module.PeerID]*peer), diff --git a/service/transition.go b/service/transition.go index 0dcb005cb..2bc10aae1 100644 --- a/service/transition.go +++ b/service/transition.go @@ -893,18 +893,19 @@ func FinalizeTransition(tr module.Transition, opt int, noFlush bool) error { } type SyncManager interface { - NewSyncer(ah, prh, nrh, vh, ed []byte) ssync.Syncer + NewSyncer(ah, prh, nrh, vh, ed []byte, noBuffer bool) ssync.Syncer } func NewSyncTransition( tr module.Transition, sm SyncManager, result []byte, vl []byte, + noBuffer bool, ) module.Transition { tst := tr.(*transition) ntr := newTransition(tst.parent, tst.patchTransactions, tst.normalTransactions, tst.bi, tst.csi, true) r, _ := newTransitionResultFromBytes(result) - ntr.syncer = sm.NewSyncer(r.StateHash, r.PatchReceiptHash, r.NormalReceiptHash, vl, r.ExtensionData) + ntr.syncer = sm.NewSyncer(r.StateHash, r.PatchReceiptHash, r.NormalReceiptHash, vl, r.ExtensionData, noBuffer) return ntr } diff --git a/test/servicemanager.go b/test/servicemanager.go index 48208b675..4af3b0eac 100644 --- a/test/servicemanager.go +++ b/test/servicemanager.go @@ -116,7 +116,7 @@ func (sm *ServiceManager) PatchTransition(transition module.Transition, patches return transition } -func (sm *ServiceManager) CreateSyncTransition(transition module.Transition, result []byte, vlHash []byte) module.Transition { +func (sm *ServiceManager) CreateSyncTransition(transition module.Transition, result []byte, vlHash []byte, noBuffer bool) module.Transition { panic("implement me") } From 72aac1eadece48c1bc5a918c85baf70c4c64a254 Mon Sep 17 00:00:00 2001 From: Jeongseo Park Date: Tue, 12 Jul 2022 17:34:47 +0900 Subject: [PATCH 12/27] Support p2p connection between different protocol sets with 'other' connection type --- network/channelnegotiator.go | 27 +++++++++++++++------------ network/p2p.go | 33 +++++++++++++++++++++------------ network/protocolinfos.go | 12 ++++++++++++ 3 files changed, 48 insertions(+), 24 deletions(-) diff --git a/network/channelnegotiator.go b/network/channelnegotiator.go index 13b64b500..159b0602e 100644 --- a/network/channelnegotiator.go +++ b/network/channelnegotiator.go @@ -114,26 +114,33 @@ func (cn *ChannelNegotiator) ProtocolInfos(channel string) *ProtocolInfos { return cn.m[channel] } -func (cn *ChannelNegotiator) resolveProtocols(p *Peer, channel string, protocols []module.ProtocolInfo) (*ProtocolInfos, error) { +func (cn *ChannelNegotiator) resolveProtocols(p *Peer, channel string, protocols []module.ProtocolInfo) error { if p.Channel() != channel { - return nil, errors.Errorf("invalid channel") + return errors.Errorf("invalid channel") } pis := cn.ProtocolInfos(channel) if pis == nil { - return nil, errors.Errorf("not exists channel") + return errors.Errorf("not exists channel") } rpis := newProtocolInfos() if len(protocols) == 0 { protocols = defaultProtocols + p.PutAttr(AttrP2PLegacy, true) } rpis.Set(protocols) rpis.Resolve(pis) - if rpis.LenOfIDSet() < pis.LenOfIDSet() { - return nil, errors.Errorf("not supported protocols exists") + + if !rpis.ExistsByID(module.ProtoP2P) { + return errors.Errorf("not support p2p protocol") + } + if pis.ExistsByID(defaultProtocols...) { + p.PutAttr(AttrSupportDefaultProtocols, rpis.ExistsByID(defaultProtocols...)) + cn.logger.Debugln("support defaultProtocols :", rpis.ExistsByID(defaultProtocols...)) } - return rpis, nil + p.setProtocolInfos(rpis) + return nil } func (cn *ChannelNegotiator) sendJoinRequest(p *Peer) { @@ -160,14 +167,12 @@ func (cn *ChannelNegotiator) handleJoinRequest(pkt *Packet, p *Peer) { } cn.logger.Traceln("handleJoinRequest", rm, p) - pis, err := cn.resolveProtocols(p, rm.Channel, rm.Protocols) - if err != nil { + if err := cn.resolveProtocols(p, rm.Channel, rm.Protocols); err != nil { err = fmt.Errorf("handleJoinRequest error[%v]", err.Error()) cn.logger.Infoln("handleJoinRequest", p.ConnString(), "ChannelNegotiatorError", err) p.CloseByError(err) return } - p.setProtocolInfos(pis) p.setNetAddress(rm.Addr) m := &JoinResponse{Channel: p.Channel(), Addr: cn.netAddress, Protocols: p.ProtocolInfos().Array()} @@ -187,14 +192,12 @@ func (cn *ChannelNegotiator) handleJoinResponse(pkt *Packet, p *Peer) { } cn.logger.Traceln("handleJoinResponse", rm, p) - pis, err := cn.resolveProtocols(p, rm.Channel, rm.Protocols) - if err != nil { + if err := cn.resolveProtocols(p, rm.Channel, rm.Protocols); err != nil { err = fmt.Errorf("handleJoinResponse error[%v]", err.Error()) cn.logger.Infoln("handleJoinResponse", p.ConnString(), "ChannelNegotiatorError", err) p.CloseByError(err) return } - p.setProtocolInfos(pis) p.setNetAddress(rm.Addr) cn.nextOnPeer(p) diff --git a/network/p2p.go b/network/p2p.go index 265a7ec9b..bda31ccc4 100644 --- a/network/p2p.go +++ b/network/p2p.go @@ -55,6 +55,8 @@ const ( DefaultDuplicatedPeerTime = 1 * time.Second DefaultMaxRetryClose = 10 AttrP2PConnectionRequest = "P2PConnectionRequest" + AttrP2PLegacy = "P2PLegacy" + AttrSupportDefaultProtocols = "SupportDefaultProtocols" DefaultQueryElementLength = 200 ) @@ -91,7 +93,7 @@ type PeerToPeer struct { uncles *PeerSet nephews *PeerSet friends *PeerSet //Only for root, parents and uncles is empty - others *PeerSet //Only for root, assume peer is root + others *PeerSet //Ambiguous connection, different states or different protocol set orphanages *PeerSet //Not joined transiting *PeerSet reject *PeerSet @@ -1451,12 +1453,6 @@ Loop: p2p.tryTransitPeerConnection(p, p2pConnTypeNone) } } - if p2p.others.Len() > 0 { - ps := p2p.others.Array() - for _, p := range ps { - p2p.tryTransitPeerConnection(p, p2pConnTypeNone) - } - } complete := p2p.discoverParents(rr) if complete { @@ -1817,6 +1813,9 @@ func (p2p *PeerToPeer) tryTransitPeerConnection(p *Peer, connType PeerConnection p2p.sendP2PConnectionRequest(p2pConnTypeNone, p) return true default: + if p.EqualsAttr(AttrSupportDefaultProtocols, false) { + return false + } if !p2p.reject.Contains(p) && !p2p.transiting.Contains(p) { p.PutAttr(AttrP2PConnectionRequest, connType) p2p.transiting.Add(p) @@ -1948,6 +1947,10 @@ func (p2p *PeerToPeer) handleP2PConnectionRequest(pkt *Packet, p *Peer) { } else if invalidReq { p2p.logger.Infoln("handleP2PConnectionRequest", "invalid reqConnType", req.ConnType, "from", p.ID(), p.ConnType()) } else { + if rc != p2pConnTypeNone && !p.EqualsAttr(AttrSupportDefaultProtocols, true) { + rc = p2pConnTypeOther + p2p.logger.Debugln("handleP2PConnectionResponse", "not support defaultProtocols", p.ID()) + } switch rc { case p2pConnTypeParent: if !p2p.updatePeerConnectionType(p, p2pConnTypeParent) && @@ -1968,11 +1971,13 @@ func (p2p *PeerToPeer) handleP2PConnectionRequest(pkt *Packet, p *Peer) { m := &P2PConnectionResponse{ReqConnType: req.ConnType, ConnType: p.ConnType()} if m.ConnType == p2pConnTypeOther { //for legacy which is not supported p2pConnTypeOther response - switch req.ConnType { - case p2pConnTypeParent: - m.ConnType = p2pConnTypeChildren - case p2pConnTypeUncle: - m.ConnType = p2pConnTypeNephew + if p.EqualsAttr(AttrP2PLegacy, true) { + switch req.ConnType { + case p2pConnTypeParent: + m.ConnType = p2pConnTypeChildren + case p2pConnTypeUncle: + m.ConnType = p2pConnTypeNephew + } } } rpkt := newPacket(p2pProtoControl, p2pProtoConnResp, p2p.encodeMsgpack(m), p2p.ID()) @@ -2102,6 +2107,10 @@ func (p2p *PeerToPeer) handleP2PConnectionResponse(pkt *Packet, p *Peer) { } else { p2p.logger.Debugln("handleP2PConnectionResponse", "resolvedConnType", strPeerConnectionType[resp.ConnType], "from", p.ID(), p.ConnType()) + if rc != p2pConnTypeNone && !p.EqualsAttr(AttrSupportDefaultProtocols, true) { + rc = p2pConnTypeOther + p2p.logger.Debugln("handleP2PConnectionResponse", "not support defaultProtocols", p.ID()) + } switch rc { case p2pConnTypeFriend, p2pConnTypeOther, p2pConnTypeNone: p2p.updatePeerConnectionType(p, rc) diff --git a/network/protocolinfos.go b/network/protocolinfos.go index 4a128cd00..2ad767260 100644 --- a/network/protocolinfos.go +++ b/network/protocolinfos.go @@ -118,6 +118,18 @@ func (pis *ProtocolInfos) Exists(pi module.ProtocolInfo) bool { return false } +func (pis *ProtocolInfos) ExistsByID(piList ...module.ProtocolInfo) bool { + pis.mtx.RLock() + defer pis.mtx.RUnlock() + + for _, pi := range piList { + if l, ok := pis.m[pi.ID()]; !ok || len(l) == 0 { + return false + } + } + return true +} + func (pis *ProtocolInfos) Array() []module.ProtocolInfo { pis.mtx.RLock() defer pis.mtx.RUnlock() From 38b0f466fa686b525ee50c9585649a677f78dccf Mon Sep 17 00:00:00 2001 From: Jeongseo Park Date: Tue, 12 Jul 2022 17:40:14 +0900 Subject: [PATCH 13/27] Using height and block_hash in goloop reset command --- cmd/cli/admin.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/cmd/cli/admin.go b/cmd/cli/admin.go index bc475aeec..524e06f74 100644 --- a/cmd/cli/admin.go +++ b/cmd/cli/admin.go @@ -2,6 +2,7 @@ package cli import ( "bytes" + "encoding/hex" "encoding/json" "fmt" "io" @@ -294,12 +295,31 @@ func NewChainCmd(parentCmd *cobra.Command, parentVc *viper.Viper) (*cobra.Comman Use: "reset CID", Short: "Chain data reset", Args: ArgsWithDefaultErrorFunc(cobra.ExactArgs(1)), - RunE: opFunc("reset"), + RunE: func(cmd *cobra.Command, args []string) error { + param := &node.ChainResetParam{} + var err error + + if param.Height, err = cmd.Flags().GetInt64("height"); err != nil { + return err + } + if param.BlockHash, err = hex.DecodeString(cmd.Flag("block_hash").Value.String()); err != nil { + return err + } + + var v string + reqUrl := node.UrlChain + "/" + args[0] + "/reset" + if _, err = adminClient.PostWithJson(reqUrl, param, &v); err != nil { + return err + } + fmt.Println(v) + return nil + }, } rootCmd.AddCommand(resetCmd) resetFlags := resetCmd.Flags() resetFlags.Int64("height", 0, "Block Height") resetFlags.String("block_hash", "", "Hash of the block at the given height") + MarkAnnotationRequired(resetFlags, "height", "block_hash") importCmd := &cobra.Command{ Use: "import CID", From 822291abb1210c085abc7b3129e49b6e93505556 Mon Sep 17 00:00:00 2001 From: Jeongseo Park Date: Wed, 13 Jul 2022 16:12:28 +0900 Subject: [PATCH 14/27] Handle other connection response for normal node --- network/p2p.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/network/p2p.go b/network/p2p.go index bda31ccc4..96ac9b162 100644 --- a/network/p2p.go +++ b/network/p2p.go @@ -2075,6 +2075,8 @@ func (p2p *PeerToPeer) handleP2PConnectionResponse(pkt *Packet, p *Peer) { rc = p2pConnTypeParent case p2pConnTypeNephew: rejectResp = true + case p2pConnTypeOther: + rc = p2pConnTypeOther case p2pConnTypeNone: rc = p2pConnTypeNone rejectResp = true @@ -2085,6 +2087,8 @@ func (p2p *PeerToPeer) handleP2PConnectionResponse(pkt *Packet, p *Peer) { switch resp.ConnType { case p2pConnTypeNephew: rc = p2pConnTypeUncle + case p2pConnTypeOther: + rc = p2pConnTypeOther case p2pConnTypeNone: rc = p2pConnTypeNone rejectResp = true From e3ebeb8d017c126603a98ca5cd0faa1087c923c5 Mon Sep 17 00:00:00 2001 From: Jeongseo Park Date: Thu, 14 Jul 2022 13:57:05 +0900 Subject: [PATCH 15/27] Update ChainResetParam in admin API and cli --- cmd/cli/admin.go | 24 ++++++++++++++++++------ doc/goloop_admin_api.yaml | 19 +++++++++++++++++++ node/rest.go | 3 +-- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/cmd/cli/admin.go b/cmd/cli/admin.go index 524e06f74..b0103e79f 100644 --- a/cmd/cli/admin.go +++ b/cmd/cli/admin.go @@ -298,12 +298,25 @@ func NewChainCmd(parentCmd *cobra.Command, parentVc *viper.Viper) (*cobra.Comman RunE: func(cmd *cobra.Command, args []string) error { param := &node.ChainResetParam{} var err error - - if param.Height, err = cmd.Flags().GetInt64("height"); err != nil { + fs := cmd.Flags() + if param.Height, err = fs.GetInt64("height"); err != nil { return err } - if param.BlockHash, err = hex.DecodeString(cmd.Flag("block_hash").Value.String()); err != nil { - return err + blockHash := cmd.Flag("block_hash").Value.String() + if blockHash[:2] == "0x" { + blockHash = blockHash[2:] + } + if len(blockHash) > 0 { + if param.BlockHash, err = hex.DecodeString(blockHash); err != nil { + return err + } + } + if param.Height < 0 { + return fmt.Errorf("height should be zero or positive value") + } else if param.Height == 0 && len(blockHash) > 0 { + return fmt.Errorf("block_hash should be empty value") + } else if param.Height > 0 && len(blockHash) == 0 { + return fmt.Errorf("block_hash required") } var v string @@ -318,8 +331,7 @@ func NewChainCmd(parentCmd *cobra.Command, parentVc *viper.Viper) (*cobra.Comman rootCmd.AddCommand(resetCmd) resetFlags := resetCmd.Flags() resetFlags.Int64("height", 0, "Block Height") - resetFlags.String("block_hash", "", "Hash of the block at the given height") - MarkAnnotationRequired(resetFlags, "height", "block_hash") + resetFlags.String("block_hash", "", "Hash of the block at the given height, If height is zero, shall be empty") importCmd := &cobra.Command{ Use: "import CID", diff --git a/doc/goloop_admin_api.yaml b/doc/goloop_admin_api.yaml index ac71e77fe..f4bf099c1 100644 --- a/doc/goloop_admin_api.yaml +++ b/doc/goloop_admin_api.yaml @@ -174,6 +174,12 @@ paths: description: Reset Chain. parameters: - <<: *path__cid + requestBody: + required: true + content: + 'application/json': + schema: + $ref: "#/components/schemas/ChainResetParam" responses: "200": description: Success @@ -604,6 +610,19 @@ components: platform: "basic" childrenLimit: -1 nephewsLimit: -1 + ChainResetParam: + type: object + properties: + height: + type: int64 + description: "Block Height" + blockHash: + type: string + format: "\"0x\" + lowercase HEX string" + description: "Block Hash" + example: + height: 1 + blockHash: "0x77ae0f77a345b3e5e8b65f6084cee34d04f037b1b6213134a463781b84006fcc" ChainImportParam: type: object properties: diff --git a/node/rest.go b/node/rest.go index 31ca60536..8a737e292 100644 --- a/node/rest.go +++ b/node/rest.go @@ -100,7 +100,6 @@ type ChainConfig struct { } type ChainResetParam struct { - DBType string `json:"dbType,omitempty"` Height int64 `json:"height,omitempty"` BlockHash common.HexBytes `json:"blockHash,omitempty"` } @@ -365,7 +364,7 @@ func (r *Rest) ResetChain(ctx echo.Context) error { if err := ctx.Bind(param); err != nil { return echo.ErrBadRequest } - if param.Height < 1 { + if param.Height < 0 { return echo.ErrBadRequest } if err := r.n.ResetChain(c.CID(), param.Height, param.BlockHash); err != nil { From b48202b16496007b2bd15921c864e242c48d6750 Mon Sep 17 00:00:00 2001 From: Jeongseo Park Date: Thu, 14 Jul 2022 15:17:29 +0900 Subject: [PATCH 16/27] Fix for cleanup registered protocols --- network/network.go | 5 +++++ network/protocolinfos.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/network/network.go b/network/network.go index 5b8169ed8..7ff894052 100644 --- a/network/network.go +++ b/network/network.go @@ -107,6 +107,11 @@ func (m *manager) Term() { for _, ph := range m.protocolHandlers { m.logger.Debugln("Term", ph.name) ph.Term() + m.cn.removeProtocol(m.channel, ph.protocol) + } + + for _, pi := range m.p2p.supportedProtocols() { + m.cn.removeProtocol(m.channel, pi) } } diff --git a/network/protocolinfos.go b/network/protocolinfos.go index 2ad767260..efd5820c5 100644 --- a/network/protocolinfos.go +++ b/network/protocolinfos.go @@ -101,7 +101,7 @@ func (pis *ProtocolInfos) Remove(pi module.ProtocolInfo) { pis.m[pi.ID()] = l } idx = pis.indexOf(pis.l, pi) - pis.remove(pis.l, idx) + pis.l = pis.remove(pis.l, idx) } } } From 53899534d0f1d63c8725302fedefcd2cdfbacace Mon Sep 17 00:00:00 2001 From: Jeongseo Park Date: Mon, 18 Jul 2022 12:03:34 +0900 Subject: [PATCH 17/27] Fix problem in goloop reset command --- cmd/cli/admin.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/cli/admin.go b/cmd/cli/admin.go index b0103e79f..8f911809a 100644 --- a/cmd/cli/admin.go +++ b/cmd/cli/admin.go @@ -303,10 +303,10 @@ func NewChainCmd(parentCmd *cobra.Command, parentVc *viper.Viper) (*cobra.Comman return err } blockHash := cmd.Flag("block_hash").Value.String() - if blockHash[:2] == "0x" { - blockHash = blockHash[2:] - } if len(blockHash) > 0 { + if len(blockHash) >= 2 && blockHash[:2] == "0x" { + blockHash = blockHash[2:] + } if param.BlockHash, err = hex.DecodeString(blockHash); err != nil { return err } From 54719af9dfdc168073cb68ad9ec2c2456a704181 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Thu, 21 Jul 2022 16:54:50 +0900 Subject: [PATCH 18/27] Improve merkle.Builder for optimized download --- common/merkle/builder.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/common/merkle/builder.go b/common/merkle/builder.go index c62740585..2fca3bcc5 100644 --- a/common/merkle/builder.go +++ b/common/merkle/builder.go @@ -37,6 +37,7 @@ type merkleBuilder struct { store db.Database requests *list.List requestMap map[string]*list.Element + onDataMark *list.Element } type requestIterator struct { @@ -79,6 +80,10 @@ func (b *merkleBuilder) OnData(value []byte) error { reqID := string(key) if e, ok := b.requestMap[reqID]; ok { req := e.Value.(*request) + b.onDataMark = e + defer func() { + b.onDataMark = nil + }() for i, requester := range req.requesters { bkID := req.bucketIDs[i] bk, err := b.store.GetBucket(bkID) @@ -114,7 +119,12 @@ func (b *merkleBuilder) RequestData(id db.BucketID, key []byte, requester DataRe bucketIDs: []db.BucketID{id}, requesters: []DataRequester{requester}, } - e := b.requests.PushBack(req) + if b.onDataMark != nil { + e = b.requests.InsertAfter(req, b.onDataMark) + b.onDataMark = e + } else { + e = b.requests.PushBack(req) + } b.requestMap[reqID] = e } } From c5f2b85d884884119c1a57ddef2a80353b1e618b Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Thu, 21 Jul 2022 20:25:33 +0900 Subject: [PATCH 19/27] Fix problem, calling GenesisStorageWriter.Close() twice --- block/manager.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/block/manager.go b/block/manager.go index 444ba3b46..0d2ccf8b0 100644 --- a/block/manager.go +++ b/block/manager.go @@ -1551,9 +1551,6 @@ func (m *manager) ExportGenesis(blk module.Block, votes module.CommitVoteSet, gs if err := gsw.WriteGenesis(g); err != nil { return errors.Wrap(err, "fail to write genesis") } - defer func() { - m.log.Must(gsw.Close()) - }() if _, err := gsw.WriteData(votes.Bytes()); err != nil { return errors.Wrap(err, "fail to write votes") From 7141d6a6e8e6db0cf208cf0f90a46f479115efa8 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Fri, 22 Jul 2022 13:19:32 +0900 Subject: [PATCH 20/27] Enhance merkle.Copy for limited memory usage --- common/merkle/copy.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/common/merkle/copy.go b/common/merkle/copy.go index 393c8837b..8badae996 100644 --- a/common/merkle/copy.go +++ b/common/merkle/copy.go @@ -5,6 +5,10 @@ import ( "github.com/icon-project/goloop/common/errors" ) +const ( + MaxNumberOfItemsToCopyInRow = 50 +) + type CopyContext struct { builder Builder src db.Database @@ -19,6 +23,7 @@ func (e *CopyContext) Builder() Builder { func (e *CopyContext) Run() error { for e.builder.UnresolvedCount() > 0 { itr := e.builder.Requests() + processed := 0 for itr.Next() { found := false for _, id := range itr.BucketIDs() { @@ -42,6 +47,16 @@ func (e *CopyContext) Run() error { if !found { return errors.NotFoundError.Errorf("FailToFindValue(key=%x", itr.Key()) } + + // Prevent massive memory usage by cumulated requests. + // New requests are inserted before the next, so if it continues + // to process the next, then the number of requests increases until + // it reaches the end of the iteration. + // It could be very big if we sync large tree structure. + // So, let's stop after process some items. + if processed += 1; processed >= MaxNumberOfItemsToCopyInRow { + break + } } } return nil From c1a7acea6e976645220c2eb661f3b41edb3d57a4 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Fri, 22 Jul 2022 13:41:51 +0900 Subject: [PATCH 21/27] Fix bug, not removing the request from map on merkle.Builder.OnData --- common/merkle/builder.go | 1 + 1 file changed, 1 insertion(+) diff --git a/common/merkle/builder.go b/common/merkle/builder.go index 2fca3bcc5..22b35a4dd 100644 --- a/common/merkle/builder.go +++ b/common/merkle/builder.go @@ -98,6 +98,7 @@ func (b *merkleBuilder) OnData(value []byte) error { } } b.requests.Remove(e) + delete(b.requestMap, reqID) return nil } else { return errors.New("IllegalArguments") From 5b26ada3084883f4528b96e5e0454894276ba6e9 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Fri, 22 Jul 2022 13:54:17 +0900 Subject: [PATCH 22/27] Disable linking node on resolving nodes by builder --- common/trie/ompt/mpt.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/common/trie/ompt/mpt.go b/common/trie/ompt/mpt.go index 894f54e13..d286f5b15 100644 --- a/common/trie/ompt/mpt.go +++ b/common/trie/ompt/mpt.go @@ -506,7 +506,6 @@ func (m *mpt) Resolve(bd merkle.Builder) { type nodeRequester struct { mpt *mpt - node *node hash []byte } @@ -515,7 +514,6 @@ func (r *nodeRequester) OnData(bs []byte, bd merkle.Builder) error { if err != nil { return err } - *r.node = node return node.resolve(r.mpt, bd) } @@ -529,7 +527,6 @@ func (m *mpt) resolve(d merkle.Builder, pNode *node) { hash := node.hash() d.RequestData(db.MerkleTrie, hash, &nodeRequester{ mpt: m, - node: pNode, hash: hash, }) } From fae75289f8f0bda5380fa6ad3a47a851847c97ef Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Mon, 25 Jul 2022 16:59:22 +0900 Subject: [PATCH 23/27] Refactor test cases for common/database_test.go --- common/db/database_test.go | 57 +++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/common/db/database_test.go b/common/db/database_test.go index 6bcf6079e..652482745 100644 --- a/common/db/database_test.go +++ b/common/db/database_test.go @@ -17,21 +17,14 @@ package db import ( - "io/ioutil" - "os" "testing" "github.com/stretchr/testify/assert" ) -func testDatabase_GetSetDelete(t *testing.T, backend BackendType) { - dir, err := ioutil.TempDir("", string(backend)) - if err != nil { - panic(err) - } - defer os.RemoveAll(dir) - - testDB, _ := openDatabase(backend, "test", dir) +func testDatabase_GetSetDelete(t *testing.T, creator dbCreator) { + dir := t.TempDir() + testDB, _ := creator("test", dir) defer testDB.Close() key := []byte("hello") @@ -76,36 +69,41 @@ func testDatabase_GetSetDelete(t *testing.T, backend BackendType) { err = bucket.Set(key2, []byte{}) assert.NoError(t, err) - // HAS returns false + // HAS returns true has, err = bucket.Has(key2) assert.NoError(t, err) assert.True(t, has) + + // GET returns non-nil(empty) + value, err = bucket.Get(key2) + assert.NoError(t, err) + assert.True(t, value != nil) + assert.Zero(t, len(value)) } func TestDatabase_GetSetDelete(t *testing.T) { - for be, _ := range backends { - t.Run(string(be), func(t *testing.T) { + for name, be := range backends { + t.Run(string(name), func(t *testing.T) { testDatabase_GetSetDelete(t, be) }) } + t.Run("layerdb", func(t *testing.T) { + var creator dbCreator = func(name string, dir string) (Database, error) { + origin := NewMapDB() + return NewLayerDB(origin), nil + } + testDatabase_GetSetDelete(t, creator) + }) } -func testDatabase_SetReopenGet(t *testing.T, backend BackendType) { - if backend == MapDBBackend { - return - } - dir, err := ioutil.TempDir("", string(backend)) - if err != nil { - panic(err) - } - defer os.RemoveAll(dir) - +func testDatabase_SetReopenGet(t *testing.T, creator dbCreator) { + dir := t.TempDir() key := []byte("hello") key2 := []byte("hell") value := []byte("world") buckets := []BucketID{"hello", MerkleTrie, BytesByHash} - testDB, err := openDatabase(backend, "test", dir) + testDB, err := creator("test", dir) assert.NoError(t, err) defer func() { if testDB != nil { @@ -123,7 +121,7 @@ func testDatabase_SetReopenGet(t *testing.T, backend BackendType) { testDB = nil assert.NoError(t, err) - testDB, err = openDatabase(backend, "test", dir) + testDB, err = creator("test", dir) for _, id := range buckets { bucket, err := testDB.GetBucket(id) @@ -143,9 +141,12 @@ func testDatabase_SetReopenGet(t *testing.T, backend BackendType) { } func TestDatabase_SetReopenGet(t *testing.T) { - for be, _ := range backends { - t.Run(string(be), func(t *testing.T) { - testDatabase_SetReopenGet(t, be) + for name, creator := range backends { + if name == MapDBBackend { + continue + } + t.Run(string(name), func(t *testing.T) { + testDatabase_SetReopenGet(t, creator) }) } } From 899fddae79bb37d6231b1ec5939c9307367fbf40 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Mon, 25 Jul 2022 17:01:12 +0900 Subject: [PATCH 24/27] Fix bugs in layer_db.go --- common/db/layer_db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/db/layer_db.go b/common/db/layer_db.go index a8d11c0db..298a5406d 100644 --- a/common/db/layer_db.go +++ b/common/db/layer_db.go @@ -30,7 +30,7 @@ func (bk *layerBucket) Has(key []byte) (bool, error) { if bk.data != nil { if value, ok := bk.data[string(key)]; ok { - return len(value) > 0, nil + return value != nil, nil } } return bk.real.Has(key) From 3b3beff0bdd2cf22234d4c088168de77e9a54e34 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Wed, 27 Jul 2022 14:55:33 +0900 Subject: [PATCH 25/27] Expose gs.NewPrunedGenesis --- block/manager.go | 4 ++-- chain/gs/genesis.go | 2 +- chain/gs/pruned.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/block/manager.go b/block/manager.go index 0d2ccf8b0..e96fcb24e 100644 --- a/block/manager.go +++ b/block/manager.go @@ -869,8 +869,8 @@ func (m *manager) _importBlockByID(src db.Database, id []byte) (module.Block, er func (m *manager) finalizePrunedBlock() error { s := m.chain.GenesisStorage() - g := new(gs.PrunedGenesis) - if err := json.Unmarshal(s.Genesis(), g); err != nil { + g, err := gs.NewPrunedGenesis(s.Genesis()) + if err != nil { return transaction.InvalidGenesisError.Wrap(err, "InvalidGenesis") } d := gs.NewDatabaseWithStorage(s) diff --git a/chain/gs/genesis.go b/chain/gs/genesis.go index be2692851..4ed92cbd9 100644 --- a/chain/gs/genesis.go +++ b/chain/gs/genesis.go @@ -37,7 +37,7 @@ type genesisStorage struct { func (gs *genesisStorage) ensureTypeAndIDs() error { if gs.cid == 0 { - if pg, err := newPrunedGenesis(gs.Genesis()); err == nil { + if pg, err := NewPrunedGenesis(gs.Genesis()); err == nil { gs.cid = int(pg.CID.Value) gs.nid = int(pg.NID.Value) gs.gType = module.GenesisPruned diff --git a/chain/gs/pruned.go b/chain/gs/pruned.go index f95b8ab57..6d197167a 100644 --- a/chain/gs/pruned.go +++ b/chain/gs/pruned.go @@ -32,7 +32,7 @@ func (g *PrunedGenesis) Verify() error { return nil } -func newPrunedGenesis(js []byte) (*PrunedGenesis, error) { +func NewPrunedGenesis(js []byte) (*PrunedGenesis, error) { g := new(PrunedGenesis) if err := json.Unmarshal(js, g); err != nil { return nil, err From d43a309f52361c58a5cc3fff3ac8605f3a2db5f9 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Wed, 27 Jul 2022 15:03:34 +0900 Subject: [PATCH 26/27] Use BlockData instead of Block for BlockManager.ExportGenesis --- block/manager.go | 2 +- module/block.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/block/manager.go b/block/manager.go index e96fcb24e..7b2dd688a 100644 --- a/block/manager.go +++ b/block/manager.go @@ -1515,7 +1515,7 @@ func hasBits(v int, bits int) bool { return (v & bits) == bits } -func (m *manager) ExportGenesis(blk module.Block, votes module.CommitVoteSet, gsw module.GenesisStorageWriter) error { +func (m *manager) ExportGenesis(blk module.BlockData, votes module.CommitVoteSet, gsw module.GenesisStorageWriter) error { height := blk.Height() if votes == nil { diff --git a/module/block.go b/module/block.go index 2ad788fb4..17beae762 100644 --- a/module/block.go +++ b/module/block.go @@ -118,7 +118,7 @@ type BlockManager interface { ExportBlocks(from, to int64, dst db.Database, on func(height int64) error) error // ExportGenesis exports genesis to the writer based on the block. - ExportGenesis(blk Block, votes CommitVoteSet, writer GenesisStorageWriter) error + ExportGenesis(blk BlockData, votes CommitVoteSet, writer GenesisStorageWriter) error // GetGenesisVotes returns available votes from genesis storage. // They are available only when it starts from genesis. From 8286593a3f85725fa52634221e916670546fa969 Mon Sep 17 00:00:00 2001 From: MoonKyu Song Date: Wed, 27 Jul 2022 15:03:43 +0900 Subject: [PATCH 27/27] Change pruned genesis storage usage and format --- block/manager.go | 2 +- chain/revertible.go | 96 ++++++++++++++ chain/taskreset.go | 301 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 335 insertions(+), 64 deletions(-) create mode 100644 chain/revertible.go diff --git a/block/manager.go b/block/manager.go index 7b2dd688a..0b67deb4e 100644 --- a/block/manager.go +++ b/block/manager.go @@ -1555,7 +1555,7 @@ func (m *manager) ExportGenesis(blk module.BlockData, votes module.CommitVoteSet if _, err := gsw.WriteData(votes.Bytes()); err != nil { return errors.Wrap(err, "fail to write votes") } - return m._exportBlocks(height, height, gs.NewDatabaseWithWriter(gsw), exportHashable, nil) + return nil } func (m *manager) ExportBlocks(from, to int64, dst db.Database, on func(h int64) error) error { diff --git a/chain/revertible.go b/chain/revertible.go new file mode 100644 index 000000000..1a9df3345 --- /dev/null +++ b/chain/revertible.go @@ -0,0 +1,96 @@ +/* + * Copyright 2022 ICON Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package chain + +import ( + "io/fs" + "os" + + "github.com/icon-project/goloop/common/errors" + "github.com/icon-project/goloop/common/log" +) + +const BackupSuffix = ".bk" + +type RevertHandler func(revert bool) +type Revertible []RevertHandler + +func (h *Revertible) Delete(p string) error { + p2 := p + BackupSuffix + err := os.Rename(p, p2) + if err == nil { + *h = append(*h, func(revert bool) { + if revert { + // log.Tracef("Revertible.Delete os.Rename(%s,%s)", p2, p) + log.Must(os.Rename(p2, p)) + } else { + // log.Tracef("Revertible.Delete os.RemoveAll(%s)", p2) + log.Must(os.RemoveAll(p2)) + } + }) + return nil + } else if errors.Is(err, fs.ErrNotExist) { + return nil + } else { + return err + } +} + +func (h *Revertible) Rename(p1, p2 string) error { + err := os.Rename(p1, p2) + if err == nil { + *h = append(*h, func(revert bool) { + if revert { + // log.Tracef("Revertible.Rename os.Rename(%s,%s)", p2, p1) + log.Must(os.Rename(p2, p1)) + } + }) + return nil + } else if errors.Is(err, fs.ErrNotExist) { + return nil + } else { + return err + } +} + +func (h *Revertible) Append(handler RevertHandler) int { + *h = append(*h, handler) + return len(*h) - 1 +} + +func (h *Revertible) RevertOrCommitOne(index int, revert bool) bool { + items := *h + if index < 0 || index >= len(items) { + return false + } + if handler := items[index]; handler != nil { + items[index] = nil + handler(revert) + return true + } + return false +} + +func (h *Revertible) RevertOrCommit(revert bool) { + rec := *h + *h = nil + for idx := len(rec) - 1; idx >= 0; idx -= 1 { + if handler := rec[idx]; handler != nil { + handler(revert) + } + } +} diff --git a/chain/taskreset.go b/chain/taskreset.go index 5767b336f..119e3fc94 100644 --- a/chain/taskreset.go +++ b/chain/taskreset.go @@ -17,6 +17,8 @@ package chain import ( + "bytes" + "fmt" "os" "path" @@ -24,12 +26,17 @@ import ( "github.com/icon-project/goloop/chain/gs" "github.com/icon-project/goloop/common/crypto" "github.com/icon-project/goloop/common/errors" + "github.com/icon-project/goloop/common/log" "github.com/icon-project/goloop/consensus/fastsync" "github.com/icon-project/goloop/module" "github.com/icon-project/goloop/network" "github.com/icon-project/goloop/service" ) +const ( + TempSuffix = ".tmp" +) + type taskReset struct { chain *singleChain result resultStore @@ -48,6 +55,9 @@ var resetStates = map[State]string{ } func (t *taskReset) String() string { + if t.height != 0 { + return fmt.Sprintf("Reset(height=%d,blockHash=%#x)", t.height, t.blockHash) + } return "Reset" } @@ -118,7 +128,7 @@ func (t *taskReset) _fetchBlock(fsm fastsync.Manager, h int64, hash []byte) (mod return blk, votes, nil } -func (t *taskReset) _prepareBlocks() (module.BlockData, module.CommitVoteSet, error) { +func (t *taskReset) _prepareBlocks(height int64, blockHash []byte) (module.BlockData, module.CommitVoteSet, error) { c := t.chain defer c.releaseManagers() @@ -148,15 +158,15 @@ func (t *taskReset) _prepareBlocks() (module.BlockData, module.CommitVoteSet, er return nil, nil, err } - blk, votes, err := t._fetchBlock(fsm, t.height, t.blockHash) + blk, votes, err := t._fetchBlock(fsm, height, blockHash) if err != nil { return nil, nil, err } - pBlk, _, err := t._fetchBlock(fsm, t.height-1, blk.PrevID()) + pBlk, _, err := t._fetchBlock(fsm, height-1, blk.PrevID()) if err != nil { return nil, nil, err } - ppBlk, _, err := t._fetchBlock(fsm, t.height-2, pBlk.PrevID()) + ppBlk, _, err := t._fetchBlock(fsm, height-2, pBlk.PrevID()) if err != nil { return nil, nil, err } @@ -171,15 +181,18 @@ func (t *taskReset) _prepareBlocks() (module.BlockData, module.CommitVoteSet, er return nil, nil, err } - if err = block.SetLastHeight(c.Database(), nil, t.height); err != nil { + if err = block.SetLastHeight(c.Database(), nil, height); err != nil { return nil, nil, err } return blk, votes, nil } -func (t *taskReset) _exportGenesis(blk module.Block, votes module.CommitVoteSet, gsfile string) (rerr error) { - _ = os.RemoveAll(gsfile) +func (t *taskReset) _exportGenesis(blk module.BlockData, votes module.CommitVoteSet, gsfile string) (rerr error) { + if err := t.chain.prepareManagers(); err != nil { + return err + } + defer t.chain.releaseManagers() fd, err := os.OpenFile(gsfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL|os.O_TRUNC, 0700) if err != nil { return err @@ -198,95 +211,257 @@ func (t *taskReset) _exportGenesis(blk module.Block, votes module.CommitVoteSet, return nil } -func (t *taskReset) _makePrunedGenesis(blkData module.BlockData, votes module.CommitVoteSet) (err error) { - c := t.chain - if err := c.prepareManagers(); err != nil { - return err +func (t *taskReset) _exportBlocks(dbDirNew string, dbTypeNew string, height int64, blockHash []byte, votes module.CommitVoteSet) (rblk module.Block, rvotes module.CommitVoteSet, ret error) { + // open database for export + _ = os.RemoveAll(dbDirNew) + newDB, err := t.chain.openDatabase(dbDirNew, dbTypeNew) + if err != nil { + return nil, nil, err } - defer c.releaseManagers() + defer func() { + log.Must(newDB.Close()) + if ret != nil { + log.Must(os.RemoveAll(dbDirNew)) + } + }() + + // prepare managers + if err := t.chain.prepareManagers(); err != nil { + return nil, nil, err + } + defer t.chain.releaseManagers() - blk, err := t.chain.bm.GetBlockByHeight(blkData.Height()) + // check block hash with height + blk, err := t.chain.bm.GetBlockByHeight(height) if err != nil { - return err + return nil, nil, err } - if cid, err := c.sm.GetChainID(blk.Result()); err != nil { - return errors.InvalidStateError.New("No ChainID is recorded (require Revision 8)") - } else { - if cid != int64(c.CID()) { - return errors.InvalidStateError.Errorf("Invalid chain ID real=%d exp=%d", cid, c.CID()) + if !bytes.Equal(blk.ID(), blockHash) { + return nil, nil, errors.InvalidStateError.Errorf("BlockIDInvalid(exp=%#x,real=%#x)", + blockHash, blk.ID()) + } + + // copy blocks for new genesis + if err := t.chain.bm.ExportBlocks(height, height, newDB, func(height int64) error { + return nil + }); err != nil { + return nil, nil, err + } + + // use given votes or get votes from the next block + if votes == nil { + if nblk, err := t.chain.bm.GetBlockByHeight(height + 1); err != nil { + return nil, nil, err + } else { + votes = nblk.Votes() } } + return blk, votes, nil +} - gsTmp := t.gsfile + ".tmp" - if err := t._exportGenesis(blk, votes, gsTmp); err != nil { - return err +func (t *taskReset) _syncBlocks(height int64, blockHash []byte, votes module.CommitVoteSet) (rblk module.BlockData, rvotes module.CommitVoteSet, rrb Revertible, ret error) { + logger := t.chain.Logger() + logger.Debugf("syncBlocks: START height=%d blockHash=%#x", height, blockHash) + defer logger.Debugf("syncBlocks: DONE err=%+v", ret) + rblk, rvotes, rrb, ret = t._syncBlocksWithDB(height, blockHash, votes) + if ret == nil { + return } + logger.Debugf("syncBlocks: syncBlocksWithDB fails err=%v continue with syncBlocksWithNetwork", ret) + return t._syncBlocksWithNetwork(height, blockHash) +} + +func (t *taskReset) _syncBlocksWithDB(height int64, blockHash []byte, votes module.CommitVoteSet) (rblk module.BlockData, rvotes module.CommitVoteSet, rrb Revertible, ret error) { + var rb Revertible defer func() { - if err != nil { - _ = os.Remove(gsTmp) + if ret != nil { + rb.RevertOrCommit(true) + } else { + rrb = rb } }() - _, err = os.Stat(t.gsfile) - if err == nil { - gsbk := t.gsfile + ".bk" - _ = os.RemoveAll(gsbk) - if err := os.Rename(t.gsfile, gsbk); err != nil { - return errors.UnknownError.Wrapf(err, "fail on backup %s to %s", - t.gsfile, gsbk) + // prepare database for exporting blocks + chainDir := t.chain.cfg.AbsBaseDir() + dbDir := path.Join(chainDir, DefaultDBDir) + + dbDirNew := dbDir + TempSuffix + dbTypeNew := t.chain.cfg.DBType + rblk, rvotes, ret = t._exportBlocks(dbDirNew, dbTypeNew, height, blockHash, votes) + if ret != nil { + return + } + rb.Append(func(revert bool) { + if revert { + log.Must(os.RemoveAll(dbDirNew)) } - defer func() { - if err != nil { - _ = os.RemoveAll(t.gsfile) - _ = os.Rename(gsbk, t.gsfile) - } else { - _ = os.RemoveAll(gsbk) - } - }() - } else if !errors.Is(err, os.ErrNotExist) { - return errors.UnknownError.Wrapf(err, "cannot stat %s", t.gsfile) + }) + + // replace with new database + t.chain.releaseDatabase() + rb.Append(func(revert bool) { + if revert { + t.chain.ensureDatabase() + } + }) + if ret = rb.Delete(dbDir); ret != nil { + return } - if err := os.Rename(gsTmp, t.gsfile); err != nil { - return errors.UnknownError.Errorf("fail to rename %s to %s", - gsTmp, t.gsfile) + if ret = rb.Rename(dbDirNew, dbDir); ret != nil { + return } + t.chain.ensureDatabase() + rb.Append(func(revert bool) { + if revert { + t.chain.releaseDatabase() + } + }) - // replace genesis - fd, err := os.Open(t.gsfile) - if err != nil { - return errors.UnknownError.Wrapf(err, "fail to open file=%s", t.gsfile) + // remove other directories + contractDir := path.Join(chainDir, DefaultContractDir) + if ret = rb.Delete(contractDir); ret != nil { + return } - g, err := gs.NewFromFile(fd) - if err != nil { - return errors.UnknownError.Wrapf(err, "fail to parse gs=%s", t.gsfile) + walDir := path.Join(chainDir, DefaultWALDir) + if ret = rb.Delete(walDir); ret != nil { + return + } + cacheDir := path.Join(chainDir, DefaultCacheDir) + if ret = rb.Delete(cacheDir); ret != nil { + return } + return +} - c.cfg.GenesisStorage = g - c.cfg.Genesis = g.Genesis() +func (t *taskReset) _syncBlocksWithNetwork(height int64, blockHash []byte) (rblk module.BlockData, rvotes module.CommitVoteSet, rrb Revertible, ret error) { + c := t.chain + chainDir := c.cfg.AbsBaseDir() - return nil + // prepare revert + var rb Revertible + defer func() { + if ret != nil { + rb.RevertOrCommit(true) + } else { + rrb = rb + } + }() + + // reset database + c.releaseDatabase() + rb.Append(func(revert bool) { + if revert { + c.ensureDatabase() + } + }) + dbDir := path.Join(chainDir, DefaultDBDir) + if ret = rb.Delete(dbDir); ret != nil { + return + } + c.ensureDatabase() + rb.Append(func(revert bool) { + if revert { + c.releaseDatabase() + log.Must(os.RemoveAll(dbDir)) + } + }) + + // remove other directories + contractDir := path.Join(chainDir, DefaultContractDir) + if ret = rb.Delete(contractDir); ret != nil { + return + } + WALDir := path.Join(chainDir, DefaultWALDir) + if ret = rb.Delete(WALDir); ret != nil { + return + } + CacheDir := path.Join(chainDir, DefaultCacheDir) + if ret = rb.Delete(CacheDir); ret != nil { + return + } + + rblk, rvotes, ret = t._prepareBlocks(height, blockHash) + return } -func (t *taskReset) _reset() (ret error) { - err := t._cleanUp() +func (t *taskReset) _resetToGenesis() (ret error) { + gsType, err := t.chain.GenesisStorage().Type() if err != nil { return err } - if t.height == 0 { + switch gsType { + case module.GenesisNormal: + return t._cleanUp() + case module.GenesisPruned: + c := t.chain + genesis, err := gs.NewPrunedGenesis(c.GenesisStorage().Genesis()) + if err != nil { + return err + } + votesBytes, err := c.GenesisStorage().Get(genesis.Votes.Bytes()) + if err != nil { + return err + } + votes := c.CommitVoteSetDecoder()(votesBytes) + _, _, rb, err := t._syncBlocks(genesis.Height.Value, genesis.Block.Bytes(), votes) + if err != nil { + return err + } + defer func() { + rb.RevertOrCommit(ret != nil) + }() return nil + default: + return errors.InvalidStateError.Errorf("UnknownGenesisType(type=%d)", gsType) } +} + +func loadGenesisStorage(file string) (g module.GenesisStorage, ret error) { + fd, err := os.OpenFile(file, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + return gs.NewFromFile(fd) +} - // do clean up again on failure +func (t *taskReset) _resetToHeight(height int64, blockHash []byte) (ret error) { + blk, votes, rb, err := t._syncBlocks(height, blockHash, nil) + if err != nil { + return err + } defer func() { - if ret != nil { - _ = t._cleanUp() - } + rb.RevertOrCommit(ret != nil) }() - blk, votes, err := t._prepareBlocks() + + // create pruned genesis + if err := rb.Delete(t.gsfile); err != nil { + return err + } + if err := t._exportGenesis(blk, votes, t.gsfile); err != nil { + return err + } + rb.Append(func(revert bool) { + if revert { + _ = os.Remove(t.gsfile) + } + }) + + // reload new genesis + g, err := loadGenesisStorage(t.gsfile) if err != nil { return err } - return t._makePrunedGenesis(blk, votes) + t.chain.cfg.GenesisStorage = g + t.chain.cfg.Genesis = g.Genesis() + return nil +} + +func (t *taskReset) _reset() (ret error) { + if t.height == 0 { + return t._resetToGenesis() + } else { + return t._resetToHeight(t.height, t.blockHash) + } } func (t *taskReset) Stop() {