Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge pull request #145 from vulcanize/contract-watcher-logging
Browse files Browse the repository at this point in the history
Increase logging in contract watcher
  • Loading branch information
rmulhol authored Oct 2, 2019
2 parents b4e16c4 + 4505382 commit 0310431
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 126 deletions.
8 changes: 7 additions & 1 deletion pkg/contract_watcher/full/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package transformer
import (
"errors"

"github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/converter"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/full/retriever"
Expand Down Expand Up @@ -106,7 +108,11 @@ func (tr *Transformer) Init() error {

// Get contract name if it has one
var name = new(string)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, tr.LastBlock)
if pollingErr != nil {
// can't return this error because "name" might not exist on the contract
logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
}

// Remove any potential accidental duplicate inputs in arg filter values
eventArgs := map[string]bool{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/contract_watcher/header/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (c *Converter) Convert(logs []gethTypes.Log, event types.Event, headerID in

// Convert the given watched event logs into types.Logs; returns a map of event names to a slice of their converted logs
func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.Event, headerID int64) (map[string][]types.Log, error) {
contract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
boundContract := bind.NewBoundContract(common.HexToAddress(c.ContractInfo.Address), c.ContractInfo.ParsedAbi, nil, nil, nil)
eventsToLogs := make(map[string][]types.Log)
for _, event := range events {
eventsToLogs[event.Name] = make([]types.Log, 0, len(logs))
Expand All @@ -141,7 +141,7 @@ func (c *Converter) ConvertBatch(logs []gethTypes.Log, events map[string]types.E
// If the log is of this event type, process it as such
if event.Sig() == log.Topics[0] {
values := make(map[string]interface{})
err := contract.UnpackLogIntoMap(values, event.Name, log)
err := boundContract.UnpackLogIntoMap(values, event.Name, log)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/contract_watcher/header/repository/header_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/hashicorp/golang-lru"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
Expand Down Expand Up @@ -148,7 +149,10 @@ func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids [
pgStr = pgStr[:len(pgStr)-2]
_, err = tx.Exec(pgStr, header.Id)
if err != nil {
tx.Rollback()
rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Warnf("error rolling back transaction: %s", rollbackErr.Error())
}
return err
}
}
Expand Down Expand Up @@ -246,6 +250,7 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock
// Returns a continuous set of headers
func continuousHeaders(headers []core.Header) []core.Header {
if len(headers) < 1 {
logrus.Trace("no headers to arrange continuously")
return headers
}
previousHeader := headers[0].BlockNumber
Expand Down
14 changes: 7 additions & 7 deletions pkg/contract_watcher/header/repository/header_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ var _ = Describe("Repository", func() {
h1 := missingHeaders[0]
h2 := missingHeaders[1]
h3 := missingHeaders[2]
Expect(h1.BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
Expect(h2.BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
Expect(h3.BlockNumber).To(Equal(int64(mocks.MockHeader3.BlockNumber)))
Expect(h1.BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(h2.BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
Expect(h3.BlockNumber).To(Equal(mocks.MockHeader3.BlockNumber))
})

It("Returns only contiguous chunks of headers", func() {
Expand All @@ -150,8 +150,8 @@ var _ = Describe("Repository", func() {
missingHeaders, err := contractHeaderRepo.MissingHeaders(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
})

It("Fails if eventID does not yet exist in check_headers table", func() {
Expand Down Expand Up @@ -199,8 +199,8 @@ var _ = Describe("Repository", func() {
missingHeaders, err := contractHeaderRepo.MissingHeadersForAll(mocks.MockHeader1.BlockNumber, mocks.MockHeader4.BlockNumber, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(mocks.MockHeader1.BlockNumber)))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(mocks.MockHeader2.BlockNumber)))
Expect(missingHeaders[0].BlockNumber).To(Equal(mocks.MockHeader1.BlockNumber))
Expect(missingHeaders[1].BlockNumber).To(Equal(mocks.MockHeader2.BlockNumber))
})

It("returns headers after starting header if starting header not missing", func() {
Expand Down
18 changes: 12 additions & 6 deletions pkg/contract_watcher/header/retriever/block_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ var _ = Describe("Block Retriever", func() {

Describe("RetrieveFirstBlock", func() {
It("Retrieves block number of earliest header in the database", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
_, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
Expect(err).ToNot(HaveOccurred())

i, err := r.RetrieveFirstBlock()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -61,9 +64,12 @@ var _ = Describe("Block Retriever", func() {

Describe("RetrieveMostRecentBlock", func() {
It("Retrieves the latest header's block number", func() {
headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
_, err := headerRepository.CreateOrUpdateHeader(mocks.MockHeader1)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader2)
Expect(err).ToNot(HaveOccurred())
_, err = headerRepository.CreateOrUpdateHeader(mocks.MockHeader3)
Expect(err).ToNot(HaveOccurred())

i, err := r.RetrieveMostRecentBlock()
Expect(err).ToNot(HaveOccurred())
Expand Down
102 changes: 56 additions & 46 deletions pkg/contract_watcher/header/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package transformer

import (
"errors"
"fmt"
"strings"

"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"

"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/converter"
Expand Down Expand Up @@ -107,22 +109,22 @@ func (tr *Transformer) Init() error {
// Configure Abi
if tr.Config.Abis[contractAddr] == "" {
// If no abi is given in the config, this method will try fetching from internal look-up table and etherscan
err := tr.Parser.Parse(contractAddr)
if err != nil {
return err
parseErr := tr.Parser.Parse(contractAddr)
if parseErr != nil {
return fmt.Errorf("error parsing contract by address: %s", parseErr.Error())
}
} else {
// If we have an abi from the config, load that into the parser
err := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr])
if err != nil {
return err
parseErr := tr.Parser.ParseAbiStr(tr.Config.Abis[contractAddr])
if parseErr != nil {
return fmt.Errorf("error parsing contract abi: %s", parseErr.Error())
}
}

// Get first block and most recent block number in the header repo
firstBlock, err := tr.Retriever.RetrieveFirstBlock()
if err != nil {
return err
firstBlock, retrieveErr := tr.Retriever.RetrieveFirstBlock()
if retrieveErr != nil {
return fmt.Errorf("error retrieving first block: %s", retrieveErr.Error())
}

// Set to specified range if it falls within the bounds
Expand All @@ -132,7 +134,11 @@ func (tr *Transformer) Init() error {

// Get contract name if it has one
var name = new(string)
tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
pollingErr := tr.Poller.FetchContractData(tr.Parser.Abi(), contractAddr, "name", nil, name, -1)
if pollingErr != nil {
// can't return this error because "name" might not exist on the contract
logrus.Warnf("error fetching contract data: %s", pollingErr.Error())
}

// Remove any potential accidental duplicate inputs
eventArgs := map[string]bool{}
Expand Down Expand Up @@ -165,9 +171,9 @@ func (tr *Transformer) Init() error {
tr.sortedEventIds[con.Address] = make([]string, 0, len(con.Events))
for _, event := range con.Events {
eventId := strings.ToLower(event.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(eventId)
if err != nil {
return err
addColumnErr := tr.HeaderRepository.AddCheckColumn(eventId)
if addColumnErr != nil {
return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
}
// Keep track of this event id; sorted and unsorted
tr.sortedEventIds[con.Address] = append(tr.sortedEventIds[con.Address], eventId)
Expand All @@ -180,9 +186,9 @@ func (tr *Transformer) Init() error {
tr.sortedMethodIds[con.Address] = make([]string, 0, len(con.Methods))
for _, m := range con.Methods {
methodId := strings.ToLower(m.Name + "_" + con.Address)
err := tr.HeaderRepository.AddCheckColumn(methodId)
if err != nil {
return err
addColumnErr := tr.HeaderRepository.AddCheckColumn(methodId)
if addColumnErr != nil {
return fmt.Errorf("error adding check column: %s", addColumnErr.Error())
}
tr.sortedMethodIds[con.Address] = append(tr.sortedMethodIds[con.Address], methodId)
}
Expand All @@ -202,9 +208,9 @@ func (tr *Transformer) Execute() error {
}

// Find unchecked headers for all events across all contracts; these are returned in asc order
missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
if err != nil {
return err
missingHeaders, missingHeadersErr := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
if missingHeadersErr != nil {
return fmt.Errorf("error getting missing headers: %s", missingHeadersErr.Error())
}

// Iterate over headers
Expand All @@ -216,23 +222,24 @@ func (tr *Transformer) Execute() error {
// Map to sort batch fetched logs by which contract they belong to, for post fetch processing
sortedLogs := make(map[string][]gethTypes.Log)
// And fetch all event logs across contracts at this header
allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
if err != nil {
return err
allLogs, fetchErr := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
if fetchErr != nil {
return fmt.Errorf("error fetching logs: %s", fetchErr.Error())
}

// If no logs are found mark the header checked for all of these eventIDs
// and continue to method polling and onto the next iteration
if len(allLogs) < 1 {
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
if err != nil {
return err
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
err = tr.methodPolling(header, tr.sortedMethodIds)
if err != nil {
return err
pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
if pollingErr != nil {
return fmt.Errorf("error polling methods: %s", pollingErr.Error())
}
tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header
logrus.Tracef("no logs found for block %d, continuing", header.BlockNumber)
continue
}

Expand All @@ -245,41 +252,43 @@ func (tr *Transformer) Execute() error {
// Process logs for each contract
for conAddr, logs := range sortedLogs {
if logs == nil {
logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber)
continue
}
// Configure converter with this contract
con := tr.Contracts[conAddr]
tr.Converter.Update(con)

// Convert logs into batches of log mappings (eventName => []types.Logs
convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if err != nil {
return err
convertedLogs, convertErr := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
if convertErr != nil {
return fmt.Errorf("error converting logs: %s", convertErr.Error())
}
// Cycle through each type of event log and persist them
for eventName, logs := range convertedLogs {
// If logs for this event are empty, mark them checked at this header and continue
if len(logs) < 1 {
eventId := strings.ToLower(eventName + "_" + con.Address)
err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
if err != nil {
return err
markCheckedErr := tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
logrus.Tracef("no logs found for event %s on contract %s at block %d, continuing", eventName, conAddr, header.BlockNumber)
continue
}
// If logs aren't empty, persist them
// Header is marked checked in the transactions
err = tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if err != nil {
return err
persistErr := tr.EventRepository.PersistLogs(logs, con.Events[eventName], con.Address, con.Name)
if persistErr != nil {
return fmt.Errorf("error persisting logs: %s", persistErr.Error())
}
}
}

// Poll contracts at this block height
err = tr.methodPolling(header, tr.sortedMethodIds)
if err != nil {
return err
pollingErr := tr.methodPolling(header, tr.sortedMethodIds)
if pollingErr != nil {
return fmt.Errorf("error polling methods: %s", pollingErr.Error())
}
// Success; setup to start at the next header
tr.Start = header.BlockNumber + 1
Expand All @@ -294,19 +303,20 @@ func (tr *Transformer) methodPolling(header core.Header, sortedMethodIds map[str
// Skip method polling processes if no methods are specified
// Also don't try to poll methods below this contract's specified starting block
if len(con.Methods) == 0 || header.BlockNumber < con.StartingBlock {
logrus.Tracef("not polling contract: %s", con.Address)
continue
}

// Poll all methods for this contract at this header
err := tr.Poller.PollContractAt(*con, header.BlockNumber)
if err != nil {
return err
pollingErr := tr.Poller.PollContractAt(*con, header.BlockNumber)
if pollingErr != nil {
return fmt.Errorf("error polling contract %s: %s", con.Address, pollingErr.Error())
}

// Mark this header checked for the methods
err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
if err != nil {
return err
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, sortedMethodIds[con.Address])
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/contract_watcher/header/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var _ = Describe("Transformer", func() {
err := t.Init()

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
})
})

Expand Down Expand Up @@ -109,7 +109,7 @@ var _ = Describe("Transformer", func() {
err := t.Init()

Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
Expect(err.Error()).To(ContainSubstring(fakes.FakeError.Error()))
})
})
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/contract_watcher/shared/constants/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
var SupportsInterfaceABI = `[{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}]`

// Individual event interfaces for constructing ABI from
var SupportsInterace = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}`
var SupportsInterface = `{"constant":true,"inputs":[{"name":"interfaceID","type":"bytes4"}],"name":"supportsInterface","outputs":[{"name":"","type":"bool"}],"payable":false,"type":"function"}`
var AddrChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"a","type":"address"}],"name":"AddrChanged","type":"event"}`
var ContentChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"hash","type":"bytes32"}],"name":"ContentChanged","type":"event"}`
var NameChangeInterface = `{"anonymous":false,"inputs":[{"indexed":true,"name":"node","type":"bytes32"},{"indexed":false,"name":"name","type":"string"}],"name":"NameChanged","type":"event"}`
Expand Down
2 changes: 1 addition & 1 deletion pkg/contract_watcher/shared/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newFetcherError(err error, fetchMethod string) *fetcherError {

// Fetcher struct
type Fetcher struct {
BlockChain core.BlockChain // Underyling Blockchain
BlockChain core.BlockChain // Underlying Blockchain
}

// Fetcher error
Expand Down
4 changes: 2 additions & 2 deletions pkg/contract_watcher/shared/helpers/test_helpers/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func SetupDBandBC() (*postgres.DB, core.BlockChain) {
rpcClient := client.NewRpcClient(rawRpcClient, infuraIPC)
ethClient := ethclient.NewClient(rawRpcClient)
blockChainClient := client.NewEthClient(ethClient)
node := node.MakeNode(rpcClient)
madeNode := node.MakeNode(rpcClient)
transactionConverter := rpc2.NewRpcTransactionConverter(ethClient)
blockChain := geth.NewBlockChain(blockChainClient, rpcClient, node, transactionConverter)
blockChain := geth.NewBlockChain(blockChainClient, rpcClient, madeNode, transactionConverter)

db, err := postgres.NewDB(config.Database{
Hostname: "localhost",
Expand Down
Loading

0 comments on commit 0310431

Please sign in to comment.