Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/469 epoch indexer tracking #476

Merged
merged 2 commits into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions cmd/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,17 +604,35 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
endEpochNumber := idxr.cfg.Base.EpochEventsEndEpoch
epochIdentifier := idxr.cfg.Base.EpochIndexingIdentifier

// Get epochs for identifier between start and end epoch
epochsBetween, err := GetEpochsAtIdentifierBetweenStartAndEnd(idxr.db, chainID, epochIdentifier, startEpochNumber, endEpochNumber)
config.Log.Infof("Checking for latest epochs before running indexer")
var chain dbTypes.Chain
chain.ChainID = idxr.cl.Config.ChainID
chain.Name = idxr.cfg.Lens.ChainName
res := idxr.db.FirstOrCreate(&chain)

if res.Error != nil {
config.Log.Fatalf("Error setting up Chain model. Err: %v", res.Error)
}

latestHeight, err := rpc.GetLatestBlockHeight(idxr.cl)
if err != nil {
config.Log.Fatalf("Error getting latest block height. Err: %v", err)
}

indexEpochsAtStartingHeight(idxr.db, idxr.cl, latestHeight, chain, epochIdentifier, idxr.cfg.Base.Throttling)

// Get epochs for identifier between start and end epoch that have not been indexed
epochsBetween, err := GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(idxr.db, chainID, epochIdentifier, startEpochNumber, endEpochNumber)
if err != nil {
config.Log.Fatalf("Error getting epochs between %d and %d for identifier %s. %s", startEpochNumber, endEpochNumber, epochIdentifier, err)
}

if len(epochsBetween) == 0 {
config.Log.Fatalf("No epochs found in database between start %d and end %d for epoch identifier %s", startEpochNumber, endEpochNumber, epochIdentifier)
config.Log.Infof("No unindexed epochs found in database between start %d and end %d for epoch identifier %s", startEpochNumber, endEpochNumber, epochIdentifier)
return
}

config.Log.Infof("Indexing epoch events from epoch: %v to %v", startEpochNumber, endEpochNumber)
config.Log.Infof("Indexing epoch events from epoch: %v to %v", epochsBetween[0].EpochNumber, epochsBetween[len(epochsBetween)-1].EpochNumber)

rpcClient := rpc.URIClient{
Address: idxr.cl.Config.RPCAddr,
Expand Down Expand Up @@ -642,14 +660,17 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor

blockRelevantEvents, err := core.ProcessRPCEpochEvents(bresults, epochIdentifier)

switch {
case err != nil:
if err != nil {
failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err)
err := dbTypes.UpsertFailedEventBlock(idxr.db, int64(epoch.StartHeight), idxr.cfg.Lens.ChainID, idxr.cfg.Lens.ChainName)
if err != nil {
config.Log.Fatal("Failed to insert failed block event", err)
}
case len(blockRelevantEvents) != 0:
} else {
if len(blockRelevantEvents) == 0 {
config.Log.Infof("Block %d has no relevant block events", bresults.Height)
}

result, err := rpc.GetBlock(idxr.cl, bresults.Height)
if err != nil {
failedBlockHandler(int64(epoch.StartHeight), core.FailedBlockEventHandling, err)
Expand All @@ -667,8 +688,6 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
epochNumber: epoch.EpochNumber,
}
}
default:
config.Log.Infof("Block %d has no relevant block events", bresults.Height)
}

