Skip to content

Commit

Permalink
refactoring(full/availability): removing TeeGetter, storing via Share…
Browse files Browse the repository at this point in the history
…sAvailable (#2726)
  • Loading branch information
distractedm1nd authored Sep 21, 2023
1 parent 2b61033 commit b6ac8c2
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 166 deletions.
2 changes: 1 addition & 1 deletion das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0],
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
avail := full.TestAvailability(getters.NewIPLDGetter(bServ))
avail := full.TestAvailability(t, getters.NewIPLDGetter(bServ))
// 15 headers from the past and 15 future headers
mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15)

Expand Down
8 changes: 3 additions & 5 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,19 @@ func lightGetter(
// by shrex the next time the data is retrieved (meaning shard recovery is
// manual after corruption is detected).
func bridgeGetter(
store *eds.Store,
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
cfg Config,
) share.Getter {
var cascade []share.Getter
cascade = append(cascade, storeGetter)
if cfg.UseShareExchange {
cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store))
cascade = append(cascade, shrexGetter)
}
return getters.NewCascadeGetter(cascade)
}

func fullGetter(
store *eds.Store,
storeGetter *getters.StoreGetter,
shrexGetter *getters.ShrexGetter,
ipldGetter *getters.IPLDGetter,
Expand All @@ -115,8 +113,8 @@ func fullGetter(
var cascade []share.Getter
cascade = append(cascade, storeGetter)
if cfg.UseShareExchange {
cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store))
cascade = append(cascade, shrexGetter)
}
cascade = append(cascade, getters.NewTeeGetter(ipldGetter, store))
cascade = append(cascade, ipldGetter)
return getters.NewCascadeGetter(cascade)
}
2 changes: 1 addition & 1 deletion nodebuilder/tests/nd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func replaceShareGetter() fx.Option {
) share.Getter {
cascade := make([]share.Getter, 0, 2)
cascade = append(cascade, storeGetter)
cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store))
cascade = append(cascade, shrexGetter)
return getters.NewCascadeGetter(cascade)
},
))
Expand Down
2 changes: 1 addition & 1 deletion share/availability/cache/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func FullAvailabilityWithLocalRandSquare(t *testing.T, n int) (share.Availabilit
store := dssync.MutexWrap(ds.NewMapDatastore())
getter := getters.NewIPLDGetter(bServ)
avail := NewShareAvailability(
full.TestAvailability(getter),
full.TestAvailability(t, getter),
store,
)
return avail, availability_test.RandFillBS(t, n, bServ)
Expand Down
23 changes: 17 additions & 6 deletions share/availability/full/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package full
import (
"context"
"errors"
"fmt"

"github.com/filecoin-project/dagstore"
logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
)

Expand Down Expand Up @@ -63,13 +66,15 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, root *share.Ro
}

// a hack to avoid loading the whole EDS in mem if we store it already.
if fa.store != nil {
if ok, _ := fa.store.Has(ctx, root.Hash()); ok {
return nil
}
if ok, _ := fa.store.Has(ctx, root.Hash()); ok {
return nil
}

_, err := fa.getter.GetEDS(ctx, root)
adder := ipld.NewProofsAdder(len(root.RowRoots))
ctx = ipld.CtxWithProofsAdder(ctx, adder)
defer adder.Purge()

eds, err := fa.getter.GetEDS(ctx, root)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
Expand All @@ -79,8 +84,14 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, root *share.Ro
if errors.Is(err, share.ErrNotFound) || errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &byzantineErr) {
return share.ErrNotAvailable
}
return err
}
return err

err = fa.store.Put(ctx, root.Hash(), eds)
if err != nil && !errors.Is(err, dagstore.ErrShardExists) {
return fmt.Errorf("full availability: failed to store eds: %w", err)
}
return nil
}

func (fa *ShareAvailability) ProbabilityOfAvailability(context.Context) float64 {
Expand Down
19 changes: 17 additions & 2 deletions share/availability/full/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,26 @@ func TestSharesAvailable_Full(t *testing.T) {

// RandServiceWithSquare creates a NewShareAvailability inside, so we can test it
getter, dah := GetterWithRandSquare(t, 16)
avail := TestAvailability(getter)
avail := TestAvailability(t, getter)
err := avail.SharesAvailable(ctx, dah)
assert.NoError(t, err)
}

func TestSharesAvailable_StoresToEDSStore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// RandServiceWithSquare creates a NewShareAvailability inside, so we can test it
getter, dah := GetterWithRandSquare(t, 16)
avail := TestAvailability(t, getter)
err := avail.SharesAvailable(ctx, dah)
assert.NoError(t, err)

has, err := avail.store.Has(ctx, dah.Hash())
assert.NoError(t, err)
assert.True(t, has)
}

