Skip to content

Commit

Permalink
refactor: api rm Salt, reservationPeriod->timestamp (#1254)
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen authored Feb 14, 2025
1 parent 014dfa9 commit d145679
Show file tree
Hide file tree
Showing 63 changed files with 387 additions and 397 deletions.
34 changes: 17 additions & 17 deletions api/clients/v2/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"math/big"
"slices"
"sync"
"time"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -61,14 +60,15 @@ func NewAccountant(accountID string, reservation *core.ReservedPayment, onDemand

// BlobPaymentInfo calculates and records payment information. The accountant
// will attempt to use the active reservation first and check for quorum settings,
// then on-demand if the reservation is not available. The returned values are
// reservation period for reservation payments and cumulative payment for on-demand payments,
// and both fields are used to create the payment header and signature.
// then on-demand if the reservation is not available. It takes in a timestamp at
// the current UNIX time in nanoseconds, and returns a cumulative payment for on-
// demand payments in units of wei. Both timestamp and cumulative payment are used
// to create the payment header and signature, with non-zero cumulative payment
// indicating on-demand payment.
// These generated values are used to create the payment header and signature, as specified in
// api/proto/common/v2/common_v2.proto
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quorumNumbers []uint8) (uint32, *big.Int, error) {
now := time.Now().Unix()
currentReservationPeriod := meterer.GetReservationPeriod(uint64(now), a.reservationWindow)
func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quorumNumbers []uint8, timestamp int64) (*big.Int, error) {
currentReservationPeriod := meterer.GetReservationPeriodByNanosecond(timestamp, a.reservationWindow)
symbolUsage := uint64(a.SymbolsCharged(numSymbols))

a.usageLock.Lock()
Expand All @@ -80,19 +80,19 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo
binLimit := a.reservation.SymbolsPerSecond * uint64(a.reservationWindow)
if relativePeriodRecord.Usage <= binLimit {
if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
return 0, big.NewInt(0), err
return big.NewInt(0), err
}
return currentReservationPeriod, big.NewInt(0), nil
return big.NewInt(0), nil
}

overflowPeriodRecord := a.GetRelativePeriodRecord(currentReservationPeriod + 2)
// Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit
if overflowPeriodRecord.Usage == 0 && relativePeriodRecord.Usage-symbolUsage < binLimit && symbolUsage <= binLimit {
overflowPeriodRecord.Usage += relativePeriodRecord.Usage - binLimit
if err := QuorumCheck(quorumNumbers, a.reservation.QuorumNumbers); err != nil {
return 0, big.NewInt(0), err
return big.NewInt(0), err
}
return currentReservationPeriod, big.NewInt(0), nil
return big.NewInt(0), nil
}

// reservation not available, rollback reservation records, attempt on-demand
Expand All @@ -102,24 +102,24 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, numSymbols uint32, quo
a.cumulativePayment.Add(a.cumulativePayment, incrementRequired)
if a.cumulativePayment.Cmp(a.onDemand.CumulativePayment) <= 0 {
if err := QuorumCheck(quorumNumbers, requiredQuorums); err != nil {
return 0, big.NewInt(0), err
return big.NewInt(0), err
}
return 0, a.cumulativePayment, nil
return a.cumulativePayment, nil
}

return 0, big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available")
return big.NewInt(0), fmt.Errorf("neither reservation nor on-demand payment is available")
}

// AccountBlob accountant provides and records payment information
func (a *Accountant) AccountBlob(ctx context.Context, numSymbols uint32, quorums []uint8) (*core.PaymentMetadata, error) {
reservationPeriod, cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums)
func (a *Accountant) AccountBlob(ctx context.Context, timestamp int64, numSymbols uint32, quorums []uint8) (*core.PaymentMetadata, error) {
cumulativePayment, err := a.BlobPaymentInfo(ctx, numSymbols, quorums, timestamp)
if err != nil {
return nil, err
}

pm := &core.PaymentMetadata{
AccountID: a.accountID,
ReservationPeriod: reservationPeriod,
Timestamp: timestamp,
CumulativePayment: cumulativePayment,
}

Expand Down
81 changes: 49 additions & 32 deletions api/clients/v2/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,31 @@ func TestAccountBlob_Reservation(t *testing.T) {
ctx := context.Background()
symbolLength := uint32(500)
quorums := []uint8{0, 1}
now := time.Now().UnixNano()

header, err := accountant.AccountBlob(ctx, symbolLength, quorums)
header, err := accountant.AccountBlob(ctx, now, symbolLength, quorums)

assert.NoError(t, err)
assert.Equal(t, meterer.GetReservationPeriod(uint64(time.Now().Unix()), reservationWindow), header.ReservationPeriod)
assert.Equal(t, meterer.GetReservationPeriod(time.Now().Unix(), reservationWindow), meterer.GetReservationPeriodByNanosecond(header.Timestamp, reservationWindow))
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{500, 0, 0}, mapRecordUsage(accountant.periodRecords)), true)

