Skip to content

Commit

Permalink
perf(shwap/bitswap): add session pools inside Getter (#3947)
Browse files Browse the repository at this point in the history
In the Robusta environment this change:
* Improved load distribution across BN/FNs
* Sped up sampling

Originally, I was testing Bitswap with this change but decided against including it in the patch, as it is solely an optimization. However, without this change on the rc tag, load distribution was rather strange. One LN sampled at 180/s, while all 4 others at 4/s. Adding pooling fixed the issue and resulted in all 5 LNs at about 100/s.

Co-authored-by: Vlad <[email protected]>
  • Loading branch information
Wondertan and walldiss authored Nov 18, 2024
1 parent 72c08f7 commit b24d47f
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 19 deletions.
99 changes: 80 additions & 19 deletions share/shwap/p2p/bitswap/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package bitswap
import (
"context"
"fmt"
"sync"
"time"

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/celestia-app/v3/pkg/wrapper"
libshare "github.com/celestiaorg/go-square/v2/share"
Expand All @@ -30,8 +30,8 @@ type Getter struct {
bstore blockstore.Blockstore
availWndw time.Duration

availableSession exchange.Fetcher
archivalSession exchange.Fetcher
availablePool *pool
archivalPool *pool

cancel context.CancelFunc
}
Expand All @@ -42,7 +42,13 @@ func NewGetter(
bstore blockstore.Blockstore,
availWndw time.Duration,
) *Getter {
return &Getter{exchange: exchange, bstore: bstore, availWndw: availWndw}
return &Getter{
exchange: exchange,
bstore: bstore,
availWndw: availWndw,
availablePool: newPool(exchange),
archivalPool: newPool(exchange),
}
}

// Start kicks off internal fetching sessions.
Expand All @@ -57,12 +63,13 @@ func NewGetter(
// with regular full node peers.
// Start kicks off internal fetching sessions by binding both session pools
// to a cancellable context. It must be called before any Get* method;
// Stop cancels the context and tears the sessions down.
func (g *Getter) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	g.cancel = cancel

	g.availablePool.ctx = ctx
	// BUG FIX: previously this line assigned availablePool.ctx a second
	// time, leaving archivalPool.ctx nil — the first archival get() would
	// then call NewSession with a nil context.
	g.archivalPool.ctx = ctx
}

// Stop shuts down Getter's internal fetching session.
// Stop shuts down Getter's internal fetching sessions.
func (g *Getter) Stop() {
	g.cancel()
}
Expand Down Expand Up @@ -97,7 +104,12 @@ func (g *Getter) GetShares(
blks[i] = sid
}

ses := g.session(ctx, hdr)
isArchival := g.isArchival(hdr)
span.SetAttributes(attribute.Bool("is_archival", isArchival))

ses, release := g.getSession(isArchival)
defer release()

err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithStore(g.bstore), WithFetcher(ses))
if err != nil {
span.RecordError(err)
Expand Down Expand Up @@ -156,7 +168,12 @@ func (g *Getter) GetEDS(
blks[i] = blk
}

ses := g.session(ctx, hdr)
isArchival := g.isArchival(hdr)
span.SetAttributes(attribute.Bool("is_archival", isArchival))

ses, release := g.getSession(isArchival)
defer release()

err := Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses))
if err != nil {
span.RecordError(err)
Expand Down Expand Up @@ -210,7 +227,12 @@ func (g *Getter) GetNamespaceData(
blks[i] = rndblk
}

ses := g.session(ctx, hdr)
isArchival := g.isArchival(hdr)
span.SetAttributes(attribute.Bool("is_archival", isArchival))

ses, release := g.getSession(isArchival)
defer release()

if err = Fetch(ctx, g.exchange, hdr.DAH, blks, WithFetcher(ses)); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, "Fetch")
Expand All @@ -230,17 +252,19 @@ func (g *Getter) GetNamespaceData(
return nsShrs, nil
}