if idxr.cfg.Base.Throttling != 0 {
Expand All @@ -680,10 +699,19 @@ func (idxr *Indexer) indexEpochEvents(wg *sync.WaitGroup, failedBlockHandler cor
config.Log.Infof("Finished gathering epoch events for epochs %d to %d in identifier %s", startEpochNumber, endEpochNumber, epochIdentifier)
}

func GetEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifier string, startEpochNumber int64, endEpochNumber int64) ([]dbTypes.Epoch, error) {
func GetUnindexedEpochsAtIdentifierBetweenStartAndEnd(db *gorm.DB, chainID uint, identifier string, startEpochNumber int64, endEpochNumber int64) ([]dbTypes.Epoch, error) {
var epochsBetween []dbTypes.Epoch
dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=?", startEpochNumber, endEpochNumber, identifier, chainID).Find(&epochsBetween)
return epochsBetween, dbResp.Error
var err error
if endEpochNumber >= 0 {
config.Log.Info("Epoch number start and end set, searching database between start and end epoch number")
dbResp := db.Where("epoch_number >= ? AND epoch_number <= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, endEpochNumber, identifier, chainID).Order("epoch_number asc").Find(&epochsBetween)
err = dbResp.Error
} else {
config.Log.Info("End epoch number less than 0, searching database for epochs greater than start epoch number")
dbResp := db.Where("epoch_number >= ? AND identifier=? AND blockchain_id=? AND indexed=False", startEpochNumber, identifier, chainID).Order("epoch_number asc").Find(&epochsBetween)
err = dbResp.Error
}
return epochsBetween, err
}

// doDBUpdates will read the data out of the db data chan that had been processed by the workers
Expand Down Expand Up @@ -778,6 +806,12 @@ func (idxr *Indexer) doDBUpdates(wg *sync.WaitGroup, txDataChan chan *dbData, bl
config.Log.Fatal(fmt.Sprintf("Error indexing block events for %s.", identifierLoggingString), err)
}
}

err = dbTypes.UpdateEpochIndexingStatus(idxr.db, idxr.dryRun, epochEventData.epochNumber, epochEventData.epochIdentifier, idxr.cfg.Lens.ChainID, idxr.cfg.Lens.ChainName)

if err != nil {
config.Log.Fatal(fmt.Sprintf("Error indexing block events for %s. Could not mark Epoch indexed.", identifierLoggingString), err)
}
}
}
}
6 changes: 3 additions & 3 deletions cmd/update_epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func updateEpochs(cmd *cobra.Command, args []string) {
currentHeight := latestHeight

for {
lastIndexedEpoch, foundLast := indexEpochsAtStartingHeight(db, cl, currentHeight, chain, cfg.Base.Throttling)
lastIndexedEpoch, foundLast := indexEpochsAtStartingHeight(db, cl, currentHeight, chain, epochIdentifier, cfg.Base.Throttling)

if lastIndexedEpoch.EpochNumber <= 1 || foundLast {
config.Log.Infof("Indexed earliest possible Epoch through Epoch querying method")
Expand Down Expand Up @@ -125,7 +125,7 @@ func updateEpochs(cmd *cobra.Command, args []string) {
}
}

func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHeight int64, chain dbTypes.Chain, throttling float64) (*dbTypes.Epoch, bool) {
func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHeight int64, chain dbTypes.Chain, identifierToIndex string, throttling float64) (*dbTypes.Epoch, bool) {
currentHeight := startingHeight
var lastIndexedItem dbTypes.Epoch
for {
Expand All @@ -148,7 +148,7 @@ func indexEpochsAtStartingHeight(db *gorm.DB, cl *client.ChainClient, startingHe
// Make sure we have the ability to index this EpochInfo
// This will save us trouble if Osmosis adds more Epochs in the future
indexable, identifierExists := epochsTypes.OsmosisIndexableEpochs[epoch.Identifier]
if identifierExists && indexable && epoch.Identifier == epochIdentifier {
if identifierExists && indexable && epoch.Identifier == identifierToIndex {

if epoch.CurrentEpochStartHeight <= 0 {
config.Log.Debugf("Found Epoch %d that contains 0 for CurrentEpochStartHeight, cannot continue", epoch.CurrentEpoch)
Expand Down
10 changes: 10 additions & 0 deletions db/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ func IndexBlockEvents(db *gorm.DB, dryRun bool, blockHeight int64, blockTime tim
return nil
}

func UpdateEpochIndexingStatus(db *gorm.DB, dryRun bool, epochNumber uint, epochIdentifier string, dbChainID string, dbChainName string) error {
epochToUpdate := Epoch{
EpochNumber: epochNumber,
Chain: Chain{ChainID: dbChainID, Name: dbChainName},
Identifier: epochIdentifier,
}

return db.Model(&Epoch{}).Where(&epochToUpdate).Update("indexed", true).Error
}

func createTaxableEvents(db *gorm.DB, events []TaxableEvent) error {
// Ordering matters due to foreign key constraints. Call Create() first to get right foreign key ID
return db.Transaction(func(dbTransaction *gorm.DB) error {
Expand Down
1 change: 1 addition & 0 deletions db/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,5 @@ type Epoch struct {
StartHeight uint `gorm:"uniqueIndex:chainepochidentifierheight"`
Identifier string `gorm:"uniqueIndex:chainepochidentifierheight"`
EpochNumber uint
Indexed bool `gorm:"default:false"`
}
Loading