symbolLength = uint32(700)

header, err = accountant.AccountBlob(ctx, symbolLength, quorums)
now = time.Now().UnixNano()
header, err = accountant.AccountBlob(ctx, now, symbolLength, quorums)

assert.NoError(t, err)
assert.NotEqual(t, 0, header.ReservationPeriod)
assert.NotEqual(t, uint64(0), header.Timestamp)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1200, 0, 200}, mapRecordUsage(accountant.periodRecords)), true)

// Second call should use on-demand payment
header, err = accountant.AccountBlob(ctx, 300, quorums)
now = time.Now().UnixNano()
header, err = accountant.AccountBlob(ctx, now, 300, quorums)

assert.NoError(t, err)
assert.Equal(t, uint32(0), header.ReservationPeriod)
assert.NotEqual(t, uint64(0), header.Timestamp)
assert.Equal(t, big.NewInt(300), header.CumulativePayment)
}

Expand All @@ -117,12 +120,12 @@ func TestAccountBlob_OnDemand(t *testing.T) {
ctx := context.Background()
numSymbols := uint32(1500)
quorums := []uint8{0, 1}

header, err := accountant.AccountBlob(ctx, numSymbols, quorums)
now := time.Now().UnixNano()
header, err := accountant.AccountBlob(ctx, now, numSymbols, quorums)
assert.NoError(t, err)

expectedPayment := big.NewInt(int64(numSymbols * pricePerSymbol))
assert.Equal(t, uint32(0), header.ReservationPeriod)
assert.NotEqual(t, uint64(0), header.Timestamp)
assert.Equal(t, expectedPayment, header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{0, 0, 0}, mapRecordUsage(accountant.periodRecords)), true)
assert.Equal(t, expectedPayment, accountant.cumulativePayment)
Expand All @@ -145,8 +148,8 @@ func TestAccountBlob_InsufficientOnDemand(t *testing.T) {
ctx := context.Background()
numSymbols := uint32(2000)
quorums := []uint8{0, 1}

_, err = accountant.AccountBlob(ctx, numSymbols, quorums)
now := time.Now().UnixNano()
_, err = accountant.AccountBlob(ctx, now, numSymbols, quorums)
assert.Contains(t, err.Error(), "neither reservation nor on-demand payment is available")
}

Expand All @@ -172,28 +175,33 @@ func TestAccountBlobCallSeries(t *testing.T) {

ctx := context.Background()
quorums := []uint8{0, 1}
now := time.Now().Unix()

now := time.Now().UnixNano()
// First call: Use reservation
header, err := accountant.AccountBlob(ctx, 800, quorums)
header, err := accountant.AccountBlob(ctx, now, 800, quorums)
assert.NoError(t, err)
assert.Equal(t, meterer.GetReservationPeriod(uint64(now), reservationWindow), header.ReservationPeriod)
timestamp := (time.Duration(header.Timestamp) * time.Nanosecond).Seconds()
assert.Equal(t, uint64(meterer.GetReservationPeriodByNanosecond(now, reservationWindow)), uint64(timestamp)/uint64(reservationWindow))
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Second call: Use remaining reservation + overflow
header, err = accountant.AccountBlob(ctx, 300, quorums)
now = time.Now().UnixNano()
header, err = accountant.AccountBlob(ctx, now, 300, quorums)
assert.NoError(t, err)
assert.Equal(t, meterer.GetReservationPeriod(uint64(now), reservationWindow), header.ReservationPeriod)
timestamp = (time.Duration(header.Timestamp) * time.Nanosecond).Seconds()
assert.Equal(t, uint64(meterer.GetReservationPeriodByNanosecond(now, reservationWindow)), uint64(timestamp)/uint64(reservationWindow))
assert.Equal(t, big.NewInt(0), header.CumulativePayment)

// Third call: Use on-demand
header, err = accountant.AccountBlob(ctx, 500, quorums)
now = time.Now().UnixNano()
header, err = accountant.AccountBlob(ctx, now, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.ReservationPeriod)
assert.NotEqual(t, uint64(0), header.Timestamp)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)

