Skip to content

Commit

Permalink
Merge PR: deduplicate (#1514)
Browse files Browse the repository at this point in the history
add test case

Co-authored-by: KamiD <[email protected]>
  • Loading branch information
ItsFunny and KamiD authored Jan 30, 2022
1 parent f460333 commit 2162ce3
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 1 deletion.
126 changes: 125 additions & 1 deletion x/evm/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (w *Watcher) Commit() {
batch := w.batch
w.dispatchJob(func() { w.commitBatch(batch) })

// we dont do deduplicatie here,we do it in `commit routine`
// get centerBatch for sending to DataCenter
ddsBatch := make([]*Batch, len(batch))
for i, b := range batch {
Expand Down Expand Up @@ -412,7 +413,12 @@ func (w *Watcher) CommitWatchData(data WatchData) {
}

func (w *Watcher) commitBatch(batch []WatchMessage) {
filterMap := make(map[string]WatchMessage)
for _, b := range batch {
filterMap[bytes2Key(b.GetKey())] = b
}

for _, b := range filterMap {
key := b.GetKey()
value := []byte(b.GetValue())
typeValue := b.GetType()
Expand Down Expand Up @@ -472,7 +478,8 @@ func (w *Watcher) GetWatchDataFunc() func() ([]byte, error) {
value.DelayEraseKey = w.delayEraseKey

return func() ([]byte, error) {
valueByte, err := value.MarshalToAmino(nil)
filterWatcher := filterCopy(value)
valueByte, err := filterWatcher.MarshalToAmino(nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -526,6 +533,123 @@ func (w *Watcher) CheckWatchDB(keys [][]byte, mode string) {
w.log.Info("watchDB delta", "mode", mode, "height", w.height, "hash", hex.EncodeToString(kvHash.Sum(nil)), "kv", output)
}

func bytes2Key(keyBytes []byte) string {
return string(keyBytes)
}

func key2Bytes(key string) []byte {
return []byte(key)
}

func filterCopy(origin *WatchData) *WatchData {
return &WatchData{
DirtyAccount: filterAccount(origin.DirtyAccount),
Batches: filterBatch(origin.Batches),
DelayEraseKey: filterDelayEraseKey(origin.DelayEraseKey),
BloomData: filterBloomData(origin.BloomData),
DirtyList: filterDirtyList(origin.DirtyList),
}
}

func filterAccount(accounts []*sdk.AccAddress) []*sdk.AccAddress {
if len(accounts) == 0 {
return nil
}

filterAccountMap := make(map[string]*sdk.AccAddress)
for _, account := range accounts {
filterAccountMap[bytes2Key(account.Bytes())] = account
}

ret := make([]*sdk.AccAddress, len(filterAccountMap))
i := 0
for _, acc := range filterAccountMap {
ret[i] = acc
i++
}

return ret
}

func filterBatch(datas []*Batch) []*Batch {
if len(datas) == 0 {
return nil
}

filterBatch := make(map[string]*Batch)
for _, b := range datas {
filterBatch[bytes2Key(b.Key)] = b
}

ret := make([]*Batch, len(filterBatch))
i := 0
for _, b := range filterBatch {
ret[i] = b
i++
}

return ret
}

func filterDelayEraseKey(datas [][]byte) [][]byte {
if len(datas) == 0 {
return nil
}

filterDelayEraseKey := make(map[string][]byte, 0)
for _, b := range datas {
filterDelayEraseKey[bytes2Key(b)] = b
}

ret := make([][]byte, len(filterDelayEraseKey))
i := 0
for _, k := range filterDelayEraseKey {
ret[i] = k
i++
}

return ret
}
func filterBloomData(datas []*evmtypes.KV) []*evmtypes.KV {
if len(datas) == 0 {
return nil
}

filterBloomData := make(map[string]*evmtypes.KV, 0)
for _, k := range datas {
filterBloomData[bytes2Key(k.Key)] = k
}

ret := make([]*evmtypes.KV, len(filterBloomData))
i := 0
for _, k := range filterBloomData {
ret[i] = k
i++
}

return ret
}

func filterDirtyList(datas [][]byte) [][]byte {
if len(datas) == 0 {
return nil
}

filterDirtyList := make(map[string][]byte, 0)
for _, k := range datas {
filterDirtyList[bytes2Key(k)] = k
}

ret := make([][]byte, len(filterDirtyList))
i := 0
for _, k := range filterDirtyList {
ret[i] = k
i++
}

return ret
}

/////////// job
func (w *Watcher) jobRoutine() {
if !w.Enabled() {
Expand Down
86 changes: 86 additions & 0 deletions x/evm/watcher/watcher_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package watcher_test

import (
"encoding/hex"
"fmt"
"github.com/okex/exchain/libs/cosmos-sdk/x/auth"
"github.com/okex/exchain/libs/tendermint/libs/log"
"math/big"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -308,3 +313,84 @@ func TestDeployAndCallContract(t *testing.T) {

testWatchData(t, w)
}

type mockDuplicateAccount struct {
*auth.BaseAccount
Addr byte
Seq byte
}

func (a *mockDuplicateAccount) GetAddress() sdk.AccAddress {
return []byte{a.Addr}
}

func newMockAccount(byteAddr, seq byte) *mockDuplicateAccount {
ret := &mockDuplicateAccount{Addr: byteAddr, Seq: seq}
pubkey := secp256k1.GenPrivKey().PubKey()
addr := sdk.AccAddress(pubkey.Address())
baseAcc := auth.NewBaseAccount(addr, nil, pubkey, 0, 0)
ret.BaseAccount = baseAcc
return ret
}

func TestDuplicateAddress(t *testing.T) {
accAdds := make([]*sdk.AccAddress, 0)
for i := 0; i < 10; i++ {
adds := hex.EncodeToString([]byte(fmt.Sprintf("addr-%d", i)))
a, _ := sdk.AccAddressFromHex(adds)
accAdds = append(accAdds, &a)
}
adds := hex.EncodeToString([]byte(fmt.Sprintf("addr-%d", 1)))
a, _ := sdk.AccAddressFromHex(adds)
accAdds = append(accAdds, &a)
filterM := make(map[string]struct{})
count := 0
for _, add := range accAdds {
_, exist := filterM[string(add.Bytes())]
if exist {
count++
continue
}
filterM[string(add.Bytes())] = struct{}{}
}
require.Equal(t, 1, count)
}

func TestDuplicateWatchMessage(t *testing.T) {
w := setupTest()
a1 := newMockAccount(1, 1)
w.app.EvmKeeper.Watcher.SaveAccount(a1, true)
a2 := newMockAccount(1, 2)
w.app.EvmKeeper.Watcher.SaveAccount(a2, true)
w.app.EvmKeeper.Watcher.Commit()
time.Sleep(time.Second)
store := watcher.InstanceOfWatchStore()
pWd := getDBKV(store)
require.Equal(t, 1, len(pWd))
}

func TestWriteLatestMsg(t *testing.T) {
viper.Set(watcher.FlagFastQuery, true)
viper.Set(watcher.FlagDBBackend, "memdb")
w := watcher.NewWatcher(log.NewTMLogger(os.Stdout))
w.SetWatchDataFunc()

a1 := newMockAccount(1, 1)
a11 := newMockAccount(1, 2)
a111 := newMockAccount(1, 3)
w.SaveAccount(a1, true)
w.SaveAccount(a11, true)
w.SaveAccount(a111, true)
w.Commit()
time.Sleep(time.Second)
store := watcher.InstanceOfWatchStore()
pWd := getDBKV(store)
require.Equal(t, 1, len(pWd))

m := watcher.NewMsgAccount(a1)
v, err := store.Get(m.GetKey())
require.NoError(t, err)
mm := make(map[string]interface{})
json.Unmarshal(v, &mm)
require.Equal(t, 3, int(mm["Seq"].(float64)))
}

0 comments on commit 2162ce3

Please sign in to comment.