Skip to content

Commit

Permalink
Merge pull request #476 from DefiantLabs/feat/469-epoch-indexer-tracking
Browse files Browse the repository at this point in the history
Feat/469 epoch indexer tracking
  • Loading branch information
pharr117 authored Sep 17, 2023
2 parents 6e3b7ad + 56a33f5 commit 70f09bc
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 15 deletions.
58 changes: 46 additions & 12 deletions cmd/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,17 +604,35 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
endEpochNumber := idxr.cfg.Base.EpochEventsEndEpoch
epochIdentifier := idxr.cfg.Base.EpochIndexingIdentifier

// Get epochs for identifier between start and end epoch
epochsBetween, err := GetEpochsAtIdentifierBetweenStartAndEnd(idxr.db, chainID, epochIdentifier, startEpochNumber, endEpochNumber)
config.Log.Infof("Checking for latest epochs before running indexer")
var chain dbTypes.Chain
chain.ChainID = idxr.cl.Config.ChainID
chain.Name = idxr.cfg.Lens.ChainName
res := idxr.db.FirstOrCreate(&chain)

if res.Error != nil {
config.Log.Fatalf("Error setting up Chain model. Err: %v", res.Error)
}

latestHeight, err := rpc.GetLatestBlockHeight(idxr.cl)
if err != nil {
config.Log.Fatalf("Error getting latest block height. Err: %v", err)
}

indexEpochsAtStartingHeight(idxr.db, idxr.cl, latestHeight, chain, epochIdentifier, idxr.cfg.Base.Throttling)

// Get epochs for identifier between start and end epoch that have not been indexed
epochsBetween, err := GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(idxr.db, chainID, epochIdentifier, startEpochNumber, endEpochNumber)
if err != nil {
config.Log.Fatalf("Error getting epochs between %d and %d for identifier %s. %s", startEpochNumber, endEpochNumber, epochIdentifier, err)
}

if len(epochsBetween) == 0 {
config.Log.Fatalf("No epochs found in database between start %d and end %d for epoch identifier %s", startEpochNumber, endEpochNumber, epochIdentifier)
config.Log.Infof("No unindexed epochs found in database between start %d and end %d for epoch identifier %s", startEpochNumber, endEpochNumber, epochIdentifier)
return
}

config.Log.Infof("Indexing epoch events from epoch: %v to %v", startEpochNumber, endEpochNumber)
config.Log.Infof("Indexing epoch events from epoch: %v to %v", epochsBetween[0].EpochNumber, epochsBetween[len(epochsBetween)-1].EpochNumber)

rpcClient := rpc.URIClient{
Address: idxr.cl.Config.RPCAddr,
Expand Down Expand Up @@ -642,14 +660,17 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor

blockRelevantEvents, err := core.ProcessRPCEpochEvents(bresults, epochIdentifier)

switch {
case err != nil:
if err != nil {
failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err)
err := dbTypes.UpsertFailedEventBlock(idxr.db, int64(epoch.StartHeight), idxr.cfg.Lens.ChainID, idxr.cfg.Lens.ChainName)
if err != nil {
config.Log.Fatal("Failed to insert failed block event", err)
}
case len(blockRelevantEvents) != 0:
} else {
if len(blockRelevantEvents) == 0 {
config.Log.Infof("Block %d has no relevant block events", bresults.Height)
}

result, err := rpc.GetBlock(idxr.cl, bresults.Height)
if err != nil {
failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err)
Expand All @@ -667,8 +688,6 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
epochNumber: epoch.EpochNumber,
}
}
default:
config.Log.Infof("Block %d has no relevant block events", bresults.Height)
}

if idxr.cfg.Base.Throttling != 0 {
Expand All @@ -680,10 +699,19 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
config.Log.Infof("Finished gathering epoch events for epochs %d to %d in identifier %s", startEpochNumber, endEpochNumber, epochIdentifier)
}

func GetEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifier string, startEpochNumber int64, endEpochNumber int64) ([]dbTypes.Epoch, error) {
func GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifier string, startEpochNumber int64, endEpochNumber int64) ([]dbTypes.Epoch, error) {
var epochsBetween []dbTypes.Epoch
dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=?", startEpochNumber, endEpochNumber, identifier, chainID).Find(&epochsBetween)
return epochsBetween, dbResp.Error
var err error
if endEpochNumber >= 0 {
config.Log.Info("Epoch number start and end set, searching database between start and end epoch number")
dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, endEpochNumber, identifier, chainID).Order("epoch_number asc").Find(&epochsBetween)
err = dbResp.Error
} else {
config.Log.Info("End epoch number less than 0, searching database for epochs greater than start epoch number")
dbResp := db.Where("epoch_number >= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, identifier, chainID).Order("epoch_number asc").Find(&epochsBetween)
err = dbResp.Error
}
return epochsBetween, err
}

// doDBUpdates will read the data out of the db data chan that had been processed by the workers
Expand Down Expand Up @@ -778,6 +806,12 @@ func (idxr *Indexer) doDBUpdates(wg *sync.WaitGroup, txDataChan chan *dbData, bl
config.Log.Fatal(fmt.Sprintf("Error indexing block events for %s.", identifierLoggingString), err)
}
}

err = dbTypes.UpdateEpochIndexingStatus(idxr.db, idxr.dryRun, epochEventData.epochNumber, epochEventData.epochIdentifier, idxr.cfg.Lens.ChainID, idxr.cfg.Lens.ChainName)

if err != nil {
config.Log.Fatal(fmt.Sprintf("Error indexing block events for %s. Could not mark Epoch indexed.", identifierLoggingString), err)
}
}
}
}
6 changes: 3 additions & 3 deletions cmd/update_epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func updateEpochs(cmd *cobra.Command, args []string) {
currentHeight := latestHeight

for {
lastIndexedEpoch, foundLast := indexEpochsAtStartingHeight(db, cl, currentHeight, chain, cfg.Base.Throttling)
lastIndexedEpoch, foundLast := indexEpochsAtStartingHeight(db, cl, currentHeight, chain, epochIdentifier, cfg.Base.Throttling)

if lastIndexedEpoch.EpochNumber <= 1 || foundLast {
config.Log.Infof("Indexed earliest possible Epoch through Epoch querying method")
Expand Down Expand Up @@ -125,7 +125,7 @@ func updateEpochs(cmd *cobra.Command, args []string) {
}
}

func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHeight int64, chain dbTypes.Chain, throttling float64) (*dbTypes.Epoch, bool) {
func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHeight int64, chain dbTypes.Chain, identifierToIndex string, throttling float64) (*dbTypes.Epoch, bool) {
currentHeight := startingHeight
var lastIndexedItem dbTypes.Epoch
for {
Expand All @@ -148,7 +148,7 @@ func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHe
// Make sure we have the ability to index this EpochInfo
// This will save us trouble if Osmosis adds more Epochs in the future
indexable, identifierExists := epochsTypes.OsmosisIndexableEpochs[epoch.Identifier]
if identifierExists && indexable && epoch.Identifier == epochIdentifier {
if identifierExists && indexable && epoch.Identifier == identifierToIndex {

if epoch.CurrentEpochStartHeight <= 0 {
config.Log.Debugf("Found Epoch %d that contains 0 for CurrentEpochStartHeight, cannot continue", epoch.CurrentEpoch)
Expand Down
10 changes: 10 additions & 0 deletions db/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func IndexBlockEvents(db *gorm.DB, dryRun bool, blockHeight int64, blockTime tim
return nil
}

func UpdateEpochIndexingStatus(db *gorm.DB, dryRun bool, epochNumber uint, epochIdentifier string, dbChainID string, dbChainName string) error {
epochToUpdate := Epoch{
EpochNumber: epochNumber,
Chain: Chain{ChainID: dbChainID, Name: dbChainName},
Identifier: epochIdentifier,
}

return db.Model(&Epoch{}).Where(&epochToUpdate).Update("indexed", true).Error
}

func createTaxableEvents(db *gorm.DB, events []TaxableEvent) error {
// Ordering matters due to foreign key constraints. Call Create() first to get right foreign key ID
return db.Transaction(func(dbTransaction *gorm.DB) error {
Expand Down
1 change: 1 addition & 0 deletions db/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,5 @@ type Epoch struct {
StartHeight uint `gorm:"uniqueIndex:chainepochidentifierheight"`
Identifier string `gorm:"uniqueIndex:chainepochidentifierheight"`
EpochNumber uint
Indexed bool `gorm:"default:false"`
}

0 comments on commit 70f09bc

Please sign in to comment.