// Fourth call: Insufficient on-demand
_, err = accountant.AccountBlob(ctx, 600, quorums)
now = time.Now().UnixNano()
_, err = accountant.AccountBlob(ctx, now, 600, quorums)
assert.Error(t, err)
assert.Contains(t, err.Error(), "neither reservation nor on-demand payment is available")
}
Expand Down Expand Up @@ -222,20 +230,23 @@ func TestAccountBlob_BinRotation(t *testing.T) {
quorums := []uint8{0, 1}

// First call
_, err = accountant.AccountBlob(ctx, 800, quorums)
now := time.Now().UnixNano()
_, err = accountant.AccountBlob(ctx, now, 800, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.periodRecords)), true)

// next reservation duration
time.Sleep(1000 * time.Millisecond)

// Second call
_, err = accountant.AccountBlob(ctx, 300, quorums)
now = time.Now().UnixNano()
_, err = accountant.AccountBlob(ctx, now, 300, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{800, 300, 0}, mapRecordUsage(accountant.periodRecords)), true)

// Third call
_, err = accountant.AccountBlob(ctx, 500, quorums)
now = time.Now().UnixNano()
_, err = accountant.AccountBlob(ctx, now, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{800, 800, 0}, mapRecordUsage(accountant.periodRecords)), true)
}
Expand Down Expand Up @@ -269,7 +280,8 @@ func TestConcurrentBinRotationAndAccountBlob(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, err := accountant.AccountBlob(ctx, 100, quorums)
now := time.Now().UnixNano()
_, err := accountant.AccountBlob(ctx, now, 100, quorums)
assert.NoError(t, err)
}()
}
Expand Down Expand Up @@ -304,25 +316,27 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {

ctx := context.Background()
quorums := []uint8{0, 1}
now := time.Now().Unix()
now := time.Now().UnixNano()

// Okay reservation
header, err := accountant.AccountBlob(ctx, 800, quorums)
header, err := accountant.AccountBlob(ctx, now, 800, quorums)
assert.NoError(t, err)
assert.Equal(t, meterer.GetReservationPeriod(uint64(now), reservationWindow), header.ReservationPeriod)
timestamp := (time.Duration(header.Timestamp) * time.Nanosecond).Seconds()
assert.Equal(t, uint64(meterer.GetReservationPeriodByNanosecond(now, reservationWindow)), uint64(timestamp)/uint64(reservationWindow))
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{800, 0, 0}, mapRecordUsage(accountant.periodRecords)), true)

// Second call: Allow one overflow
header, err = accountant.AccountBlob(ctx, 500, quorums)
header, err = accountant.AccountBlob(ctx, now, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, big.NewInt(0), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.periodRecords)), true)

// Third call: Should use on-demand payment
header, err = accountant.AccountBlob(ctx, 200, quorums)
now = time.Now().UnixNano()
header, err = accountant.AccountBlob(ctx, now, 200, quorums)
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.ReservationPeriod)
assert.NotEqual(t, uint64(0), header.Timestamp)
assert.Equal(t, big.NewInt(200), header.CumulativePayment)
assert.Equal(t, isRotation([]uint64{1300, 0, 300}, mapRecordUsage(accountant.periodRecords)), true)
}
Expand Down Expand Up @@ -351,12 +365,14 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
quorums := []uint8{0, 1}

// full reservation
_, err = accountant.AccountBlob(ctx, 1000, quorums)
now := time.Now().UnixNano()
_, err = accountant.AccountBlob(ctx, now, 1000, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.periodRecords)), true)

// no overflow
header, err := accountant.AccountBlob(ctx, 500, quorums)
now = time.Now().UnixNano()
header, err := accountant.AccountBlob(ctx, now, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 0, 0}, mapRecordUsage(accountant.periodRecords)), true)
assert.Equal(t, big.NewInt(500), header.CumulativePayment)
Expand All @@ -365,7 +381,8 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
time.Sleep(time.Duration(reservationWindow) * time.Second)

