Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into readonlycrosschain
Browse files Browse the repository at this point in the history
  • Loading branch information
springrain committed Nov 19, 2021
2 parents 30ca9b3 + e9bc306 commit 0a5e94e
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 47 deletions.
27 changes: 26 additions & 1 deletion bcs/consensus/xpoa/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package xpoa
import (
"encoding/json"
"errors"
"strconv"
)

var (
Expand Down Expand Up @@ -30,7 +31,6 @@ const (
)

type xpoaConfig struct {
Version string `json:"version,omitempty"`
// 每个候选人每轮出块个数
BlockNum int64 `json:"block_num"`
// 单位为毫秒
Expand Down Expand Up @@ -103,3 +103,28 @@ func (a aksSlice) Less(i, j int) bool {
}
return a[j].Weight < a[i].Weight
}

type xpoaStringConfig struct {
Version string `json:"version,omitempty"`
}

type xpoaIntConfig struct {
Version int64 `json:"version,omitempty"`
}

// ParseVersion 支持string格式和int格式的version type
func ParseVersion(cfg string) (int64, error) {
intVersion := xpoaIntConfig{}
if err := json.Unmarshal([]byte(cfg), &intVersion); err == nil {
return intVersion.Version, nil
}
strVersion := xpoaStringConfig{}
if err := json.Unmarshal([]byte(cfg), &strVersion); err != nil {
return 0, err
}
version, err := strconv.ParseInt(strVersion.Version, 10, 64)
if err != nil {
return 0, err
}
return version, nil
}
37 changes: 37 additions & 0 deletions bcs/consensus/xpoa/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,40 @@ func TestLoadValidatorsMultiInfo(t *testing.T) {
t.Error("TestLoadValidatorsMultiInfo error 2.")
}
}

