Skip to content

Commit 2cde16f

Browse files
committed
WIP
Signed-off-by: Leonard Lyubich <[email protected]>
1 parent d38cb71 commit 2cde16f

File tree

17 files changed

+859
-28
lines changed

17 files changed

+859
-28
lines changed

cmd/neofs-node/object.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func initObjectService(c *cfg) {
199199
),
200200
)
201201

202-
c.policer = policer.New(
202+
c.policer = policer.New(neofsecdsa.Signer(c.key.PrivateKey),
203203
policer.WithLogger(c.log),
204204
policer.WithLocalStorage(ls),
205205
policer.WithRemoteHeader(

internal/ec/ec.go

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ec
22

33
import (
44
"fmt"
5+
"iter"
56
"slices"
67
"strconv"
78

@@ -40,10 +41,9 @@ func Encode(rule Rule, data []byte) ([][]byte, error) {
4041
return make([][]byte, rule.DataPartNum+rule.ParityPartNum), nil
4142
}
4243

43-
// TODO: Explore reedsolomon.Option for performance improvement. https://github.com/nspcc-dev/neofs-node/issues/3501
44-
enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
45-
if err != nil { // should never happen with correct rule
46-
return nil, fmt.Errorf("init Reed-Solomon encoder: %w", err)
44+
enc, err := newCoderForRule(rule)
45+
if err != nil {
46+
return nil, err
4747
}
4848

4949
parts, err := enc.Split(data)
@@ -61,26 +61,44 @@ func Encode(rule Rule, data []byte) ([][]byte, error) {
6161
// Decode decodes source data of known len from EC parts obtained by applying
6262
// specified rule.
6363
func Decode(rule Rule, dataLen uint64, parts [][]byte) ([]byte, error) {
64-
// TODO: Explore reedsolomon.Option for performance improvement. https://github.com/nspcc-dev/neofs-node/issues/3501
65-
dec, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
66-
if err != nil { // should never happen with correct rule
67-
return nil, fmt.Errorf("init Reed-Solomon decoder: %w", err)
64+
if err := decodeIndex(rule, parts, func(yield func(int) bool) {
65+
for i := range int(rule.DataPartNum) {
66+
if !yield(i) {
67+
return
68+
}
69+
}
70+
}); err != nil {
71+
return nil, err
6872
}
6973

70-
required := make([]bool, rule.DataPartNum+rule.ParityPartNum)
71-
for i := range rule.DataPartNum {
72-
required[i] = true
74+
if got := islices.TwoDimSliceElementCount(parts[:rule.DataPartNum]); uint64(got) < dataLen {
75+
return nil, fmt.Errorf("sum len of received data parts is less than full len: %d < %d", got, dataLen)
76+
}
77+
78+
return ConcatDataParts(rule, dataLen, parts), nil
79+
}
80+
81+
// DecodeIndexes decodes specified EC parts obtained by applying specified rule.
82+
func DecodeIndexes(rule Rule, parts [][]byte, idxs []int) error {
83+
return decodeIndex(rule, parts, slices.Values(idxs))
84+
}
85+
86+
func decodeIndex(rule Rule, parts [][]byte, idxs iter.Seq[int]) error {
87+
rs, err := newCoderForRule(rule)
88+
if err != nil {
89+
return err
7390
}
7491

75-
if err := dec.ReconstructSome(parts, required); err != nil {
76-
return nil, fmt.Errorf("restore Reed-Solomon: %w", err)
92+
required := make([]bool, rule.DataPartNum+rule.ParityPartNum)
93+
for idx := range idxs {
94+
required[idx] = true
7795
}
7896

79-
if got := islices.TwoDimSliceElementCount(parts[:rule.DataPartNum]); uint64(got) < dataLen {
80-
return nil, fmt.Errorf("sum len of received data parts is less than full len: %d < %d", got, dataLen)
97+
if err := rs.ReconstructSome(parts, required); err != nil {
98+
return fmt.Errorf("restore Reed-Solomon: %w", err)
8199
}
82100

83-
return ConcatDataParts(rule, dataLen, parts), nil
101+
return nil
84102
}
85103

86104
// ConcatDataParts returns a new slice of dataLen bytes originating given EC
@@ -91,3 +109,13 @@ func ConcatDataParts(rule Rule, dataLen uint64, parts [][]byte) []byte {
91109
// TODO: last part may be shorter, do not overallocate buffer.
92110
return slices.Concat(parts[:rule.DataPartNum]...)[:dataLen]
93111
}
112+
113+
func newCoderForRule(rule Rule) (reedsolomon.Encoder, error) {
114+
// TODO: Explore reedsolomon.Option for performance improvement. https://github.com/nspcc-dev/neofs-node/issues/3501
115+
enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
116+
if err != nil { // should never happen with correct rule
117+
return nil, fmt.Errorf("init Reed-Solomon decoder: %w", err)
118+
}
119+
120+
return enc, nil
121+
}

internal/ec/objects.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,25 @@ func GetPartInfo(obj object.Object) (PartInfo, error) {
5353

5454
// FormObjectForECPart forms object for EC part produced from given parent object.
5555
func FormObjectForECPart(signer neofscrypto.Signer, parent object.Object, part []byte, partInfo PartInfo) (object.Object, error) {
56+
return formObjectForECPart(signer, parent, part, partInfo, true)
57+
}
58+
59+
// FormObjectForECPartWithoutSession is similar to [FormObjectForECPart] but
60+
// leaves session token unset.
61+
func FormObjectForECPartWithoutSession(signer neofscrypto.Signer, parent object.Object, part []byte, partInfo PartInfo) (object.Object, error) {
62+
return formObjectForECPart(signer, parent, part, partInfo, false)
63+
}
64+
65+
func formObjectForECPart(signer neofscrypto.Signer, parent object.Object, part []byte, partInfo PartInfo, withSession bool) (object.Object, error) {
5666
var obj object.Object
5767
obj.SetVersion(parent.Version())
5868
obj.SetContainerID(parent.GetContainerID())
5969
obj.SetOwner(parent.Owner())
6070
obj.SetCreationEpoch(parent.CreationEpoch())
6171
obj.SetType(object.TypeRegular)
62-
obj.SetSessionToken(parent.SessionToken())
72+
if withSession {
73+
obj.SetSessionToken(parent.SessionToken())
74+
}
6375

6476
obj.SetParent(&parent)
6577
iobject.SetIntAttribute(&obj, AttributeRuleIdx, partInfo.RuleIndex)

pkg/local_object_storage/engine/ec.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,76 @@ loop:
104104

105105
return object.Object{}, nil, apistatus.ErrObjectNotFound
106106
}
107+
108+
// HeadECPart is similar to [StorageEngine.GetECPart] but returns only the header.
109+
// TODO: unit tests.
110+
func (e *StorageEngine) HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) {
111+
if e.metrics != nil {
112+
defer elapsed(e.metrics.AddHeadECPartDuration)()
113+
}
114+
115+
e.blockMtx.RLock()
116+
defer e.blockMtx.RUnlock()
117+
if e.blockErr != nil {
118+
return object.Object{}, e.blockErr
119+
}
120+
121+
// TODO: sync placement with PUT. They should sort shards equally, but now PUT sorts by part ID.
122+
// https://github.com/nspcc-dev/neofs-node/issues/3537
123+
s := e.sortShardsFn(e, oid.NewAddress(cnr, parent))
124+
125+
var partID oid.ID
126+
loop:
127+
for i := range s {
128+
hdr, err := s[i].shardIface.HeadECPart(cnr, parent, pi)
129+
switch {
130+
case err == nil:
131+
return hdr, nil
132+
case errors.Is(err, apistatus.ErrObjectAlreadyRemoved):
133+
return object.Object{}, err
134+
case errors.Is(err, meta.ErrObjectIsExpired):
135+
return object.Object{}, apistatus.ErrObjectNotFound // like Get
136+
case errors.As(err, (*ierrors.ObjectID)(&partID)):
137+
if partID.IsZero() {
138+
panic("zero object ID returned as error")
139+
}
140+
141+
e.log.Info("EC part's object ID resolved in shard but reading failed, continue by ID",
142+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
143+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
144+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
145+
// TODO: need report error? Same for other places. https://github.com/nspcc-dev/neofs-node/issues/3538
146+
147+
s = s[i+1:]
148+
break loop
149+
case errors.Is(err, apistatus.ErrObjectNotFound):
150+
default:
151+
e.log.Info("failed to get EC part header from shard, ignore error",
152+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
153+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
154+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
155+
}
156+
}
157+
158+
if partID.IsZero() {
159+
return object.Object{}, apistatus.ErrObjectNotFound
160+
}
161+
162+
for i := range s {
163+
// get an object bypassing the metabase. We can miss deletion or expiration mark. Get behaves like this, so here too.
164+
hdr, err := s[i].shardIface.Head(oid.NewAddress(cnr, partID), true)
165+
switch {
166+
case err == nil:
167+
return *hdr, nil
168+
case errors.Is(err, apistatus.ErrObjectNotFound):
169+
default:
170+
e.log.Info("failed to get EC part header from shard bypassing metabase, ignore error",
171+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
172+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
173+
zap.Stringer("partID", partID),
174+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
175+
}
176+
}
177+
178+
return object.Object{}, apistatus.ErrObjectNotFound
179+
}

pkg/local_object_storage/engine/engine.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/nspcc-dev/neofs-node/pkg/util"
1616
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
1717
"github.com/nspcc-dev/neofs-sdk-go/object"
18+
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
1819
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1920
"go.uber.org/zap"
2021
)
@@ -47,6 +48,8 @@ type shardInterface interface {
4748
ID() *shard.ID
4849
GetStream(oid.Address, bool) (*object.Object, io.ReadCloser, error)
4950
GetECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, io.ReadCloser, error)
51+
Head(oid.Address, bool) (*objectSDK.Object, error)
52+
HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error)
5053
}
5154

5255
type shardWrapper struct {

pkg/local_object_storage/engine/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type MetricRegister interface {
1818
AddSearchDuration(d time.Duration)
1919
AddListObjectsDuration(d time.Duration)
2020
AddGetECPartDuration(d time.Duration)
21+
AddHeadECPartDuration(d time.Duration)
2122

2223
SetObjectCounter(shardID, objectType string, v uint64)
2324
AddToObjectCounter(shardID, objectType string, delta int)

pkg/local_object_storage/shard/ec.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,33 @@ func (s *Shard) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Ob
6565

6666
return *hdr, rdr, nil
6767
}
68+
69+
// HeadECPart is similar to [Shard.GetECPart] but returns only the header.
70+
// TODO: unit tests.
71+
func (s *Shard) HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) {
72+
partID, err := s.metaBaseIface.ResolveECPart(cnr, parent, pi)
73+
if err != nil {
74+
return object.Object{}, fmt.Errorf("resolve part ID in metabase: %w", err)
75+
}
76+
77+
partAddr := oid.NewAddress(cnr, partID)
78+
if s.hasWriteCache() {
79+
hdr, err := s.writeCache.Head(partAddr)
80+
if err == nil {
81+
return *hdr, nil
82+
}
83+
84+
if errors.Is(err, apistatus.ErrObjectNotFound) {
85+
s.log.Debug("EC part object is missing in write-cache, fallback to BLOB storage", zap.Stringer("partAddr", partAddr), zap.Error(err))
86+
} else {
87+
s.log.Info("failed to get EC part object header from write-cache, fallback to BLOB storage", zap.Stringer("partAddr", partAddr), zap.Error(err))
88+
}
89+
}
90+
91+
hdr, err := s.blobStor.Head(partAddr)
92+
if err != nil {
93+
return object.Object{}, fmt.Errorf("get header from BLOB storage by ID %w: %w", ierrors.ObjectID(partID), err)
94+
}
95+
96+
return *hdr, nil
97+
}

pkg/metrics/engine.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type (
2121
searchDuration prometheus.Histogram
2222
listObjectsDuration prometheus.Histogram
2323
getECPartDuration prometheus.Histogram
24+
headECPartDuration prometheus.Histogram
2425

2526
containerSize prometheus.GaugeVec
2627
payloadSize prometheus.GaugeVec
@@ -122,6 +123,13 @@ func newEngineMetrics() engineMetrics {
122123
Name: "get_ec_part_time",
123124
Help: "Engine 'get EC part' operations handling time",
124125
})
126+
127+
headECPartDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
128+
Namespace: storageNodeNameSpace,
129+
Subsystem: engineSubsystem,
130+
Name: "head_ec_part_time",
131+
Help: "Engine 'head EC part' operations handling time",
132+
})
125133
)
126134

127135
var (
@@ -161,6 +169,7 @@ func newEngineMetrics() engineMetrics {
161169
searchDuration: searchDuration,
162170
listObjectsDuration: listObjectsDuration,
163171
getECPartDuration: getECPartDuration,
172+
headECPartDuration: headECPartDuration,
164173
containerSize: *containerSize,
165174
payloadSize: *payloadSize,
166175
capacitySize: *capacitySize,
@@ -181,6 +190,7 @@ func (m engineMetrics) register() {
181190
prometheus.MustRegister(m.searchDuration)
182191
prometheus.MustRegister(m.listObjectsDuration)
183192
prometheus.MustRegister(m.getECPartDuration)
193+
prometheus.MustRegister(m.headECPartDuration)
184194
prometheus.MustRegister(m.containerSize)
185195
prometheus.MustRegister(m.payloadSize)
186196
prometheus.MustRegister(m.capacitySize)
@@ -238,6 +248,10 @@ func (m engineMetrics) AddGetECPartDuration(d time.Duration) {
238248
m.getECPartDuration.Observe(d.Seconds())
239249
}
240250

251+
func (m engineMetrics) AddHeadECPartDuration(d time.Duration) {
252+
m.headECPartDuration.Observe(d.Seconds())
253+
}
254+
241255
func (m engineMetrics) AddToContainerSize(cnrID string, size int64) {
242256
m.containerSize.With(
243257
prometheus.Labels{

pkg/services/object/get/ec.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,20 @@ func (s *Service) copyLocalECPart(dst ObjectWriter, cnr cid.ID, parent oid.ID, p
4747
return nil
4848
}
4949

50+
// similar to copyLocalECPart but returns only the header.
51+
func (s *Service) copyLocalECPartHeader(dst internal.HeaderWriter, cnr cid.ID, parent oid.ID, pi iec.PartInfo) error {
52+
hdr, err := s.localObjects.HeadECPart(cnr, parent, pi)
53+
if err != nil {
54+
return fmt.Errorf("get object header from local storage: %w", err)
55+
}
56+
57+
if err := dst.WriteHeader(&hdr); err != nil {
58+
return fmt.Errorf("write header: %w", err)
59+
}
60+
61+
return nil
62+
}
63+
5064
func (s *Service) copyECObject(ctx context.Context, cnr cid.ID, parent oid.ID, sTok *session.Object,
5165
ecRules []iec.Rule, sortedNodeLists [][]netmap.NodeInfo, dst ObjectWriter) error {
5266
obj, err := s.restoreFromECParts(ctx, cnr, parent, sTok, ecRules, sortedNodeLists)
@@ -527,3 +541,19 @@ func checkECAttributesInReceivedObject(hdr object.Object, ruleIdx, partIdx strin
527541

528542
return fmt.Errorf("not all EC attributes received: requested %d, got %d", expected, found)
529543
}
544+
545+
func checkPartRequestAgainstPolicy(ecRules []iec.Rule, pi iec.PartInfo) error {
546+
if len(ecRules) == 0 {
547+
return errors.New("EC part requested in container without EC policy")
548+
}
549+
550+
if pi.RuleIndex >= len(ecRules) {
551+
return fmt.Errorf("EC rule index overflows container policy: idx=%d,rules=%d", pi.RuleIndex, len(ecRules))
552+
}
553+
554+
if total := ecRules[pi.RuleIndex].DataPartNum + ecRules[pi.RuleIndex].ParityPartNum; pi.Index >= int(total) {
555+
return fmt.Errorf("EC part index overflows container policy: idx=%d,parts=%d", pi.Index, total)
556+
}
557+
558+
return nil
559+
}

pkg/services/object/get/get.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,26 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
9797
// Returns ErrNotFound if the header was not received for the call.
9898
// Returns SplitInfoError if object is virtual and raw flag is set.
9999
func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
100+
pi, err := checkECPartInfoRequest(prm.common.XHeaders())
101+
if err != nil {
102+
// TODO: track https://github.com/nspcc-dev/neofs-api/issues/269.
103+
return fmt.Errorf("invalid request: %w", err)
104+
}
105+
100106
nodeLists, repRules, ecRules, err := s.neoFSNet.GetNodesForObject(prm.addr)
101107
if err != nil {
102108
return fmt.Errorf("get nodes for object: %w", err)
103109
}
104110

111+
if pi.RuleIndex >= 0 {
112+
if err := checkPartRequestAgainstPolicy(ecRules, pi); err != nil {
113+
// TODO: track https://github.com/nspcc-dev/neofs-api/issues/269.
114+
return fmt.Errorf("invalid request: %w", err)
115+
}
116+
// TODO: deny if node is not in the container?
117+
return s.copyLocalECPartHeader(prm.objWriter, prm.addr.Container(), prm.addr.Object(), pi)
118+
}
119+
105120
if len(repRules) > 0 {
106121
err := s.get(ctx, prm.commonPrm, headOnly(), withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules)).err
107122
if len(ecRules) == 0 || !errors.Is(err, apistatus.ErrObjectNotFound) {

0 commit comments

Comments
 (0)