func TestSharesAvailable_Full_ErrNotAvailable(t *testing.T) {
ctrl := gomock.NewController(t)
getter := mocks.NewMockGetter(ctrl)
Expand All @@ -49,7 +64,7 @@ func TestSharesAvailable_Full_ErrNotAvailable(t *testing.T) {
eds := edstest.RandEDS(t, 4)
dah, err := da.NewDataAvailabilityHeader(eds)
require.NoError(t, err)
avail := TestAvailability(getter)
avail := TestAvailability(t, getter)

errors := []error{share.ErrNotFound, context.DeadlineExceeded}
for _, getterErr := range errors {
Expand Down
14 changes: 11 additions & 3 deletions share/availability/full/testing.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package full

import (
"context"
"testing"
"time"

"github.com/ipfs/go-datastore"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
Expand All @@ -32,16 +36,20 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t
func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
nd := dn.NewTestNode()
nd.Getter = getters.NewIPLDGetter(nd.BlockService)
nd.Availability = TestAvailability(nd.Getter)
nd.Availability = TestAvailability(dn.T, nd.Getter)
return nd
}

func TestAvailability(getter share.Getter) *ShareAvailability {
func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(
nil,
routing.NewRoutingDiscovery(routinghelpers.Null{}),
discovery.WithAdvertiseInterval(time.Second),
discovery.WithPeersLimit(10),
)
return NewShareAvailability(nil, getter, disc)
store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore())
require.NoError(t, err)
err = store.Start(context.Background())
require.NoError(t, err)
return NewShareAvailability(store, getter, disc)
}
55 changes: 0 additions & 55 deletions share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,61 +22,6 @@ import (
"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestTeeGetter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

storeCfg := eds.DefaultParameters()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
edsStore, err := eds.NewStore(storeCfg, t.TempDir(), ds)
require.NoError(t, err)

err = edsStore.Start(ctx)
require.NoError(t, err)

bServ := ipld.NewMemBlockservice()
ig := NewIPLDGetter(bServ)
tg := NewTeeGetter(ig, edsStore)

t.Run("TeesToEDSStore", func(t *testing.T) {
randEds, dah := randomEDS(t)
_, err := ipld.ImportShares(ctx, randEds.Flattened(), bServ)
require.NoError(t, err)

// eds store doesn't have the EDS yet
ok, err := edsStore.Has(ctx, dah.Hash())
assert.False(t, ok)
assert.NoError(t, err)

retrievedEDS, err := tg.GetEDS(ctx, dah)
require.NoError(t, err)
require.True(t, randEds.Equals(retrievedEDS))

// eds store now has the EDS and it can be retrieved
ok, err = edsStore.Has(ctx, dah.Hash())
assert.True(t, ok)
assert.NoError(t, err)
finalEDS, err := edsStore.Get(ctx, dah.Hash())
assert.NoError(t, err)
require.True(t, randEds.Equals(finalEDS))
})

t.Run("ShardAlreadyExistsDoesntError", func(t *testing.T) {
randEds, dah := randomEDS(t)
_, err := ipld.ImportShares(ctx, randEds.Flattened(), bServ)
require.NoError(t, err)

retrievedEDS, err := tg.GetEDS(ctx, dah)
require.NoError(t, err)
require.True(t, randEds.Equals(retrievedEDS))

// no error should be returned, even though the EDS identified by the DAH already exists
retrievedEDS, err = tg.GetEDS(ctx, dah)
require.NoError(t, err)
require.True(t, randEds.Equals(retrievedEDS))
})
}

func TestStoreGetter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand Down
12 changes: 7 additions & 5 deletions share/getters/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,13 @@ func (sg *StoreGetter) GetSharesByNamespace(
blockGetter := eds.NewBlockGetter(bs)
shares, err = collectSharesByNamespace(ctx, blockGetter, root, namespace)
if errors.Is(err, ipld.ErrNodeNotFound) {
// IPLD node not found after the index pointed to this shard and the CAR blockstore has been
// opened successfully is a strong indicator of corruption. We remove the block on bridges
// and fulls and return share.ErrNotFound to ensure the data is retrieved by the next
// getter. Note that this recovery is manual and will only be restored by an RPC call to
// fetch the same datahash that was removed.
// IPLD node not found after the index pointed to this shard and the CAR
// blockstore has been opened successfully is a strong indicator of
// corruption. We remove the block on bridges and fulls and return
// share.ErrNotFound to ensure the data is retrieved by the next getter.
// Note that this recovery is manual and will only be restored by an RPC
// call to SharesAvailable that fetches the same datahash that was
// removed.
err = sg.store.Remove(ctx, root.Hash())
if err != nil {
log.Errorf("getter/store: failed to remove CAR after detected corruption: %w", err)
Expand Down
85 changes: 0 additions & 85 deletions share/getters/tee.go

This file was deleted.

5 changes: 3 additions & 2 deletions share/ipld/corrupted_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) {

requestor := full.Node(net)
provider, mockBS := availability_test.MockNode(t, net)
provider.Availability = full.TestAvailability(getters.NewIPLDGetter(provider.BlockService))
provider.Availability = full.TestAvailability(t, getters.NewIPLDGetter(provider.BlockService))
net.ConnectAll()

// before the provider starts attacking, we should be able to retrieve successfully. We pass a size
Expand All @@ -38,7 +38,8 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) {
require.NoError(t, err)

// clear the storage of the requester so that it must retrieve again, then start attacking
requestor.ClearStorage()
// we reinitialize the node to clear the eds store
requestor = full.Node(net)
mockBS.Attacking = true
getCtx, cancelGet = context.WithTimeout(ctx, sharesAvailableTimeout)
t.Cleanup(cancelGet)
Expand Down

0 comments on commit b6ac8c2

Please sign in to comment.