func TestParseVersion(t *testing.T) {
strVersion := `{
"version": "2"
}`
v, err := ParseVersion(strVersion)
if err != nil {
t.Error("ParseVersion err, err: ", err)
return
}
if v != 2 {
t.Error("ParseVersion err, v: ", v)
return
}
intVersion := `{
"version": 3
}`
v, err = ParseVersion(intVersion)
if err != nil {
t.Error("ParseVersion err, err: ", err)
return
}
if v != 3 {
t.Error("ParseVersion err, v: ", v)
return
}
empryVersion := `{}`
v, err = ParseVersion(empryVersion)
if err != nil {
t.Error("ParseVersion err, err: ", err)
return
}
if v != 0 {
t.Error("ParseVersion err, v: ", v)
return
}
}
47 changes: 36 additions & 11 deletions bcs/consensus/xpoa/kernel_contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,25 @@ func (x *xpoaConsensus) methodEditValidates(contractCtx contract.KContext) (*con
if err != nil {
return common.NewContractErrResponse(common.StatusBadRequest, "invalid acl: pls check accept value."), err
}
if !x.isAuthAddress(aks, acceptValue) {

curValiBytes, err := contractCtx.Get(x.election.bindContractBucket,
[]byte(fmt.Sprintf("%d_%s", x.election.consensusVersion, validateKeys)))
curVali, err := func() ([]string, error) {
if err != nil || curValiBytes == nil {
return x.election.initValidators, nil
}
var curValiKey ProposerInfo
err = json.Unmarshal(curValiBytes, &curValiKey)
if err != nil {
x.log.Error("Unmarshal error")
return nil, err
}
return curValiKey.Address, nil
}()
if err != nil {
return common.NewContractErrResponse(common.StatusBadRequest, err.Error()), err
}
if !x.isAuthAddress(curVali, aks, acceptValue, x.election.enableBFT) {
return common.NewContractErrResponse(common.StatusBadRequest, aclErr.Error()), aclErr
}

Expand Down Expand Up @@ -87,10 +105,18 @@ func (x *xpoaConsensus) methodGetValidates(contractCtx contract.KContext) (*cont
}

// isAuthAddress 判断输入aks是否能在贪心下仍能满足签名数量>33%(Chained-BFT装载) or 50%(一般情况)
func (x *xpoaConsensus) isAuthAddress(aks map[string]float64, threshold float64) bool {
func (x *xpoaConsensus) isAuthAddress(validators []string, aks map[string]float64, threshold float64, enableBFT bool) bool {
// 0. 是否是单个候选人
if len(validators) == 1 {
weight, ok := aks[validators[0]]
if !ok {
return false
}
return weight >= threshold
}
// 1. 判断aks中的地址是否是当前集合地址
for addr, _ := range aks {
if !Find(addr, x.election.validators) {
if !Find(addr, validators) {
return false
}
}
Expand All @@ -106,15 +132,14 @@ func (x *xpoaConsensus) isAuthAddress(aks map[string]float64, threshold float64)
greedyCount := 0
sum := threshold
for i := 0; i < len(aks); i++ {
if sum > 0 {
sum -= s[i].Weight
greedyCount++
continue
if sum <= 0 {
break
}
break
sum -= s[i].Weight
greedyCount++
}
if !x.election.enableBFT {
return greedyCount >= len(x.election.validators)/2+1
if !enableBFT {
return greedyCount >= len(validators)/2+1
}
return CalFault(int64(greedyCount), int64(len(x.election.validators)))
return CalFault(int64(greedyCount), int64(len(validators)))
}
55 changes: 40 additions & 15 deletions bcs/consensus/xpoa/kernel_contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,15 @@ var (
"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN": 0.5,
"WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT": 0.5,
}
)

func TestIsAuthAddress(t *testing.T) {
cCtx, err := prepare(getXpoaConsensusConf())
if err != nil {
t.Error("prepare error", "error", err)
return
}
i := NewXpoaConsensus(*cCtx, getConfig(getXpoaConsensusConf()))
xpoa, ok := i.(*xpoaConsensus)
if !ok {
t.Error("transfer err.")
aks2 = map[string]float64{
"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN": 0.5,
"WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT": 0.6,
}
if !xpoa.isAuthAddress(aks, 0.6) {
t.Error("isAuthAddress err.")
aks3 = map[string]float64{
"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN": 0.4,
"WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT": 0.6,
}
}
)

func NewEditArgs() map[string][]byte {
a := make(map[string][]byte)
Expand Down Expand Up @@ -85,3 +77,36 @@ func TestMethodGetValidates(t *testing.T) {
return
}
}

func TestIsAuthAddress(t *testing.T) {
cCtx, err := prepare(getXpoaConsensusConf())
if err != nil {
t.Error("prepare error", "error", err)
return
}
i := NewXpoaConsensus(*cCtx, getConfig(getXpoaConsensusConf()))
xpoa, ok := i.(*xpoaConsensus)
if !ok {
t.Error("transfer err.")
return
}
v1 := []string{"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN", "WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT"}
if !xpoa.isAuthAddress(v1, aks, 0.6, false) {
t.Error("isAuthAddress err.")
return
}
v2 := []string{"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN"}
if xpoa.isAuthAddress(v2, aks2, 0.6, true) {
t.Error("isAuthAddress err.")
return
}
v3 := []string{"WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT"}
if !xpoa.isAuthAddress(v3, aks2, 0.6, true) {
t.Error("isAuthAddress err.")
return
}
v4 := []string{"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN", "WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT"}
if !xpoa.isAuthAddress(v4, aks2, 0.7, true) {
t.Error("isAuthAddress err.")
}
}
8 changes: 1 addition & 7 deletions bcs/consensus/xpoa/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package xpoa

import (
"fmt"
"strconv"
"time"

common "github.com/xuperchain/xupercore/kernel/consensus/base/common"
Expand Down Expand Up @@ -35,12 +34,7 @@ type xpoaSchedule struct {
ledger cctx.LedgerRely
}

func NewXpoaSchedule(xconfig *xpoaConfig, cCtx context.ConsensusCtx, startHeight int64) *xpoaSchedule {
version, err := strconv.ParseInt(xconfig.Version, 10, 64)
if err != nil {
cCtx.XLog.Error("Xpoa::NewXpoaSchedule::Parse version error.", "err", err)
return nil
}
func NewXpoaSchedule(xconfig *xpoaConfig, cCtx context.ConsensusCtx, startHeight, version int64) *xpoaSchedule {
s := xpoaSchedule{
address: cCtx.Network.PeerInfo().Account,
period: xconfig.Period,
Expand Down
11 changes: 4 additions & 7 deletions bcs/consensus/xpoa/xpoa.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package xpoa
import (
"bytes"
"encoding/json"
"strconv"
"time"

"github.com/xuperchain/xupercore/kernel/common/xcontext"
Expand Down Expand Up @@ -66,17 +65,15 @@ func NewXpoaConsensus(cCtx cctx.ConsensusCtx, cCfg def.ConsensusConfig) base.Con
cCtx.XLog.Error("consensus:xpoa:NewXpoaConsensus: xpoa struct unmarshal error", "error", err)
return nil
}
//兼容老版本配置文件
if len(xconfig.Version) < 1 {
xconfig.Version = "0"
}
version, err := strconv.ParseInt(xconfig.Version, 10, 64)

version, err := ParseVersion(cCfg.Config)

if err != nil {
cCtx.XLog.Error("consensus:xpoa:NewXpoaConsensus: version error", "error", err)
return nil
}
// create xpoaSchedule
schedule := NewXpoaSchedule(xconfig, cCtx, cCfg.StartHeight)
schedule := NewXpoaSchedule(xconfig, cCtx, cCfg.StartHeight, version)
if schedule == nil {
cCtx.XLog.Error("consensus:xpoa:NewXpoaSchedule error")
return nil
Expand Down
45 changes: 39 additions & 6 deletions kernel/consensus/base/driver/chained-bft/smr.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,11 @@ func (s *Smr) ResetProposerStatus(tipBlock cctx.BlockInterface,
s.mtx.Lock()
defer s.mtx.Unlock()

if bytes.Equal(s.getHighQC().GetProposalId(), tipBlock.GetBlockid()) {
if bytes.Equal(s.getHighQC().GetProposalId(), tipBlock.GetBlockid()) &&
s.validNewHighQC(tipBlock.GetBlockid(), validators) {
// 此处需要获取带签名的完整Justify
return false, s.getCompleteHighQC(), nil
}
// 单个节点不存在投票验证的hotstuff流程,因此返回true
if len(validators) == 1 {
return false, nil, nil
}

// 从当前TipBlock开始往前追溯,交给smr根据状态进行回滚。
// 在本地状态树上找到指代TipBlock的QC,若找不到,则在状态树上找和TipBlock同一分支上的最近值
Expand Down Expand Up @@ -384,10 +381,43 @@ func (s *Smr) ProcessProposal(viewNumber int64, proposalID []byte, parentID []by
}
go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts(s.removeLocalValidator(validatesIpInfo)))
s.localProposal.Store(utils.F(proposalID), proposal.Timestamp)
s.log.Debug("smr:ProcessProposal::new proposal has been made", "address", s.address, "proposalID", utils.F(proposalID))
// 若为单候选人情况,则此处需要特殊处理,矿工需要给自己提前签名
if len(validatesIpInfo) == 1 {
s.voteToSelf(viewNumber, proposalID, parentQuorumCert)
}
s.log.Debug("smr:ProcessProposal::new proposal has been made", "address", s.address, "proposalID", utils.F(proposalID), "target", validatesIpInfo)
return nil
}

func (s *Smr) voteToSelf(viewNumber int64, proposalID []byte, parent storage.QuorumCertInterface) {
selfVote := &storage.VoteInfo{
ProposalId: proposalID,
ProposalView: viewNumber,
ParentId: parent.GetProposalId(),
}
selfLedgerInfo := &storage.LedgerCommitInfo{
VoteInfoHash: proposalID,
}
selfQC := storage.NewQuorumCert(selfVote, selfLedgerInfo, nil)
selfSign, err := s.cryptoClient.SignVoteMsg(proposalID)
if err != nil {
s.log.Error("smr::voteProposal::voteToSelf error", "err", err)
return
}
s.qcVoteMsgs.LoadOrStore(utils.F(proposalID), []*chainedBftPb.QuorumCertSign{selfSign})
selfNode := &storage.ProposalNode{
In: selfQC,
}
if err := s.qcTree.UpdateQcStatus(selfNode); err != nil {
s.log.Error("smr::voteProposal::updateQcStatus error", "err", err)
return
}
// 更新本地smr状态机
s.pacemaker.AdvanceView(selfQC)
s.qcTree.UpdateHighQC(proposalID)
s.log.Debug("smr:voteProposal::done local voting", "address", s.address, "proposalID", utils.F(proposalID))
}

// reloadJustifyQC 与LibraBFT不同,返回一个指定的parentQC
func (s *Smr) reloadJustifyQC(parentID []byte) (storage.QuorumCertInterface, error) {
// 第一次proposal,highQC==rootQC==genesisQC
Expand Down Expand Up @@ -695,6 +725,9 @@ func (s *Smr) validNewHighQC(inProposalId []byte, validators []string) bool {
if !ok {
return false
}
if len(validators) == 1 {
return len(signs) == len(validators)
}
return s.saftyrules.CalVotesThreshold(len(signs), len(validators))
}

Expand Down

0 comments on commit 0a5e94e

Please sign in to comment.