Skip to content

Commit

Permalink
Fix: don't send to S3 the same blobs (#224)
Browse files Browse the repository at this point in the history
* Fix: don't send to S3 the same blobs

* Add test
  • Loading branch information
aopoltorzhicky authored Jun 26, 2024
1 parent dcc5fb9 commit 529230a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
31 changes: 19 additions & 12 deletions pkg/indexer/blob_saver/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Module struct {

kind string
blocks *sync.Map[pkgTypes.Level, *[]blob.Blob]
blobs *sync.Map[string, struct{}]
storage blob.Storage
head pkgTypes.Level
}
Expand All @@ -57,6 +58,7 @@ func NewModule(
m := Module{
BaseModule: modules.New("blob_saver"),
blocks: sync.NewMap[pkgTypes.Level, *[]blob.Blob](),
blobs: sync.NewMap[string, struct{}](),
kind: kind,
}

Expand Down Expand Up @@ -153,6 +155,7 @@ func (module *Module) processEndOfBlock(ctx context.Context, height pkgTypes.Lev

module.head = height
module.blocks.Delete(height)
module.blobs.Clear()
return nil
}

Expand All @@ -167,20 +170,24 @@ func (module *Module) processBlob(msg *Msg) error {
return errors.Wrap(err, "can't create commitment")
}

blb := blob.Blob{
Commitment: commitment,
Blob: msg.Blob,
Height: uint64(msg.Height),
}
key := blb.String()

// skip blobs with the same commitments in the current block.
if _, ok := module.blobs.Get(key); ok {
return nil
}
module.blobs.Set(key, struct{}{})

if blobs, ok := module.blocks.Get(msg.Height); ok {
*blobs = append(*blobs, blob.Blob{
Commitment: commitment,
Blob: msg.Blob,
Height: uint64(msg.Height),
})
*blobs = append(*blobs, blb)
} else {
module.blocks.Set(msg.Height, &[]blob.Blob{
{
Commitment: commitment,
Blob: msg.Blob,
Height: uint64(msg.Height),
},
})

module.blocks.Set(msg.Height, &[]blob.Blob{blb})
}

return nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/indexer/blob_saver/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ func TestBlobSaverModule(t *testing.T) {
ShareVersion: 0,
NamespaceVersion: 0,
}
b2Copy := &types.Blob{
NamespaceId: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x67, 0x6d},
Data: []byte("0x676d"),
ShareVersion: 0,
NamespaceVersion: 0,
}
commitment, err := base64.StdEncoding.DecodeString("uwghsElFtoHNqQ3JrsDGj8uLW456izVbegVL/AunMOw=")
require.NoError(t, err, "decode commitment")

Expand Down Expand Up @@ -83,6 +89,10 @@ func TestBlobSaverModule(t *testing.T) {
Height: 101,
Blob: b2,
})
input.Push(&Msg{
Height: 101,
Blob: b2Copy,
})
input.Push(&Msg{
Height: 101,
EndBlock: true,
Expand Down

0 comments on commit 529230a

Please sign in to comment.