Skip to content

Commit

Permalink
Merge pull request #1 from rarimo/feature/gist-transfers
Browse files Browse the repository at this point in the history
Adding separate gist and state transfers
  • Loading branch information
olegfomenko committed Nov 24, 2023
2 parents 9f9cd90 + aac4a59 commit 43e97e4
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 135 deletions.
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ require (
github.com/ethereum/go-ethereum v1.10.26
github.com/go-redis/redis/v8 v8.11.5
github.com/gogo/protobuf v1.3.3
github.com/rarimo/rarimo-core v0.0.0-20231004143803-6b209428ecbf
github.com/rarimo/rarimo-core v1.0.8-0.20231107191012-470dd204b863
github.com/rarimo/saver-grpc-lib v1.0.0
github.com/spf13/cast v1.5.1
github.com/tendermint/tendermint v0.34.27
gitlab.com/distributed_lab/figure v2.1.0+incompatible
gitlab.com/distributed_lab/kit v1.11.1
gitlab.com/distributed_lab/logan v3.8.1+incompatible
gitlab.com/distributed_lab/running v0.0.0-20200706131153-4af0e83eb96c
google.golang.org/grpc v1.58.0
google.golang.org/grpc v1.59.0
)

require (
Expand Down Expand Up @@ -149,15 +149,15 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
54 changes: 27 additions & 27 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Run(args []string) bool {
runStateUpdatesAll := func() {
cfg.Log().Info("starting all state updates related services")

run(voting.RunStateUpdateVoter, "voter")
run(voting.RunVoter, "voter")
run(grpc.RunAPI, "grpc-api")
run(evm.RunStateChangeListener, "state-change-listener")
}
Expand All @@ -84,7 +84,7 @@ func Run(args []string) bool {
case stateUpdateAll.FullCommand():
runStateUpdatesAll()
case stateUpdateVoter.FullCommand():
run(voting.RunStateUpdateVoter, "voter")
run(voting.RunVoter, "voter")
case stateUpdateSaver.FullCommand():
run(evm.RunStateChangeListener, "state-change-listener")
default:
Expand Down
109 changes: 75 additions & 34 deletions internal/rarimo/stateupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewStateUpdateMessageMaker(txCreatorAddr string, contract string, homeChain
return &StateUpdateMessageMaker{txCreatorAddr: txCreatorAddr, contract: contract, homeChain: homeChain, stateDataProvider: stateDataProvider}
}

func (m *StateUpdateMessageMaker) StateUpdateMsgByBlock(ctx context.Context, issuer, block *big.Int) (*oracletypes.MsgCreateIdentityDefaultTransferOp, error) {
func (m *StateUpdateMessageMaker) StateUpdateMsgByBlock(ctx context.Context, issuer, block *big.Int) (*oracletypes.MsgCreateIdentityStateTransferOp, error) {
latestState, replacedState, err := m.getStatesOnBlock(ctx, issuer, block)
if err != nil {
return nil, err
Expand All @@ -45,30 +45,23 @@ func (m *StateUpdateMessageMaker) StateUpdateMsgByBlock(ctx context.Context, iss
return nil, nil
}

length, err := m.stateDataProvider.GetGISTRootHistoryLength(&bind.CallOpts{
Context: ctx,
})
if err != nil {
return nil, errors.Wrap(err, "failed to get overall gists count")
}

// FIXME: Possible bug: GIST could be changed during our logic processing
// Can be done using the same logic as with states
gists, err := m.stateDataProvider.GetGISTRootHistory(&bind.CallOpts{
Context: ctx,
}, new(big.Int).Sub(length, big.NewInt(2)), big.NewInt(2))
return m.StateUpdateMsgByStates(ctx, *latestState, *replacedState)
}

func (m *StateUpdateMessageMaker) GISTUpdateMsgByBlock(ctx context.Context, block *big.Int) (*oracletypes.MsgCreateIdentityGISTTransferOp, error) {
latestGIST, replacedGIST, err := m.getGISTsOnBlock(ctx, block)
if err != nil {
return nil, errors.Wrap(err, "failed to get last two gist")
return nil, err
}

replacedGIST := gists[0]
latestGIST := gists[1]
if latestGIST == nil || replacedGIST == nil {
return nil, nil
}

return m.StateUpdateMsgByStates(ctx, *latestState, *replacedState, latestGIST, replacedGIST)
return m.GISTUpdateMsgByGISTs(ctx, *latestGIST, *replacedGIST)
}

func (m *StateUpdateMessageMaker) StateUpdateMsgByHashes(ctx context.Context, issuer, latestStateHash, replacedStateHash, latestGISTHash, replacedGISTHash string) (*oracletypes.MsgCreateIdentityDefaultTransferOp, error) {
func (m *StateUpdateMessageMaker) StateUpdateMsgByHashes(ctx context.Context, issuer, latestStateHash, replacedStateHash string) (*oracletypes.MsgCreateIdentityStateTransferOp, error) {
latestState, err := m.stateDataProvider.GetStateInfoByIdAndState(&bind.CallOpts{
Context: ctx,
}, new(big.Int).SetBytes(hexutil.MustDecode(issuer)), new(big.Int).SetBytes(hexutil.MustDecode(latestStateHash)))
Expand All @@ -83,49 +76,58 @@ func (m *StateUpdateMessageMaker) StateUpdateMsgByHashes(ctx context.Context, is
return nil, errors.Wrap(err, "failed to get replaced state")
}

return m.StateUpdateMsgByStates(ctx, latestState, replacedState)
}

func (m *StateUpdateMessageMaker) GISTUpdateMsgByHashes(ctx context.Context, latestGISTHash, replacedGISTHash string) (*oracletypes.MsgCreateIdentityGISTTransferOp, error) {
latestGIST, err := m.stateDataProvider.GetGISTRootInfo(&bind.CallOpts{
Context: ctx,
}, new(big.Int).SetBytes(hexutil.MustDecode(latestGISTHash)))
if err != nil {
return nil, errors.Wrap(err, "failed to get latest gist")
return nil, errors.Wrap(err, "failed to get latest state")
}

replacedGIST, err := m.stateDataProvider.GetGISTRootInfo(&bind.CallOpts{
Context: ctx,
}, new(big.Int).SetBytes(hexutil.MustDecode(replacedGISTHash)))
if err != nil {
return nil, errors.Wrap(err, "failed to get replaced gist")
return nil, errors.Wrap(err, "failed to get replaced state")
}

return m.StateUpdateMsgByStates(ctx, latestState, replacedState, latestGIST, replacedGIST)
return m.GISTUpdateMsgByGISTs(ctx, latestGIST, replacedGIST)
}

func (m *StateUpdateMessageMaker) StateUpdateMsgByStates(_ context.Context, latestState, replacedState statebind.IStateStateInfo, latestGIST, replacedGIST statebind.IStateGistRootInfo) (*oracletypes.MsgCreateIdentityDefaultTransferOp, error) {
func (m *StateUpdateMessageMaker) StateUpdateMsgByStates(_ context.Context, latestState, replacedState statebind.IStateStateInfo) (*oracletypes.MsgCreateIdentityStateTransferOp, error) {
if latestState.State.Cmp(replacedState.ReplacedByState) != 0 {
return nil, errors.New("replaced state does not correspond latest state")
}

if latestGIST.Root.Cmp(replacedGIST.ReplacedByRoot) != 0 {
return nil, errors.New("replaced gist does not correspond latest gist")
}

return &oracletypes.MsgCreateIdentityDefaultTransferOp{
return &oracletypes.MsgCreateIdentityStateTransferOp{
Creator: m.txCreatorAddr,
Contract: m.contract,
Chain: m.homeChain,
Id: hexutil.Encode(latestState.Id.Bytes()), // should be issuer id only
GISTHash: hexutil.Encode(latestGIST.Root.Bytes()),
StateHash: hexutil.Encode(latestState.State.Bytes()),
StateCreatedAtTimestamp: latestState.CreatedAtTimestamp.String(),
StateCreatedAtBlock: latestState.CreatedAtBlock.String(),
StateReplacedBy: hexutil.Encode(latestState.ReplacedByState.Bytes()),
GISTReplacedBy: hexutil.Encode(latestGIST.ReplacedByRoot.Bytes()),
GISTCreatedAtTimestamp: latestGIST.CreatedAtTimestamp.String(),
GISTCreatedAtBlock: latestGIST.CreatedAtBlock.String(),
ReplacedStateHash: hexutil.Encode(replacedState.State.Bytes()),
ReplacedGISTtHash: hexutil.Encode(replacedGIST.Root.Bytes()),
}, nil
}

func (m *StateUpdateMessageMaker) GISTUpdateMsgByGISTs(_ context.Context, latestGIST, replacedGIST statebind.IStateGistRootInfo) (*oracletypes.MsgCreateIdentityGISTTransferOp, error) {
if latestGIST.Root.Cmp(replacedGIST.ReplacedByRoot) != 0 {
return nil, errors.New("replaced gist does not correspond latest state")
}

return &oracletypes.MsgCreateIdentityGISTTransferOp{
Creator: m.txCreatorAddr,
Contract: m.contract,
Chain: m.homeChain,
GISTHash: hexutil.Encode(latestGIST.Root.Bytes()),
GISTCreatedAtTimestamp: latestGIST.CreatedAtTimestamp.String(),
GISTCreatedAtBlock: latestGIST.CreatedAtBlock.String(),
ReplacedGISTtHash: hexutil.Encode(replacedGIST.Root.Bytes()),
}, nil
}

func (m *StateUpdateMessageMaker) getStatesOnBlock(ctx context.Context, issuer, block *big.Int) (*statebind.IStateStateInfo, *statebind.IStateStateInfo, error) {
Expand Down Expand Up @@ -162,7 +164,46 @@ func (m *StateUpdateMessageMaker) getStatesOnBlock(ctx context.Context, issuer,
length.Sub(length, big.NewInt(1))

if length.Cmp(big.NewInt(1)) == 0 {
return nil, nil, errors.Wrap(err, "requested state on block does noe exist")
return nil, nil, errors.Wrap(err, "requested state on block does not exist")
}
}
}

func (m *StateUpdateMessageMaker) getGISTsOnBlock(ctx context.Context, block *big.Int) (*statebind.IStateGistRootInfo, *statebind.IStateGistRootInfo, error) {
length, err := m.stateDataProvider.GetGISTRootHistoryLength(&bind.CallOpts{
Context: ctx,
})
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get overall gists count")
}

if length.Cmp(big.NewInt(1)) == 0 {
// Only one state transition exist. Ignore that state transition.
// FIXME
return nil, nil, nil
}

for {
gists, err := m.stateDataProvider.GetGISTRootHistory(&bind.CallOpts{
Context: ctx,
}, new(big.Int).Sub(length, big.NewInt(2)), big.NewInt(2))

if err != nil {
return nil, nil, errors.Wrap(err, "failed to get last two gists")
}

replacedGIST := gists[0]
latestGIST := gists[1]

if latestGIST.CreatedAtBlock.Cmp(block) == 0 {
return &latestGIST, &replacedGIST, nil
}

// FIXME maybe increase step to reduce RPC calls amount
length.Sub(length, big.NewInt(1))

if length.Cmp(big.NewInt(1)) == 0 {
return nil, nil, errors.Wrap(err, "requested gist on block does not exist")
}
}
}
53 changes: 38 additions & 15 deletions internal/services/evm/stateChangeListener.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ func RunStateChangeListener(ctx context.Context, cfg config.Config) {
panic(errors.Wrap(err, "failed to init state change handler"))
}

filtrationDisabled := cfg.States().DisableFiltration
allowList := Map(cfg.States().IssuerID)

filter := func(id string) bool {
if filtrationDisabled {
return true
}
_, ok := allowList[id]
return ok
}

listener := stateChangeListener{
log: log,
broadcaster: cfg.Broadcaster(),
Expand All @@ -45,10 +56,9 @@ func RunStateChangeListener(ctx context.Context, cfg config.Config) {
cfg.Ethereum().NetworkName,
stateData,
),
watchedIssuerID: Map(cfg.States().IssuerID),
disableFiltration: cfg.States().DisableFiltration,
fromBlock: cfg.Ethereum().StartFromBlock,
blockWindow: cfg.Ethereum().BlockWindow,
filter: filter,
fromBlock: cfg.Ethereum().StartFromBlock,
blockWindow: cfg.Ethereum().BlockWindow,
}

running.WithBackOff(ctx, log, runnerName,
Expand All @@ -57,7 +67,8 @@ func RunStateChangeListener(ctx context.Context, cfg config.Config) {
}

type stateUpdateMsger interface {
StateUpdateMsgByBlock(ctx context.Context, issuer, block *big.Int) (*oracletypes.MsgCreateIdentityDefaultTransferOp, error)
StateUpdateMsgByBlock(ctx context.Context, issuer, block *big.Int) (*oracletypes.MsgCreateIdentityStateTransferOp, error)
GISTUpdateMsgByBlock(ctx context.Context, block *big.Int) (*oracletypes.MsgCreateIdentityGISTTransferOp, error)
}

type blockHandler interface {
Expand All @@ -71,10 +82,9 @@ type stateChangeListener struct {
msger stateUpdateMsger
blockHandler blockHandler

watchedIssuerID map[string]struct{}
disableFiltration bool
fromBlock uint64
blockWindow uint64
filter func(string) bool
fromBlock uint64
blockWindow uint64
}

func (l *stateChangeListener) subscription(ctx context.Context) error {
Expand Down Expand Up @@ -132,14 +142,27 @@ func (l *stateChangeListener) subscription(ctx context.Context) error {
"log_index": e.Raw.Index,
}).Debugf("got event: id: %s block: %s timestamp: %s state: %s", e.Id.String(), e.BlockN.String(), e.Timestamp.String(), e.State.String())

if !l.disableFiltration {
if _, ok := l.watchedIssuerID[e.Id.String()]; !ok {
l.log.Debugf("Skipping event: id %s is not allowed", e.Id.String())
continue
}
msg1, err := l.msger.GISTUpdateMsgByBlock(ctx, e.BlockN)
if err != nil {
l.log.WithError(err).WithField("tx_hash", e.Raw.TxHash.String()).Error("failed to craft GIST updated msg")
continue
}

if msg1 == nil {
l.log.WithField("tx_hash", e.Raw.TxHash.String()).Info("ignoring that GIST transition")
continue
}

if err := l.broadcaster.BroadcastTx(ctx, msg1); err != nil {
l.log.WithError(err).WithField("tx_hash", e.Raw.TxHash.String()).Error(err, "failed to broadcast GIST updated msg")
continue
}

if !l.filter(e.Id.String()) {
l.log.WithField("tx_hash", e.Raw.TxHash.String()).Info("Issuer ID is not supported for state update messages")
return nil
}

// Getting last state message
msg, err := l.msger.StateUpdateMsgByBlock(ctx, e.Id, e.BlockN)
if err != nil {
l.log.WithError(err).WithField("tx_hash", e.Raw.TxHash.String()).Error("failed to craft state updated msg")
Expand Down
5 changes: 3 additions & 2 deletions internal/services/grpc/grpc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ func RunAPI(ctx context.Context, cfg config.Config) {
listener: cfg.Listener(),
voter: voter.NewVoter(
cfg.Ethereum().NetworkName,
cfg.Log().WithField("who", "evm-saver-voter"),
cfg.Log().WithField("who", "grpc-voter"),
cfg.Broadcaster(),
map[rarimotypes.OpType]voter.Verifier{
rarimotypes.OpType_IDENTITY_DEFAULT_TRANSFER: voting.NewStateUpdateVerifier(cfg),
rarimotypes.OpType_IDENTITY_STATE_TRANSFER: voting.NewStateUpdateVerifier(cfg),
rarimotypes.OpType_IDENTITY_GIST_TRANSFER: voting.NewGISTUpdateVerifier(cfg),
},
),
})
Expand Down
55 changes: 55 additions & 0 deletions internal/services/voting/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package voting

import (
"context"
"sync"
"time"

"github.com/rarimo/evm-identity-saver-svc/internal/config"
rarimocore "github.com/rarimo/rarimo-core/x/rarimocore/types"
"github.com/rarimo/saver-grpc-lib/voter"
"gitlab.com/distributed_lab/running"
)

const (
OpQueryGISTUpdate = "tm.event='Tx' AND new_operation.operation_type='IDENTITY_GIST_TRANSFER'"
OpQueryStateUpdate = "tm.event='Tx' AND new_operation.operation_type='IDENTITY_STATE_TRANSFER'"
)

func RunVoter(ctx context.Context, cfg config.Config) {
gistV := NewGISTUpdateVerifier(cfg)
stateV := NewStateUpdateVerifier(cfg)

v := voter.NewVoter(cfg.Ethereum().NetworkName, cfg.Log(), cfg.Broadcaster(), map[rarimocore.OpType]voter.Verifier{
rarimocore.OpType_IDENTITY_STATE_TRANSFER: stateV,
rarimocore.OpType_IDENTITY_GIST_TRANSFER: gistV,
})

// catchup tends to panic on startup and doesn't handle it by itself, so we wrap it into retry loop
running.UntilSuccess(ctx, cfg.Log(), "voter-catchup", func(ctx context.Context) (bool, error) {
voter.
NewCatchupper(cfg.Cosmos(), v, cfg.Log()).
Run(ctx)

return true, nil
}, 1*time.Second, 5*time.Second)

wg := sync.WaitGroup{}
wg.Add(2)

go func() {
defer wg.Done()
voter.
NewSubscriber(v, cfg.Tendermint(), cfg.Cosmos(), OpQueryStateUpdate, cfg.Log(), cfg.Subscriber()).
Run(ctx)
}()

go func() {
defer wg.Done()
voter.
NewSubscriber(v, cfg.Tendermint(), cfg.Cosmos(), OpQueryGISTUpdate, cfg.Log(), cfg.Subscriber()).
Run(ctx)
}()

wg.Wait()
}
Loading

0 comments on commit 43e97e4

Please sign in to comment.