// session decides which fetching session to use for the given header.
func (g *Getter) session(ctx context.Context, hdr *header.ExtendedHeader) exchange.Fetcher {
session := g.archivalSession
// isArchival reports whether the header lies outside the availability
// window, i.e. its data must be requested from archival peers.
func (g *Getter) isArchival(hdr *header.ExtendedHeader) bool {
	withinWindow := availability.IsWithinWindow(hdr.Time(), g.availWndw)
	return !withinWindow
}

isWithinAvailability := availability.IsWithinWindow(hdr.Time(), g.availWndw)
if isWithinAvailability {
session = g.availableSession
// getSession borrows a session from the pool matching the data's age and
// returns it together with a release func that hands the session back.
// Callers must invoke release once done fetching.
func (g *Getter) getSession(isArchival bool) (ses exchange.Fetcher, release func()) {
	// Pick the pool once so borrow and return are guaranteed to match.
	p := g.availablePool
	if isArchival {
		p = g.archivalPool
	}

	ses = p.get()
	return ses, func() { p.put(ses) }
}

// edsFromRows imports given Rows and computes EDS out of them, assuming enough Rows were provided.
Expand Down Expand Up @@ -274,3 +298,40 @@ func edsFromRows(roots *share.AxisRoots, rows []shwap.Row) (*rsmt2d.ExtendedData

return square, nil
}

// pool is a pool of Bitswap sessions.
type pool struct {
	// lock guards sessions.
	lock sync.Mutex
	// sessions holds idle sessions available for reuse; get pops from the
	// tail and put appends.
	sessions []exchange.Fetcher
	// ctx is used to create new sessions when the pool is empty; it must be
	// set before the first get call (see Getter.Start).
	// NOTE(review): storing a Context in a struct is discouraged in Go —
	// consider passing it to get instead.
	ctx context.Context
	// exchange creates new sessions on demand.
	exchange exchange.SessionExchange
}

// newPool creates a session pool backed by the given exchange.
// The pool's ctx must be set (see Getter.Start) before get is called.
func newPool(ex exchange.SessionExchange) *pool {
	// The nil zero-value slice is idiomatic here: append and len work on
	// nil, so an explicit empty make is unnecessary.
	return &pool{exchange: ex}
}

// get returns a session from the pool or creates a new one if the pool is empty.
func (p *pool) get() exchange.Fetcher {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Pop from the tail; lazily create when nothing is pooled.
	last := len(p.sessions) - 1
	if last < 0 {
		return p.exchange.NewSession(p.ctx)
	}

	ses := p.sessions[last]
	p.sessions = p.sessions[:last]
	return ses
}

// put hands a session back to the pool for later reuse.
func (p *pool) put(ses exchange.Fetcher) {
	p.lock.Lock()
	p.sessions = append(p.sessions, ses)
	p.lock.Unlock()
}
86 changes: 86 additions & 0 deletions share/shwap/p2p/bitswap/getter_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package bitswap

import (
"context"
"sync"
"testing"

"github.com/ipfs/boxo/exchange"
"github.com/stretchr/testify/require"

libshare "github.com/celestiaorg/go-square/v2/share"
Expand All @@ -28,3 +31,86 @@ func TestEDSFromRows(t *testing.T) {
require.NoError(t, err)
require.True(t, edsIn.Equals(edsOut))
}

// mockSessionExchange is a mock implementation of exchange.SessionExchange
// that counts how many sessions it has created, so tests can detect reuse.
type mockSessionExchange struct {
	exchange.SessionExchange
	// sessionCount is the number of sessions created so far; guarded by mu.
	sessionCount int
	mu           sync.Mutex
}

// NewSession returns a fresh mockFetcher carrying a unique, monotonically
// increasing id.
func (m *mockSessionExchange) NewSession(ctx context.Context) exchange.Fetcher {
	m.mu.Lock()
	id := m.sessionCount + 1
	m.sessionCount = id
	m.mu.Unlock()

	return &mockFetcher{id: id}
}

// mockFetcher is a mock implementation of exchange.Fetcher identified by
// the order in which it was created.
type mockFetcher struct {
	exchange.Fetcher
	// id is the creation sequence number assigned by mockSessionExchange.
	id int
}

// TestPoolGetFromEmptyPool checks that an empty pool lazily creates a
// brand-new session on first get.
func TestPoolGetFromEmptyPool(t *testing.T) {
	p := newPool(&mockSessionExchange{})
	p.ctx = context.Background()

	fetcher := p.get().(*mockFetcher)
	require.NotNil(t, fetcher)
	require.Equal(t, 1, fetcher.id)
}

// TestPoolPutAndGet checks that a session returned to the pool is handed
// out again instead of a new one being created.
func TestPoolPutAndGet(t *testing.T) {
	p := newPool(&mockSessionExchange{})
	p.ctx = context.Background()

	first := p.get().(*mockFetcher)
	p.put(first)

	second := p.get().(*mockFetcher)
	require.Equal(t, first.id, second.id)
}

// TestPoolConcurrency exercises the pool from many goroutines at once and
// verifies the number of distinct sessions stays within sane bounds.
func TestPoolConcurrency(t *testing.T) {
	ex := &mockSessionExchange{}
	p := newPool(ex)
	p.ctx = context.Background()

	const numGoroutines = 50
	var wg sync.WaitGroup

	sessionIDSet := make(map[int]struct{})
	lock := sync.Mutex{}

	// Start multiple goroutines that each borrow and return a session.
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ses := p.get()
			mockSes := ses.(*mockFetcher)
			p.put(ses)
			lock.Lock()
			sessionIDSet[mockSes.id] = struct{}{}
			lock.Unlock()
		}()
	}
	wg.Wait()

	// BUG FIX: the original assertion (len > numGoroutines) was vacuous —
	// there can never be more unique ids than goroutines, so the test could
	// never fail. Assert meaningful bounds instead: at least one session was
	// created, and at most one per goroutine.
	if n := len(sessionIDSet); n < 1 || n > numGoroutines {
		t.Fatalf("expected between 1 and %d unique sessions, got %d", numGoroutines, n)
	}
}

0 comments on commit b24d47f

Please sign in to comment.