diff --git a/CHANGELOG.md b/CHANGELOG.md index 910157ff29..bf5a0d4d58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Changelog for NeoFS Node ### Fixed - Write cache using too much CPU (#3642) +- Basic income payments (#3357) ### Changed - Move `fschain_autodeploy` into `fschain.disable_autodeploy` in IR config (#3619) @@ -18,6 +19,7 @@ Changelog for NeoFS Node - Move `contracts` into `mainnet.contracts` in IR config (#3619) - Move `persistent_sessions` data to `persistent_state` in SN (#3630) - More efficient write cache batching for small objects (#3414) +- Storage nodes do not serve unpaid containers (#774) ### Removed - Pre-0.46.0 write cache format migration (#3647) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index ce59fb233f..be44c18010 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -242,6 +242,7 @@ type cfgMorph struct { } type cfgContainer struct { + payments *paymentChecker parsers map[event.Type]event.NotificationParser subscribers map[event.Type][]event.Handler workerPool util.WorkerPool // pool for asynchronous handlers diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 7d97b2e776..7b94efac03 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -85,6 +85,7 @@ func initContainerService(c *cfg) { }) } + initPaymentChecker(c) initSizeLoadReports(c) cnrSrv := containerService.New(&c.key.PrivateKey, c.networkState, c.cli, (*containersInChain)(&c.basics), c.nCli) @@ -361,6 +362,87 @@ func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationPar c.cfgContainer.parsers[typ] = p } +func initPaymentChecker(c *cfg) { + var ( + l = c.log.With(zap.String("component", "paymentChecker")) + p = &paymentChecker{ + m: sync.RWMutex{}, + statuses: make(map[cid.ID]int64), + cnrCli: c.cCli, + } + ) + + if c.cfgContainer.parsers == nil { + c.cfgContainer.parsers = make(map[event.Type]event.NotificationParser) + } + if c.cfgContainer.subscribers == nil { + c.cfgContainer.subscribers = make(map[event.Type][]event.Handler) + } + const changeUnpaidStatusEventName = "ChangePaymentStatus" + setContainerNotificationParser(c, changeUnpaidStatusEventName, containerEvent.ParseChangePaymentStatus) + addContainerAsyncNotificationHandler(c, changeUnpaidStatusEventName, func(e event.Event) { + ev := e.(containerEvent.ChangePaymentStatus) + p.m.Lock() + defer p.m.Unlock() + + cID := cid.ID(ev.ContainerID) + if ev.Unpaid { + l.Info("container status has changed to unpaid", + zap.Stringer("cID", cID), + zap.Uint64("epoch", ev.Epoch)) + + p.statuses[cID] = int64(ev.Epoch) + } else { + l.Debug("container status has changed to paid", + zap.Stringer("cID", cID), + zap.Uint64("epoch", ev.Epoch)) + + p.statuses[cID] = -1 + } + }) + + c.cfgContainer.payments = p +} + +type paymentChecker struct { + m sync.RWMutex + statuses map[cid.ID]int64 + + cnrCli *cntClient.Client +} + +func (p *paymentChecker) resetCache() { + p.m.Lock() + defer p.m.Unlock() + + clear(p.statuses) +} + +func (p *paymentChecker) IsPaid(cID cid.ID) (int64, error) { + p.m.RLock() + epoch, ok := p.statuses[cID] + p.m.RUnlock() + + if ok { + return epoch, nil + } + + p.m.Lock() + defer p.m.Unlock() + epoch, ok = p.statuses[cID] + if ok { + return epoch, nil + } + + epoch, err := p.cnrCli.UnpaidContainer(cID) + if err != nil { + return 0, fmt.Errorf("FS chain RPC call: %w", err) + } + p.statuses[cID] = epoch + + return epoch, nil +} + func (c *cfg) PublicKey() []byte { return nodeKeyFromNetmap(c) } diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index af8635d21a..70ce143c9c 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -209,6 +209,7 @@ func (c *cfg) restartMorph() error { c.log.Info("restarting internal services because of RPC connection loss...") c.resetCaches() + c.cfgContainer.payments.resetCache() epoch, ni, err := getNetworkState(c) if err != nil { diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index bf3d0858ce..3a96c70bd5 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -243,6 +243,7 @@ func initObjectService(c *cfg) { os := &objectSource{signer: neofsecdsa.SignerRFC6979(c.key.PrivateKey), get: sGet} sPut := putsvc.NewService(&transport{clients: putConstructor}, c, c.metaService, initQuotas(c.cCli, c.cfgObject.quotasTTL), + c.cfgContainer.payments, putsvc.WithNetworkMagic(mNumber), putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), diff --git a/go.mod b/go.mod index dc541bbd1d..a72b22aa4a 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/nspcc-dev/locode-db v0.8.1 github.com/nspcc-dev/neo-go v0.113.0 github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea - github.com/nspcc-dev/neofs-contract v0.24.0 + github.com/nspcc-dev/neofs-contract v0.24.1-0.20251023234316-33fcbda86b2f github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.15.0.20251015122943-b38583ddd311 github.com/nspcc-dev/tzhash v1.8.3 github.com/panjf2000/ants/v2 v2.11.3 diff --git a/go.sum b/go.sum index 736a591435..820cc01b4e 100644 --- a/go.sum +++ b/go.sum @@ -197,8 +197,8 @@ github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20250923153235-ffb84619d02f h1:EO github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20250923153235-ffb84619d02f/go.mod h1:X2spkE8hK/l08CYulOF19fpK5n3p2xO0L1GnJFIywQg= github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea h1:mK0EMGLvunXcFyq7fBURS/CsN4MH+4nlYiqn6pTwWAU= github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea/go.mod h1:YzhD4EZmC9Z/PNyd7ysC7WXgIgURc9uCG1UWDeV027Y= -github.com/nspcc-dev/neofs-contract v0.24.0 h1:lQHtfRc00WEhW9qcnVNbM2sMa4oCBQ5v7vcunJKk9rA= -github.com/nspcc-dev/neofs-contract v0.24.0/go.mod h1:PPxjwRiK6hhXPXduvyojEqLMHNpgPaF+rULPhdFlzDg= +github.com/nspcc-dev/neofs-contract v0.24.1-0.20251023234316-33fcbda86b2f h1:ao93kD+71aqq69yG/Zh91o0F3ycaMFnmaUOUynBZznU= +github.com/nspcc-dev/neofs-contract v0.24.1-0.20251023234316-33fcbda86b2f/go.mod h1:uDF8RHDeOHAJFfaSvAwAQB/s0NcHrH65GudosWHmFqQ= github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.15.0.20251015122943-b38583ddd311 h1:iHjokyLIiOW7zvaNZZPmch0c1tghwkfOniL+JOt+F7M= github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.15.0.20251015122943-b38583ddd311/go.mod h1:Vukuf6qDOQESOWAx5yOjYtVC5wdsQp3hiZrxbJIa2fs= github.com/nspcc-dev/rfc6979 v0.2.4 h1:NBgsdCjhLpEPJZqmC9rciMZDcSY297po2smeaRjw57k= diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 425f1670b1..b80e26d6a9 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -736,7 +736,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *config.Config, errChan chan< State: server, ContainerClient: cnrClient, NetmapClient: server.netmapClient, - BalanceClient: server.balanceClient, }, settlement.WithLogger(server.log.With(zap.String("component", "basicIncomeProcessor"))), ) diff --git a/pkg/innerring/processors/settlement/calls.go b/pkg/innerring/processors/settlement/calls.go index 5218243994..7d3aa7dc31 100644 --- a/pkg/innerring/processors/settlement/calls.go +++ b/pkg/innerring/processors/settlement/calls.go @@ -1,53 +1,12 @@ package settlement import ( - "crypto/ecdsa" - "crypto/elliptic" - "math/big" - "slices" - "sync" - - "github.com/nspcc-dev/neo-go/pkg/crypto/keys" - "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/common" "github.com/nspcc-dev/neofs-node/pkg/morph/event" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) -type payment struct { - owner user.ID - amount *big.Int -} -type incomeReceivers struct { - m sync.Mutex - nodes map[string][]payment -} - -func (ir *incomeReceivers) add(nodeKey string, p payment) { - ir.m.Lock() - defer ir.m.Unlock() - - if payers, ok := ir.nodes[nodeKey]; ok { - i := slices.IndexFunc(ir.nodes[nodeKey], func(registeredPayment payment) bool { - return registeredPayment.owner == p.owner - }) - if i == -1 { - payers = append(payers, p) - ir.nodes[nodeKey] = payers - - return - } - - payers[i].amount.Add(payers[i].amount, p.amount) - - return - } - - ir.nodes[nodeKey] = []payment{p} -} - func (p *Processor) HandleBasicIncomeEvent(e event.Event) { ev := e.(BasicIncomeEvent) epoch := ev.Epoch() @@ -77,111 +36,29 @@ func (p *Processor) HandleBasicIncomeEvent(e event.Event) { return } - l.Info("start basic income calculation...") - - payments := p.calculatePayments(l, rate, cnrs) + l.Info("start basic income distribution...", zap.Int("numberOfContainers", len(cnrs))) - l.Info("basic income calculated, transfer tokens...", zap.Int("numberOfRecievers", len(payments.nodes))) - - p.distributePayments(l, epoch, payments) + p.sendPaymentTXs(l, cnrs) l.Debug("finished basic income distribution") } -func (p *Processor) calculatePayments(l *zap.Logger, paymentRate uint64, cnrs []cid.ID) *incomeReceivers { - var ( - wg errgroup.Group - transferTable = incomeReceivers{nodes: make(map[string][]payment)} - ) - +func (p *Processor) sendPaymentTXs(l *zap.Logger, cnrs []cid.ID) { + var wg errgroup.Group wg.SetLimit(parallelFactor) for _, cID := range cnrs { wg.Go(func() error { - cnr, err := p.cnrClient.Get(cID[:]) + err := p.cnrClient.Pay(cID) if err != nil { - l.Warn("failed to get container, it will be skipped", zap.Stringer("cID", cID), zap.Error(err)) + l.Error("could not send payment transaction", zap.Stringer("cID", cID), zap.Error(err)) return nil } - owner := cnr.Owner() - reports, err := p.cnrClient.NodeReports(cID) - if err != nil { - l.Warn("failed to get container reports, container will be skipped", zap.Stringer("cID", cID), zap.Error(err)) - return nil - } - for _, r := range reports { - switch { - case r.StorageSize == 0: - l.Debug("skipping container with zero storage size", zap.Stringer("cID", cID)) - return nil - case r.StorageSize < 0: - l.Warn("skipping container with negative storage size", - zap.Stringer("cID", cID), zap.Int64("storageSize", r.StorageSize)) - return nil - default: - } - - transferTable.add(string(r.Reporter), payment{owner: owner, amount: calculateBasicSum(uint64(r.StorageSize), paymentRate)}) - } + l.Debug("successfully sent transfer", zap.Stringer("cID", cID)) return nil }) } - _ = wg.Wait() // always nil errors are returned in routines - - return &transferTable -} - -func (p *Processor) distributePayments(l *zap.Logger, epoch uint64, payments *incomeReceivers) { - var ( - wg errgroup.Group - distributionDetails = common.BasicIncomeDistributionDetails(epoch) - ) - // r/o operations only, no mutex needed to be taken - for rawKey, payers := range payments.nodes { - pubKey, err := keys.NewPublicKeyFromBytes([]byte(rawKey), elliptic.P256()) - if err != nil { - l.Warn("failed to decode public key, skip income receiver", - zap.Binary("reporter", []byte(rawKey)), zap.Error(err)) - continue - } - receiverID := user.NewFromECDSAPublicKey(ecdsa.PublicKey(*pubKey)) - - // TODO: batch it? - for _, payer := range payers { - wg.Go(func() error { - l := l.With( - zap.Stringer("sender", payer.owner), - zap.Stringer("recipient", receiverID), - zap.Stringer("amount (GASe-12)", payer.amount), - zap.Binary("paymentDetails", distributionDetails)) - - err := p.balanceClient.TransferX(payer.owner, receiverID, payer.amount, distributionDetails) - if err != nil { - l.Error("could not send transfer", zap.Error(err)) - return nil - } - - l.Debug("successfully sent transfer") - - return nil - }) - } - - _ = wg.Wait() - } -} - -func calculateBasicSum(size, rate uint64) *big.Int { - bigRate := big.NewInt(int64(rate)) - - price := big.NewInt(int64(size)) - price.Mul(price, bigRate) - price.Div(price, bigGB) - - if price.Sign() == 0 { - price.Add(price, bigOne) - } - return price + _ = wg.Wait() } diff --git a/pkg/innerring/processors/settlement/common/details.go b/pkg/innerring/processors/settlement/common/details.go deleted file mode 100644 index 47399b60d0..0000000000 --- a/pkg/innerring/processors/settlement/common/details.go +++ /dev/null @@ -1,23 +0,0 @@ -package common - -import ( - "encoding/binary" -) - -var ( - basicIncomeDistributionPrefix = []byte{0x42} -) - -func BasicIncomeDistributionDetails(epoch uint64) []byte { - return details(basicIncomeDistributionPrefix, epoch) -} - -func details(prefix []byte, epoch uint64) []byte { - prefixLen := len(prefix) - buf := make([]byte, prefixLen+8) - - copy(buf, prefix) - binary.LittleEndian.PutUint64(buf[prefixLen:], epoch) - - return buf -} diff --git a/pkg/innerring/processors/settlement/common/details_test.go b/pkg/innerring/processors/settlement/common/details_test.go deleted file mode 100644 index 1be8b7c646..0000000000 --- a/pkg/innerring/processors/settlement/common/details_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package common - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestBasicIncomeDistributionDetails(t *testing.T) { - var n uint64 = 1994 // 0x7CA - exp := []byte{0x42, 0xCA, 0x07, 0, 0, 0, 0, 0, 0} - got := BasicIncomeDistributionDetails(n) - require.Equal(t, exp, got) -} diff --git a/pkg/innerring/processors/settlement/processor.go b/pkg/innerring/processors/settlement/processor.go index 601ae98b32..bf332c8eac 100644 --- a/pkg/innerring/processors/settlement/processor.go +++ b/pkg/innerring/processors/settlement/processor.go @@ -1,22 +1,13 @@ package settlement import ( - "math/big" - - balanceClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance" containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" netmapClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "go.uber.org/zap" ) -var ( - bigGB = big.NewInt(1 << 30) - bigOne = big.NewInt(1) -) - -// parallelFactor is a max parallel routines number doing settlement -// calculations/handlings. -const parallelFactor = 10 +// parallelFactor is a max parallel routines number sending payment transactions. +const parallelFactor = 100 type ( // AlphabetState is a callback interface for inner ring global state. @@ -30,9 +21,8 @@ type ( state AlphabetState - cnrClient *containerClient.Client - nmClient *netmapClient.Client - balanceClient *balanceClient.Client + cnrClient *containerClient.Client + nmClient *netmapClient.Client } // Prm groups the required parameters of Processor's constructor. @@ -40,7 +30,6 @@ type ( State AlphabetState ContainerClient *containerClient.Client NetmapClient *netmapClient.Client - BalanceClient *balanceClient.Client } ) @@ -53,10 +42,9 @@ func New(prm Prm, opts ...Option) *Processor { } return &Processor{ - log: o.log, - state: prm.State, - cnrClient: prm.ContainerClient, - nmClient: prm.NetmapClient, - balanceClient: prm.BalanceClient, + log: o.log, + state: prm.State, + cnrClient: prm.ContainerClient, + nmClient: prm.NetmapClient, } } diff --git a/pkg/morph/client/container/payment.go b/pkg/morph/client/container/payment.go new file mode 100644 index 0000000000..0ec0644c52 --- /dev/null +++ b/pkg/morph/client/container/payment.go @@ -0,0 +1,49 @@ +package container + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + fschaincontracts "github.com/nspcc-dev/neofs-node/pkg/morph/contracts" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// Pay invokes transfers from container's owner to storage nodes that maintain +// objects inside the container. Always sends a notary request with Alphabet +// multi-signature. Always awaits transaction inclusion. +func (c *Client) Pay(cID cid.ID) error { + prm := client.InvokePrm{} + prm.SetMethod(fschaincontracts.PayContainerMethod) + prm.SetArgs(cID[:]) + prm.RequireAlphabetSignature() + prm.Await() + + err := c.client.Invoke(prm) + if err != nil { + return fmt.Errorf("could not invoke method (%s): %w", fschaincontracts.PayContainerMethod, err) + } + + return nil +} + +// UnpaidContainer checks if container has been marked as unpaid. Returns epoch +// mark was put. A negative epoch means container has not been marked as unpaid. +func (c *Client) UnpaidContainer(cID cid.ID) (int64, error) { + prm := client.TestInvokePrm{} + prm.SetMethod(fschaincontracts.UnpaidContainerMethod) + prm.SetArgs(cID[:]) + + res, err := c.client.TestInvoke(prm) + if err != nil { + return 0, fmt.Errorf("could not invoke method (%s): %w", fschaincontracts.UnpaidContainerMethod, err) + } + if len(res) != 1 { + return 0, fmt.Errorf("stack has unexpected items: %d (want %d)", len(res), 1) + } + unpaidEpoch, err := client.IntFromStackItem(res[0]) + if err != nil { + return 0, fmt.Errorf("not an int on stack: %w", err) + } + + return unpaidEpoch, nil +} diff --git a/pkg/morph/contracts/methods.go b/pkg/morph/contracts/methods.go index 7575d33ce5..dca4a755a8 100644 --- a/pkg/morph/contracts/methods.go +++ b/pkg/morph/contracts/methods.go @@ -4,6 +4,8 @@ package fschaincontracts const ( CreateContainerMethod = "create" RemoveContainerMethod = "remove" + PayContainerMethod = "pay" + UnpaidContainerMethod = "unpaid" PutContainerEACLMethod = "putEACL" PutContainerReportMethod = "putReport" GetReportsSummaryMethod = "getNodeReportSummary" diff --git a/pkg/morph/event/container/payment.go b/pkg/morph/event/container/payment.go new file mode 100644 index 0000000000..214eff6dd9 --- /dev/null +++ b/pkg/morph/event/container/payment.go @@ -0,0 +1,42 @@ +package container + +import ( + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/util" + containerrpc "github.com/nspcc-dev/neofs-contract/rpc/container" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" +) + +// ChangePaymentStatus structure of container.ChangePaymentStatus notification from FS chain. +type ChangePaymentStatus struct { + ContainerID util.Uint256 + Epoch uint64 + Unpaid bool +} + +// MorphEvent implements Neo:Morph Event interface. +func (ChangePaymentStatus) MorphEvent() {} + +// ParseChangePaymentStatus from notification into [ChangePaymentStatus] structure. +func ParseChangePaymentStatus(e *state.ContainedNotificationEvent) (event.Event, error) { + var ( + ev ChangePaymentStatus + rpcEv containerrpc.ChangePaymentStatusEvent + ) + + err := rpcEv.FromStackItem(e.Item) + if err != nil { + return nil, fmt.Errorf("could not parse stack items from notify event: %w", err) + } + if rpcEv.Epoch.Sign() <= 0 { + return nil, fmt.Errorf("negative epoch") + } + + ev.Unpaid = rpcEv.Unpaid + ev.ContainerID = rpcEv.ContainerID + ev.Epoch = rpcEv.Epoch.Uint64() + + return ev, nil +} diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 01cadd5003..973fef1b65 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -19,6 +19,15 @@ import ( "go.uber.org/zap" ) +// PaymentChecker defines interface that must keep container's payment status +// up-to-date. +type PaymentChecker interface { + // IsPaid returns -1 epoch if container is paid and any non-negative epoch + // if container was unpaid starting from that epoch. + // It must return any error that does not allow ensure payment status. + IsPaid(cID cid.ID) (unpaidFromEpoch int64, err error) +} + // QuotaLimiter describes limits for used space. type QuotaLimiter interface { // AvailableQuotasLeft must return (soft limit, hard limit) pair for @@ -127,6 +136,7 @@ type cfg struct { metaSvc *meta.Meta quotaLimiter QuotaLimiter + payments PaymentChecker } func defaultCfg() *cfg { @@ -136,7 +146,7 @@ func defaultCfg() *cfg { } } -func NewService(transport Transport, neoFSNet NeoFSNetwork, m *meta.Meta, q QuotaLimiter, opts ...Option) *Service { +func NewService(transport Transport, neoFSNet NeoFSNetwork, m *meta.Meta, q QuotaLimiter, p PaymentChecker, opts ...Option) *Service { c := defaultCfg() for i := range opts { @@ -151,6 +161,7 @@ func NewService(transport Transport, neoFSNet NeoFSNetwork, m *meta.Meta, q Quot c.fmtValidator = object.NewFormatValidator(fmtValidatorChain, neoFSNet, c.cnrSrc, c.fmtValidatorOpts...) c.metaSvc = m c.quotaLimiter = q + c.payments = p return &Service{ cfg: c, diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go index e5286927cf..6a1629e6bf 100644 --- a/pkg/services/object/put/service_test.go +++ b/pkg/services/object/put/service_test.go @@ -68,6 +68,90 @@ func (q quotas) AvailableQuotasLeft(cID cid.ID, owner user.ID) (uint64, uint64, return q.soft, q.hard, nil } +type payments struct { + m map[cid.ID]int64 +} + +func (p *payments) IsPaid(cID cid.ID) (unpaidFromEpoch int64, err error) { + if p.m == nil { + return -1, nil + } + + e, ok := p.m[cID] + if !ok { + return -1, nil + } + + return e, nil +} + +func TestPayments(t *testing.T) { + var ( + cluster = newTestClusterForRepPolicy(t, 1, 1, 1) + nodeKey = neofscryptotest.ECDSAPrivateKey() + cnr = containertest.Container() + rep = netmap.ReplicaDescriptor{} + p = netmaptest.PlacementPolicy() + nodeWorkerPool, _ = ants.NewPool(1, ants.WithNonblocking(true)) + cID = cidtest.ID() + owner = usertest.User() + payments = &payments{map[cid.ID]int64{cID: 123}} + ) + + rep.SetNumberOfObjects(1) + p.SetReplicas([]netmap.ReplicaDescriptor{rep}) + cnr.SetPlacementPolicy(p) + + s := NewService(cluster.nodeServices, &cluster.nodeNetworks[0], nil, + quotas{math.MaxUint64, math.MaxUint64}, + payments, + WithLogger(zaptest.NewLogger(t)), + WithKeyStorage(objutil.NewKeyStorage(&nodeKey, cluster.nodeSessions[0], &cluster.nodeNetworks[0])), + WithObjectStorage(&cluster.nodeLocalStorages[0]), + WithMaxSizeSource(mockMaxSize(maxObjectSize)), + WithContainerSource(mockContainer(cnr)), + WithNetworkState(&cluster.nodeNetworks[0]), + WithClientConstructor(cluster.nodeServices), + WithSplitChainVerifier(mockSplitVerifier{}), + WithRemoteWorkerPool(nodeWorkerPool), + ) + + stream, err := s.Put(context.Background()) + require.NoError(t, err) + + var sessionToken session.Object + sessionToken.SetID(uuid.New()) + sessionToken.SetExp(1) + sessionToken.BindContainer(cID) + sessionToken.SetAuthKey(cluster.nodeSessions[0].signer.Public()) + require.NoError(t, sessionToken.Sign(owner)) + + req := &protoobject.PutRequest{ + MetaHeader: &protosession.RequestMetaHeader{ + Ttl: 2, + SessionToken: sessionToken.ProtoMessage(), + }, + } + commonPrm, err := objutil.CommonPrmFromRequest(req) + if err != nil { + panic(err) + } + + o := objecttest.Object() + o.SetContainerID(cID) + o.ResetRelations() + ip := new(PutInitPrm). + WithObject(&o). + WithCommonPrm(commonPrm) + + err = stream.Init(ip) + require.ErrorContains(t, err, "container is unpaid") + + payments.m[cID] = -1 + + require.NoError(t, stream.Init(ip)) +} + func TestQuotas(t *testing.T) { const hardLimit = 2 var ( @@ -87,6 +171,7 @@ func TestQuotas(t *testing.T) { s := NewService(cluster.nodeServices, &cluster.nodeNetworks[0], nil, quotas{hard: hardLimit}, + &payments{}, WithLogger(zaptest.NewLogger(t)), WithKeyStorage(objutil.NewKeyStorage(&nodeKey, cluster.nodeSessions[0], &cluster.nodeNetworks[0])), WithObjectStorage(&cluster.nodeLocalStorages[0]), @@ -511,6 +596,7 @@ func newTestClusterForRepPolicyWithContainer(t *testing.T, repNodes, cnrReserveN cluster.nodeServices[i] = NewService(cluster.nodeServices, &cluster.nodeNetworks[i], nil, quotas{math.MaxUint64, math.MaxUint64}, + &payments{}, WithLogger(zaptest.NewLogger(t).With(zap.Int("node", i))), WithKeyStorage(objutil.NewKeyStorage(&nodeKey, cluster.nodeSessions[i], &cluster.nodeNetworks[i])), WithObjectStorage(&cluster.nodeLocalStorages[i]), diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 54e2862d5a..16d655291a 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -13,6 +13,7 @@ import ( neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/user" + "go.uber.org/zap" ) type Streamer struct { @@ -164,6 +165,17 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { return errors.New("missing container ID") } + unpaidSinceEpoch, err := p.payments.IsPaid(idCnr) + if err != nil { + p.log.Warn("cannot check container payment status, allow object PUT", + zap.Stringer("cID", idCnr), + zap.Error(err)) + } else { + if unpaidSinceEpoch >= 0 { + return fmt.Errorf("container is unpaid since epoch %d", unpaidSinceEpoch) + } + } + // get container to store the object prm.cnr, err = p.cnrSrc.Get(idCnr) if err != nil {