From 245874104ae1f49317e22f3ab1ca241794ea15c7 Mon Sep 17 00:00:00 2001 From: pharr117 Date: Sat, 16 Sep 2023 18:51:03 -0400 Subject: [PATCH 1/2] Add the following: 1. Model migration to add indexed bool to epochs 2. Update where clause to find unindexed epochs between start and end 3. Update db workflow for epoch indexing to set the indexed boolean to true --- cmd/index.go | 28 ++++++++++++++++++---------- db/events.go | 10 ++++++++++ db/models.go | 1 + 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/cmd/index.go b/cmd/index.go index d869bcb..309df1f 100644 --- a/cmd/index.go +++ b/cmd/index.go @@ -604,14 +604,15 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor endEpochNumber := idxr.cfg.Base.EpochEventsEndEpoch epochIdentifier := idxr.cfg.Base.EpochIndexingIdentifier - // Get epochs for identifier between start and end epoch - epochsBetween, err := GetEpochsAtIdentifierBetweenStartAndEnd(idxr.db, chainID, epochIdentifier, startEpochNumber, endEpochNumber) + // Get epochs for identifier between start and end epoch that have not been indexed + epochsBetween, err := GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(idxr.db, chainID, epochIdentifier, startEpochNumber, endEpochNumber) if err != nil { config.Log.Fatalf("Error getting epochs between %d and %d for identifier %s. %s", startEpochNumber, endEpochNumber, epochIdentifier, err) } if len(epochsBetween) == 0 { - config.Log.Fatalf("No epochs found in database between start %d and end %d for epoch identifier %s", startEpochNumber, endEpochNumber, epochIdentifier) + config.Log.Infof("No unindexed epochs found in database between start %d and end %d for epoch identifier %s", startEpochNumber, endEpochNumber, epochIdentifier) + return } config.Log.Infof("Indexing epoch events from epoch: %v to %v", startEpochNumber, endEpochNumber) @@ -642,14 +643,17 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor blockRelevantEvents, err := core.ProcessRPCEpochEvents(bresults, epochIdentifier) - switch { - case err != nil: + if err != nil { failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err) err := dbTypes.UpsertFailedEventBlock(idxr.db, int64(epoch.StartHeight), idxr.cfg.Lens.ChainID, idxr.cfg.Lens.ChainName) if err != nil { config.Log.Fatal("Failed to insert failed block event", err) } - case len(blockRelevantEvents) != 0: + } else { + if len(blockRelevantEvents) == 0 { + config.Log.Infof("Block %d has no relevant block events", bresults.Height) + } + result, err := rpc.GetBlock(idxr.cl, bresults.Height) if err != nil { failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err) @@ -667,8 +671,6 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor epochNumber: epoch.EpochNumber, } } - default: - config.Log.Infof("Block %d has no relevant block events", bresults.Height) } if idxr.cfg.Base.Throttling != 0 { @@ -680,9 +682,9 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor config.Log.Infof("Finished gathering epoch events for epochs %d to %d in identifier %s", startEpochNumber, endEpochNumber, epochIdentifier) } -func GetEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifier string, startEpochNumber int64, endEpochNumber int64) ([]dbTypes.Epoch, error) { +func GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifier string, startEpochNumber int64, endEpochNumber int64) ([]dbTypes.Epoch, error) { var epochsBetween []dbTypes.Epoch - dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=?", startEpochNumber, endEpochNumber, identifier, chainID).Find(&epochsBetween) + dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, endEpochNumber, identifier, chainID).Find(&epochsBetween) return epochsBetween, dbResp.Error } @@ -778,6 +780,12 @@ func (idxr *Indexer) doDBUpdates(wg *sync.WaitGroup, txDataChan chan *dbData, bl config.Log.Fatal(fmt.Sprintf("Error indexing block events for %s.", identifierLoggingString), err) } } + + err = dbTypes.UpdateEpochIndexingStatus(idxr.db, idxr.dryRun, epochEventData.epochNumber, epochEventData.epochIdentifier, idxr.cfg.Lens.ChainID, idxr.cfg.Lens.ChainName) + + if err != nil { + config.Log.Fatal(fmt.Sprintf("Error indexing block events for %s. Could not mark Epoch indexed.", identifierLoggingString), err) + } } } } diff --git a/db/events.go b/db/events.go index 4ecea85..8f62c0c 100644 --- a/db/events.go +++ b/db/events.go @@ -90,6 +90,16 @@ func IndexBlockEvents(db *gorm.DB, dryRun bool, blockHeight int64, blockTime tim return nil } +func UpdateEpochIndexingStatus(db *gorm.DB, dryRun bool, epochNumber uint, epochIdentifier string, dbChainID string, dbChainName string) error { + epochToUpdate := Epoch{ + EpochNumber: epochNumber, + Chain: Chain{ChainID: dbChainID, Name: dbChainName}, + Identifier: epochIdentifier, + } + + return db.Model(&Epoch{}).Where(&epochToUpdate).Update("indexed", true).Error +} + func createTaxableEvents(db *gorm.DB, events []TaxableEvent) error { // Ordering matters due to foreign key constraints. Call Create() first to get right foreign key ID return db.Transaction(func(dbTransaction *gorm.DB) error { diff --git a/db/models.go b/db/models.go index e84ff47..4c77146 100644 --- a/db/models.go +++ b/db/models.go @@ -193,4 +193,5 @@ type Epoch struct { StartHeight uint `gorm:"uniqueIndex:chainepochidentifierheight"` Identifier string `gorm:"uniqueIndex:chainepochidentifierheight"` EpochNumber uint + Indexed bool `gorm:"default:false"` } From 56a33f5533916c82f597b5d5869e677ae2b69612 Mon Sep 17 00:00:00 2001 From: pharr117 Date: Sat, 16 Sep 2023 19:20:42 -0400 Subject: [PATCH 2/2] Add the following: 1. Check for new epochs at every run of the epoch indexer 2. Sorting on epochs to index 3. End epoch check for negative values to find all above start --- cmd/index.go | 32 +++++++++++++++++++++++++++++--- cmd/update_epochs.go | 6 +++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/cmd/index.go b/cmd/index.go index 309df1f..669bfec 100644 --- a/cmd/index.go +++ b/cmd/index.go @@ -604,6 +604,23 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor endEpochNumber := idxr.cfg.Base.EpochEventsEndEpoch epochIdentifier := idxr.cfg.Base.EpochIndexingIdentifier + config.Log.Infof("Checking for latest epochs before running indexer") + var chain dbTypes.Chain + chain.ChainID = idxr.cl.Config.ChainID + chain.Name = idxr.cfg.Lens.ChainName + res := idxr.db.FirstOrCreate(&chain) + + if res.Error != nil { + config.Log.Fatalf("Error setting up Chain model. Err: %v", res.Error) + } + + latestHeight, err := rpc.GetLatestBlockHeight(idxr.cl) + if err != nil { + config.Log.Fatalf("Error getting latest block height. Err: %v", err) + } + + indexEpochsAtStartingHeight(idxr.db, idxr.cl, latestHeight, chain, epochIdentifier, idxr.cfg.Base.Throttling) + // Get epochs for identifier between start and end epoch that have not been indexed epochsBetween, err := GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(idxr.db, chainID, epochIdentifier, startEpochNumber, endEpochNumber) if err != nil { @@ -615,7 +632,7 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor return } - config.Log.Infof("Indexing epoch events from epoch: %v to %v", startEpochNumber, endEpochNumber) + config.Log.Infof("Indexing epoch events from epoch: %v to %v", epochsBetween[0].EpochNumber, epochsBetween[len(epochsBetween)-1].EpochNumber) rpcClient := rpc.URIClient{ Address: idxr.cl.Config.RPCAddr, @@ -684,8 +701,17 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor func GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifier string, startEpochNumber int64, endEpochNumber int64) ([]dbTypes.Epoch, error) { var epochsBetween []dbTypes.Epoch - dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, endEpochNumber, identifier, chainID).Find(&epochsBetween) - return epochsBetween, dbResp.Error + var err error + if endEpochNumber >= 0 { + config.Log.Info("Epoch number start and end set, searching database between start and end epoch number") + dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, endEpochNumber, identifier, chainID).Order("epoch_number asc").Find(&epochsBetween) + err = dbResp.Error + } else { + config.Log.Info("End epoch number less than 0, searching database for epochs greater than start epoch number") + dbResp := db.Where("epoch_number >= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, identifier, chainID).Order("epoch_number asc").Find(&epochsBetween) + err = dbResp.Error + } + return epochsBetween, err } // doDBUpdates will read the data out of the db data chan that had been processed by the workers diff --git a/cmd/update_epochs.go b/cmd/update_epochs.go index 404ce41..acc927b 100644 --- a/cmd/update_epochs.go +++ b/cmd/update_epochs.go @@ -63,7 +63,7 @@ func updateEpochs(cmd *cobra.Command, args []string) { currentHeight := latestHeight for { - lastIndexedEpoch, foundLast := indexEpochsAtStartingHeight(db, cl, currentHeight, chain, cfg.Base.Throttling) + lastIndexedEpoch, foundLast := indexEpochsAtStartingHeight(db, cl, currentHeight, chain, epochIdentifier, cfg.Base.Throttling) if lastIndexedEpoch.EpochNumber <= 1 || foundLast { config.Log.Infof("Indexed earliest possible Epoch through Epoch querying method") @@ -125,7 +125,7 @@ func updateEpochs(cmd *cobra.Command, args []string) { } } -func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHeight int64, chain dbTypes.Chain, throttling float64) (*dbTypes.Epoch, bool) { +func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHeight int64, chain dbTypes.Chain, identifierToIndex string, throttling float64) (*dbTypes.Epoch, bool) { currentHeight := startingHeight var lastIndexedItem dbTypes.Epoch for { @@ -148,7 +148,7 @@ func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHe // Make sure we have the ability to index this EpochInfo // This will save us trouble if Osmosis adds more Epochs in the future indexable, identifierExists := epochsTypes.OsmosisIndexableEpochs[epoch.Identifier] - if identifierExists && indexable && epoch.Identifier == epochIdentifier { + if identifierExists && indexable && epoch.Identifier == identifierToIndex { if epoch.CurrentEpochStartHeight <= 0 { config.Log.Debugf("Found Epoch %d that contains 0 for CurrentEpochStartHeight, cannot continue", epoch.CurrentEpoch)