Skip to content

Commit df9d134

Browse files
Recreate lost EC parts (#3628)
2 parents 359fee6 + 93b920c commit df9d134

File tree

21 files changed

+1410
-19
lines changed

21 files changed

+1410
-19
lines changed

cmd/neofs-node/object.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func initObjectService(c *cfg) {
199199
),
200200
)
201201

202-
c.policer = policer.New(
202+
c.policer = policer.New(neofsecdsa.Signer(c.key.PrivateKey),
203203
policer.WithLogger(c.log),
204204
policer.WithLocalStorage(ls),
205205
policer.WithRemoteHeader(

internal/ec/ec.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,25 @@ func DecodeRange(rule Rule, fromIdx, toIdx int, parts [][]byte) error {
112112
return nil
113113
}
114114

115+
// DecodeIndexes decodes specified EC parts obtained by applying specified rule.
116+
func DecodeIndexes(rule Rule, parts [][]byte, idxs []int) error {
117+
rs, err := newCoderForRule(rule)
118+
if err != nil {
119+
return err
120+
}
121+
122+
required := make([]bool, rule.DataPartNum+rule.ParityPartNum)
123+
for i := range idxs {
124+
required[idxs[i]] = true
125+
}
126+
127+
if err := rs.ReconstructSome(parts, required); err != nil {
128+
return fmt.Errorf("restore Reed-Solomon: %w", err)
129+
}
130+
131+
return nil
132+
}
133+
115134
func newCoderForRule(rule Rule) (reedsolomon.Encoder, error) {
116135
enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
117136
if err != nil { // should never happen with correct rule

pkg/local_object_storage/engine/ec.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,75 @@ loop:
203203

204204
return 0, nil, apistatus.ErrObjectNotFound
205205
}
206+
207+
// HeadECPart is similar to [StorageEngine.GetECPart] but returns only the header.
208+
func (e *StorageEngine) HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) {
209+
if e.metrics != nil {
210+
defer elapsed(e.metrics.AddHeadECPartDuration)()
211+
}
212+
213+
e.blockMtx.RLock()
214+
defer e.blockMtx.RUnlock()
215+
if e.blockErr != nil {
216+
return object.Object{}, e.blockErr
217+
}
218+
219+
// TODO: sync placement with PUT. They should sort shards equally, but now PUT sorts by part ID.
220+
// https://github.com/nspcc-dev/neofs-node/issues/3537
221+
s := e.sortShardsFn(e, oid.NewAddress(cnr, parent))
222+
223+
var partID oid.ID
224+
loop:
225+
for i := range s {
226+
hdr, err := s[i].shardIface.HeadECPart(cnr, parent, pi)
227+
switch {
228+
case err == nil:
229+
return hdr, nil
230+
case errors.Is(err, apistatus.ErrObjectAlreadyRemoved):
231+
return object.Object{}, err
232+
case errors.Is(err, meta.ErrObjectIsExpired):
233+
return object.Object{}, apistatus.ErrObjectNotFound // like Get
234+
case errors.As(err, (*ierrors.ObjectID)(&partID)):
235+
if partID.IsZero() {
236+
panic("zero object ID returned as error")
237+
}
238+
239+
e.log.Info("EC part's object ID resolved in shard but reading failed, continue by ID",
240+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
241+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
242+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
243+
// TODO: need report error? Same for other places. https://github.com/nspcc-dev/neofs-node/issues/3538
244+
245+
s = s[i+1:]
246+
break loop
247+
case errors.Is(err, apistatus.ErrObjectNotFound):
248+
default:
249+
e.log.Info("failed to get EC part header from shard, ignore error",
250+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
251+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
252+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
253+
}
254+
}
255+
256+
if partID.IsZero() {
257+
return object.Object{}, apistatus.ErrObjectNotFound
258+
}
259+
260+
for i := range s {
261+
// get an object bypassing the metabase. We can miss deletion or expiration mark. Get behaves like this, so here too.
262+
hdr, err := s[i].shardIface.Head(oid.NewAddress(cnr, partID), true)
263+
switch {
264+
case err == nil:
265+
return *hdr, nil
266+
case errors.Is(err, apistatus.ErrObjectNotFound):
267+
default:
268+
e.log.Info("failed to get EC part header from shard bypassing metabase, ignore error",
269+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
270+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
271+
zap.Stringer("partID", partID),
272+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
273+
}
274+
}
275+
276+
return object.Object{}, apistatus.ErrObjectNotFound
277+
}

0 commit comments

Comments
 (0)