Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Changelog for NeoFS Node

### Fixed
- Write cache using too much CPU (#3642)
- Basic income payments (#3357)

### Changed
- Move `fschain_autodeploy` into `fschain.disable_autodeploy` in IR config (#3619)
Expand All @@ -18,6 +19,7 @@ Changelog for NeoFS Node
- Move `contracts` into `mainnet.contracts` in IR config (#3619)
- Move `persistent_sessions` data to `persistent_state` in SN (#3630)
- More efficient write cache batching for small objects (#3414)
- Storage nodes do not serve unpaid containers (#774)

### Removed
- Pre-0.46.0 write cache format migration (#3647)
Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ type cfgMorph struct {
}

type cfgContainer struct {
payments *paymentChecker
parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers
Expand Down
82 changes: 82 additions & 0 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func initContainerService(c *cfg) {
})
}

initPaymentChecker(c)
initSizeLoadReports(c)

cnrSrv := containerService.New(&c.key.PrivateKey, c.networkState, c.cli, (*containersInChain)(&c.basics), c.nCli)
Expand Down Expand Up @@ -361,6 +362,87 @@ func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationPar
c.cfgContainer.parsers[typ] = p
}

func initPaymentChecker(c *cfg) {
var (
l = c.log.With(zap.String("component", "paymentChecker"))
p = &paymentChecker{
m: sync.RWMutex{},
statuses: make(map[cid.ID]int64),
cnrCli: c.cCli,
}
)

if c.cfgContainer.parsers == nil {
c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser)
}
if c.cfgContainer.subscribers == nil {
c.cfgContainer.subscribers = make(map[event.Type][]event.Handler)
}
const changeUnpaidStatusEventName = "ChangePaymentStatus"
setContainerNotificationParser(c, changeUnpaidStatusEventName, containerEvent.ParseChangePaymentStatus)
addContainerAsyncNotificationHandler(c, changeUnpaidStatusEventName, func(e event.Event) {
ev := e.(containerEvent.ChangePaymentStatus)
p.m.Lock()
defer p.m.Unlock()

cID := cid.ID(ev.ContainerID)
if ev.Unpaid {
l.Info("container status has changed to unpaid",
zap.Stringer("cID", cID),
zap.Uint64("epoch", ev.Epoch))

p.statuses[cID] = int64(ev.Epoch)
} else {
l.Debug("container status has changed to paid",
zap.Stringer("cID", cID),
zap.Uint64("epoch", ev.Epoch))

p.statuses[cID] = -1
}
})

c.cfgContainer.payments = p
}

type paymentChecker struct {
m sync.RWMutex
statuses map[cid.ID]int64

cnrCli *cntClient.Client
}

func (p *paymentChecker) resetCache() {
p.m.Lock()
defer p.m.Unlock()

clear(p.statuses)
}

func (p *paymentChecker) IsPaid(cID cid.ID) (int64, error) {
p.m.RLock()
epoch, ok := p.statuses[cID]
p.m.RUnlock()

if ok {
return epoch, nil
}

p.m.Lock()
defer p.m.Unlock()
epoch, ok = p.statuses[cID]
if ok {
return epoch, nil
}

epoch, err := p.cnrCli.UnpaidContainer(cID)
if err != nil {
return 0, fmt.Errorf("FS chain RPC call: %w", err)
}
p.statuses[cID] = epoch

return epoch, nil
}

func (c *cfg) PublicKey() []byte {
return nodeKeyFromNetmap(c)
}
Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func (c *cfg) restartMorph() error {
c.log.Info("restarting internal services because of RPC connection loss...")

c.resetCaches()
c.cfgContainer.payments.resetCache()

epoch, ni, err := getNetworkState(c)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func initObjectService(c *cfg) {
os := &objectSource{signer: neofsecdsa.SignerRFC6979(c.key.PrivateKey), get: sGet}
sPut := putsvc.NewService(&transport{clients: putConstructor}, c, c.metaService,
initQuotas(c.cCli, c.cfgObject.quotasTTL),
c.cfgContainer.payments,
putsvc.WithNetworkMagic(mNumber),
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/nspcc-dev/locode-db v0.8.1
github.com/nspcc-dev/neo-go v0.113.0
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea
github.com/nspcc-dev/neofs-contract v0.24.0
github.com/nspcc-dev/neofs-contract v0.24.1-0.20251023234316-33fcbda86b2f
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.15.0.20251015122943-b38583ddd311
github.com/nspcc-dev/tzhash v1.8.3
github.com/panjf2000/ants/v2 v2.11.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20250923153235-ffb84619d02f h1:EO
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20250923153235-ffb84619d02f/go.mod h1:X2spkE8hK/l08CYulOF19fpK5n3p2xO0L1GnJFIywQg=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea h1:mK0EMGLvunXcFyq7fBURS/CsN4MH+4nlYiqn6pTwWAU=
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea/go.mod h1:YzhD4EZmC9Z/PNyd7ysC7WXgIgURc9uCG1UWDeV027Y=
github.com/nspcc-dev/neofs-contract v0.24.0 h1:lQHtfRc00WEhW9qcnVNbM2sMa4oCBQ5v7vcunJKk9rA=
github.com/nspcc-dev/neofs-contract v0.24.0/go.mod h1:PPxjwRiK6hhXPXduvyojEqLMHNpgPaF+rULPhdFlzDg=
github.com/nspcc-dev/neofs-contract v0.24.1-0.20251023234316-33fcbda86b2f h1:ao93kD+71aqq69yG/Zh91o0F3ycaMFnmaUOUynBZznU=
github.com/nspcc-dev/neofs-contract v0.24.1-0.20251023234316-33fcbda86b2f/go.mod h1:uDF8RHDeOHAJFfaSvAwAQB/s0NcHrH65GudosWHmFqQ=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.15.0.20251015122943-b38583ddd311 h1:iHjokyLIiOW7zvaNZZPmch0c1tghwkfOniL+JOt+F7M=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.15.0.20251015122943-b38583ddd311/go.mod h1:Vukuf6qDOQESOWAx5yOjYtVC5wdsQp3hiZrxbJIa2fs=
github.com/nspcc-dev/rfc6979 v0.2.4 h1:NBgsdCjhLpEPJZqmC9rciMZDcSY297po2smeaRjw57k=
Expand Down
1 change: 0 additions & 1 deletion pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *config.Config, errChan chan<
State: server,
ContainerClient: cnrClient,
NetmapClient: server.netmapClient,
BalanceClient: server.balanceClient,
},
settlement.WithLogger(server.log.With(zap.String("component", "basicIncomeProcessor"))),
)
Expand Down
139 changes: 8 additions & 131 deletions pkg/innerring/processors/settlement/calls.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,12 @@
package settlement

import (
"crypto/ecdsa"
"crypto/elliptic"
"math/big"
"slices"
"sync"

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/common"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type payment struct {
owner user.ID
amount *big.Int
}
type incomeReceivers struct {
m sync.Mutex
nodes map[string][]payment
}

func (ir *incomeReceivers) add(nodeKey string, p payment) {
ir.m.Lock()
defer ir.m.Unlock()

if payers, ok := ir.nodes[nodeKey]; ok {
i := slices.IndexFunc(ir.nodes[nodeKey], func(registeredPayment payment) bool {
return registeredPayment.owner == p.owner
})
if i == -1 {
payers = append(payers, p)
ir.nodes[nodeKey] = payers

return
}

payers[i].amount.Add(payers[i].amount, p.amount)

return
}

ir.nodes[nodeKey] = []payment{p}
}

func (p *Processor) HandleBasicIncomeEvent(e event.Event) {
ev := e.(BasicIncomeEvent)
epoch := ev.Epoch()
Expand Down Expand Up @@ -77,111 +36,29 @@ func (p *Processor) HandleBasicIncomeEvent(e event.Event) {
return
}

l.Info("start basic income calculation...")

payments := p.calculatePayments(l, rate, cnrs)
l.Info("start basic income distribution...", zap.Int("numberOfContainers", len(cnrs)))

l.Info("basic income calculated, transfer tokens...", zap.Int("numberOfRecievers", len(payments.nodes)))

p.distributePayments(l, epoch, payments)
p.sendPaymentTXs(l, cnrs)

l.Debug("finished basic income distribution")
}

func (p *Processor) calculatePayments(l *zap.Logger, paymentRate uint64, cnrs []cid.ID) *incomeReceivers {
var (
wg errgroup.Group
transferTable = incomeReceivers{nodes: make(map[string][]payment)}
)

func (p *Processor) sendPaymentTXs(l *zap.Logger, cnrs []cid.ID) {
var wg errgroup.Group
wg.SetLimit(parallelFactor)
for _, cID := range cnrs {
wg.Go(func() error {
cnr, err := p.cnrClient.Get(cID[:])
err := p.cnrClient.Pay(cID)
if err != nil {
l.Warn("failed to get container, it will be skipped", zap.Stringer("cID", cID), zap.Error(err))
l.Error("could not send payment transaction", zap.Stringer("cID", cID), zap.Error(err))
return nil
}
owner := cnr.Owner()

reports, err := p.cnrClient.NodeReports(cID)
if err != nil {
l.Warn("failed to get container reports, container will be skipped", zap.Stringer("cID", cID), zap.Error(err))
return nil
}
for _, r := range reports {
switch {
case r.StorageSize == 0:
l.Debug("skipping container with zero storage size", zap.Stringer("cID", cID))
return nil
case r.StorageSize < 0:
l.Warn("skipping container with negative storage size",
zap.Stringer("cID", cID), zap.Int64("storageSize", r.StorageSize))
return nil
default:
}

transferTable.add(string(r.Reporter), payment{owner: owner, amount: calculateBasicSum(uint64(r.StorageSize), paymentRate)})
}
l.Debug("successfully sent transfer", zap.Stringer("cID", cID))

return nil
})
}
_ = wg.Wait() // always nil errors are returned in routines

return &transferTable
}

func (p *Processor) distributePayments(l *zap.Logger, epoch uint64, payments *incomeReceivers) {
var (
wg errgroup.Group
distributionDetails = common.BasicIncomeDistributionDetails(epoch)
)
// r/o operations only, no mutex needed to be taken
for rawKey, payers := range payments.nodes {
pubKey, err := keys.NewPublicKeyFromBytes([]byte(rawKey), elliptic.P256())
if err != nil {
l.Warn("failed to decode public key, skip income receiver",
zap.Binary("reporter", []byte(rawKey)), zap.Error(err))
continue
}
receiverID := user.NewFromECDSAPublicKey(ecdsa.PublicKey(*pubKey))

// TODO: batch it?
for _, payer := range payers {
wg.Go(func() error {
l := l.With(
zap.Stringer("sender", payer.owner),
zap.Stringer("recipient", receiverID),
zap.Stringer("amount (GASe-12)", payer.amount),
zap.Binary("paymentDetails", distributionDetails))

err := p.balanceClient.TransferX(payer.owner, receiverID, payer.amount, distributionDetails)
if err != nil {
l.Error("could not send transfer", zap.Error(err))
return nil
}

l.Debug("successfully sent transfer")

return nil
})
}

_ = wg.Wait()
}
}

func calculateBasicSum(size, rate uint64) *big.Int {
bigRate := big.NewInt(int64(rate))

price := big.NewInt(int64(size))
price.Mul(price, bigRate)
price.Div(price, bigGB)

if price.Sign() == 0 {
price.Add(price, bigOne)
}

return price
_ = wg.Wait()
}
23 changes: 0 additions & 23 deletions pkg/innerring/processors/settlement/common/details.go

This file was deleted.

14 changes: 0 additions & 14 deletions pkg/innerring/processors/settlement/common/details_test.go

This file was deleted.

Loading
Loading