// Third call: Should use new bin and allow overflow again
_, err = accountant.AccountBlob(ctx, 500, quorums)
now = time.Now().UnixNano()
_, err = accountant.AccountBlob(ctx, now, 500, quorums)
assert.NoError(t, err)
assert.Equal(t, isRotation([]uint64{1000, 500, 0}, mapRecordUsage(accountant.periodRecords)), true)
}
Expand Down
10 changes: 5 additions & 5 deletions api/clients/v2/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package clients
import (
"context"
"fmt"
"github.com/docker/go-units"
"sync"
"time"

"github.com/docker/go-units"

"github.com/Layr-Labs/eigenda/api"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
Expand All @@ -24,7 +26,7 @@ type DisperserClientConfig struct {

type DisperserClient interface {
Close() error
DisperseBlob(ctx context.Context, data []byte, blobVersion corev2.BlobVersion, quorums []core.QuorumID, salt uint32) (*dispv2.BlobStatus, corev2.BlobKey, error)
DisperseBlob(ctx context.Context, data []byte, blobVersion corev2.BlobVersion, quorums []core.QuorumID) (*dispv2.BlobStatus, corev2.BlobKey, error)
GetBlobStatus(ctx context.Context, blobKey corev2.BlobKey) (*disperser_rpc.BlobStatusReply, error)
GetBlobCommitment(ctx context.Context, data []byte) (*disperser_rpc.BlobCommitmentReply, error)
}
Expand Down Expand Up @@ -125,7 +127,6 @@ func (c *disperserClient) DisperseBlob(
data []byte,
blobVersion corev2.BlobVersion,
quorums []core.QuorumID,
salt uint32,
) (*dispv2.BlobStatus, corev2.BlobKey, error) {
err := c.initOnceGrpcConnection()
if err != nil {
Expand All @@ -141,7 +142,7 @@ func (c *disperserClient) DisperseBlob(
}

symbolLength := encoding.GetBlobLengthPowerOf2(uint(len(data)))
payment, err := c.accountant.AccountBlob(ctx, uint32(symbolLength), quorums)
payment, err := c.accountant.AccountBlob(ctx, time.Now().UnixNano(), uint32(symbolLength), quorums)
if err != nil {
return nil, [32]byte{}, fmt.Errorf("error accounting blob: %w", err)
}
Expand Down Expand Up @@ -188,7 +189,6 @@ func (c *disperserClient) DisperseBlob(
BlobCommitments: blobCommitments,
QuorumNumbers: quorums,
PaymentMetadata: *payment,
Salt: salt,
}

sig, err := c.signer.SignBlobRequest(blobHeader)
Expand Down
15 changes: 7 additions & 8 deletions api/clients/v2/disperser_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,26 @@ import (

func TestVerifyReceivedBlobKey(t *testing.T) {
blobCommitments := encoding.BlobCommitments{
Commitment: &encoding.G1Commitment{},
Commitment: &encoding.G1Commitment{},
LengthCommitment: &encoding.G2Commitment{},
LengthProof: &encoding.LengthProof{},
Length: 4,
LengthProof: &encoding.LengthProof{},
Length: 4,
}

quorumNumbers := make([]core.QuorumID, 1)
quorumNumbers[0] = 8

paymentMetadata := core.PaymentMetadata{
AccountID: "asdf",
ReservationPeriod: 5,
AccountID: "asdf",
Timestamp: 5,
CumulativePayment: big.NewInt(6),
}

blobHeader := &corev2.BlobHeader{
BlobVersion: 0,
BlobVersion: 0,
BlobCommitments: blobCommitments,
QuorumNumbers: quorumNumbers,
QuorumNumbers: quorumNumbers,
PaymentMetadata: paymentMetadata,
Salt: 9,
}

realKey, err := blobHeader.BlobKey()
Expand Down
6 changes: 1 addition & 5 deletions api/clients/v2/payload_disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ func (pd *PayloadDisperser) SendPayload(
ctx context.Context,
// payload is the raw data to be stored on eigenDA
payload []byte,
// salt is added while constructing the blob header
// This salt should be utilized if a blob dispersal fails, in order to retry dispersing the same payload under a
// different blob key, when using reserved bandwidth payments.
salt uint32,
) (*verification.EigenDACert, error) {

blobBytes, err := pd.codec.EncodeBlob(payload)
Expand All @@ -157,7 +153,7 @@ func (pd *PayloadDisperser) SendPayload(
blobBytes,
pd.config.BlobVersion,
pd.config.Quorums,
salt)
)
if err != nil {
return nil, fmt.Errorf("disperse blob: %w", err)
}
Expand Down
1 change: 0 additions & 1 deletion api/clients/v2/verification/conversion_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func blobHeaderProtoToBinding(inputHeader *commonv2.BlobHeader) (*contractEigenD
QuorumNumbers: quorumNumbers,
Commitment: *convertedBlobCommitment,
PaymentHeaderHash: paymentHeaderHash,
Salt: inputHeader.GetSalt(),
}, nil
}

Expand Down
Loading

0 comments on commit d145679

Please sign in to comment.