Skip to content

Commit 1263882

Browse files
committed
sn/policer: Recreate unavailable EC parts in container when possible
Extend `Policer` with workers checking availability of EC parts. Free worker takes the job each time some EC part is processed by regular Policer routine. If no more than N parts are unavailable (where N is a number of parity parts in the container policy), they are recreated using available ones. Recreated object has the same ID but signed by SN itself. To ensure such objects are accepted, the corresponding checks are relaxed for EC parts. Closes #3554. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 86161a2 commit 1263882

File tree

24 files changed

+1406
-25
lines changed

24 files changed

+1406
-25
lines changed

cmd/neofs-node/object.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func initObjectService(c *cfg) {
199199
),
200200
)
201201

202-
c.policer = policer.New(
202+
c.policer = policer.New(neofsecdsa.Signer(c.key.PrivateKey),
203203
policer.WithLogger(c.log),
204204
policer.WithLocalStorage(ls),
205205
policer.WithRemoteHeader(

internal/crypto/object.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
)
1414

1515
// AuthenticateObject checks whether obj is signed correctly by its owner.
16-
func AuthenticateObject(obj object.Object, fsChain HistoricN3ScriptRunner) error {
16+
func AuthenticateObject(obj object.Object, fsChain HistoricN3ScriptRunner, ecPart bool) error {
1717
sig := obj.Signature()
1818
if sig == nil {
1919
return errMissingSignature
@@ -39,7 +39,7 @@ func AuthenticateObject(obj object.Object, fsChain HistoricN3ScriptRunner) error
3939

4040
if sessionToken != nil {
4141
// NOTE: update this place for non-ECDSA schemes
42-
if !sessionToken.AssertAuthKey((*neofsecdsa.PublicKey)(ecdsaPub)) { // same format for all ECDSA schemes
42+
if !ecPart && !sessionToken.AssertAuthKey((*neofsecdsa.PublicKey)(ecdsaPub)) { // same format for all ECDSA schemes
4343
return errors.New("session token is not for object's signer")
4444
}
4545
if err := AuthenticateToken(sessionToken, fsChain); err != nil {
@@ -57,7 +57,7 @@ func AuthenticateObject(obj object.Object, fsChain HistoricN3ScriptRunner) error
5757
if !verifyECDSAFns[scheme](*ecdsaPub, sig.Value(), obj.GetID().Marshal()) {
5858
return schemeError(scheme, errSignatureMismatch)
5959
}
60-
if sessionToken == nil && user.NewFromECDSAPublicKey(*ecdsaPub) != obj.Owner() {
60+
if sessionToken == nil && !ecPart && user.NewFromECDSAPublicKey(*ecdsaPub) != obj.Owner() {
6161
return errors.New("owner mismatches signature")
6262
}
6363
case neofscrypto.N3:

internal/crypto/object_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ import (
2020
func TestAuthenticateObject(t *testing.T) {
2121
t.Run("without signature", func(t *testing.T) {
2222
obj := getUnsignedObject()
23-
require.EqualError(t, icrypto.AuthenticateObject(obj, nil), "missing signature")
23+
require.EqualError(t, icrypto.AuthenticateObject(obj, nil, false), "missing signature")
2424
})
2525
t.Run("unsupported scheme", func(t *testing.T) {
2626
obj := objectECDSASHA512
2727
sig := *obj.Signature()
2828
sig.SetScheme(4)
2929
obj.SetSignature(&sig)
30-
require.EqualError(t, icrypto.AuthenticateObject(obj, nil), "unsupported scheme 4")
30+
require.EqualError(t, icrypto.AuthenticateObject(obj, nil, false), "unsupported scheme 4")
3131
})
3232
t.Run("invalid public key", func(t *testing.T) {
3333
for _, tc := range []struct {
@@ -48,7 +48,7 @@ func TestAuthenticateObject(t *testing.T) {
4848
sig := *obj.Signature()
4949
sig.SetPublicKeyBytes(tc.changePub(sig.PublicKeyBytes()))
5050
obj.SetSignature(&sig)
51-
err := icrypto.AuthenticateObject(obj, nil)
51+
err := icrypto.AuthenticateObject(obj, nil, false)
5252
require.EqualError(t, err, "scheme ECDSA_SHA512: decode public key: "+tc.err)
5353
})
5454
}
@@ -70,7 +70,7 @@ func TestAuthenticateObject(t *testing.T) {
7070
cp[i]++
7171
sig.SetValue(cp)
7272
tc.obj.SetSignature(&sig)
73-
err := icrypto.AuthenticateObject(tc.obj, nil)
73+
err := icrypto.AuthenticateObject(tc.obj, nil, false)
7474
require.EqualError(t, err, fmt.Sprintf("scheme %s: signature mismatch", tc.scheme))
7575
}
7676
})
@@ -86,7 +86,7 @@ func TestAuthenticateObject(t *testing.T) {
8686
{scheme: neofscrypto.ECDSA_WALLETCONNECT, object: wrongOwnerObjectECDSAWalletConnect},
8787
} {
8888
t.Run(tc.scheme.String(), func(t *testing.T) {
89-
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil), "owner mismatches signature")
89+
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil, false), "owner mismatches signature")
9090
})
9191
}
9292
})
@@ -101,7 +101,7 @@ func TestAuthenticateObject(t *testing.T) {
101101
{scheme: neofscrypto.ECDSA_WALLETCONNECT, object: objectWithNoIssuerSessionECDSAWalletConnect},
102102
} {
103103
t.Run(tc.scheme.String(), func(t *testing.T) {
104-
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil), "session token: missing issuer")
104+
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil, false), "session token: missing issuer")
105105
})
106106
}
107107
})
@@ -115,7 +115,7 @@ func TestAuthenticateObject(t *testing.T) {
115115
{scheme: neofscrypto.ECDSA_WALLETCONNECT, object: objectWithWrongIssuerSessionECDSAWalletConnect},
116116
} {
117117
t.Run(tc.scheme.String(), func(t *testing.T) {
118-
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil), "session token: issuer mismatches signature")
118+
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil, false), "session token: issuer mismatches signature")
119119
})
120120
}
121121
})
@@ -129,7 +129,7 @@ func TestAuthenticateObject(t *testing.T) {
129129
{scheme: neofscrypto.ECDSA_WALLETCONNECT, object: objectWithWrongSessionSubjectECDSAWalletConnect},
130130
} {
131131
t.Run(tc.scheme.String(), func(t *testing.T) {
132-
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil), "session token is not for object's signer")
132+
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil, false), "session token is not for object's signer")
133133
})
134134
}
135135
})
@@ -143,7 +143,7 @@ func TestAuthenticateObject(t *testing.T) {
143143
{scheme: neofscrypto.ECDSA_WALLETCONNECT, object: objectWithWrongOwnerSessionECDSAWalletConnect},
144144
} {
145145
t.Run(tc.scheme.String(), func(t *testing.T) {
146-
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil), "different object owner and session issuer")
146+
require.EqualError(t, icrypto.AuthenticateObject(tc.object, nil, false), "different object owner and session issuer")
147147
})
148148
}
149149
})
@@ -160,7 +160,7 @@ func TestAuthenticateObject(t *testing.T) {
160160
{name: neofscrypto.ECDSA_WALLETCONNECT.String() + " with session", object: objectWithSessionECDSAWalletConnect},
161161
} {
162162
t.Run(tc.name, func(t *testing.T) {
163-
require.NoError(t, icrypto.AuthenticateObject(tc.object, nil))
163+
require.NoError(t, icrypto.AuthenticateObject(tc.object, nil, false))
164164
})
165165
}
166166
}

internal/ec/ec.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,25 @@ func DecodeRange(rule Rule, fromIdx, toIdx int, parts [][]byte) error {
112112
return nil
113113
}
114114

115+
// DecodeIndexes decodes specified EC parts obtained by applying specified rule.
116+
func DecodeIndexes(rule Rule, parts [][]byte, idxs []int) error {
117+
rs, err := newCoderForRule(rule)
118+
if err != nil {
119+
return err
120+
}
121+
122+
required := make([]bool, rule.DataPartNum+rule.ParityPartNum)
123+
for i := range idxs {
124+
required[idxs[i]] = true
125+
}
126+
127+
if err := rs.ReconstructSome(parts, required); err != nil {
128+
return fmt.Errorf("restore Reed-Solomon: %w", err)
129+
}
130+
131+
return nil
132+
}
133+
115134
func newCoderForRule(rule Rule) (reedsolomon.Encoder, error) {
116135
enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum))
117136
if err != nil { // should never happen with correct rule

pkg/core/object/ec_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,19 @@ func TestFormatValidator_Validate_EC(t *testing.T) {
249249
require.EqualError(t, v.Validate(&cp, false), "object with EC attributes __NEOFS__EC_RULE_IDX in container without EC rules")
250250
})
251251

252+
t.Run("part created by 3rd party", func(t *testing.T) {
253+
otherCreator := usertest.User()
254+
for i := range parts {
255+
obj, err := iec.FormObjectForECPart(otherCreator, parent, parts[i], iec.PartInfo{
256+
RuleIndex: ruleIdx,
257+
Index: i,
258+
})
259+
require.NoError(t, err)
260+
261+
require.NoError(t, v.Validate(&obj, false))
262+
}
263+
})
264+
252265
for i := range ecParts {
253266
require.NoError(t, v.Validate(&ecParts[i], false))
254267
}

pkg/core/object/fmt.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (v *FormatValidator) validate(obj *object.Object, unprepared, isParent bool
235235
if err := icrypto.AuthenticateObject(*obj, historicN3ScriptRunner{
236236
FSChain: v.fsChain,
237237
NetmapContract: v.netmapContract,
238-
}); err != nil {
238+
}, isEC); err != nil {
239239
return fmt.Errorf("authenticate: %w", err)
240240
}
241241

pkg/local_object_storage/engine/ec.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,75 @@ loop:
203203

204204
return 0, nil, apistatus.ErrObjectNotFound
205205
}
206+
207+
// HeadECPart is similar to [StorageEngine.GetECPart] but returns only the header.
208+
func (e *StorageEngine) HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) {
209+
if e.metrics != nil {
210+
defer elapsed(e.metrics.AddHeadECPartDuration)()
211+
}
212+
213+
e.blockMtx.RLock()
214+
defer e.blockMtx.RUnlock()
215+
if e.blockErr != nil {
216+
return object.Object{}, e.blockErr
217+
}
218+
219+
// TODO: sync placement with PUT. They should sort shards equally, but now PUT sorts by part ID.
220+
// https://github.com/nspcc-dev/neofs-node/issues/3537
221+
s := e.sortShardsFn(e, oid.NewAddress(cnr, parent))
222+
223+
var partID oid.ID
224+
loop:
225+
for i := range s {
226+
hdr, err := s[i].shardIface.HeadECPart(cnr, parent, pi)
227+
switch {
228+
case err == nil:
229+
return hdr, nil
230+
case errors.Is(err, apistatus.ErrObjectAlreadyRemoved):
231+
return object.Object{}, err
232+
case errors.Is(err, meta.ErrObjectIsExpired):
233+
return object.Object{}, apistatus.ErrObjectNotFound // like Get
234+
case errors.As(err, (*ierrors.ObjectID)(&partID)):
235+
if partID.IsZero() {
236+
panic("zero object ID returned as error")
237+
}
238+
239+
e.log.Info("EC part's object ID resolved in shard but reading failed, continue by ID",
240+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
241+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
242+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
243+
// TODO: need report error? Same for other places. https://github.com/nspcc-dev/neofs-node/issues/3538
244+
245+
s = s[i+1:]
246+
break loop
247+
case errors.Is(err, apistatus.ErrObjectNotFound):
248+
default:
249+
e.log.Info("failed to get EC part header from shard, ignore error",
250+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
251+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
252+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
253+
}
254+
}
255+
256+
if partID.IsZero() {
257+
return object.Object{}, apistatus.ErrObjectNotFound
258+
}
259+
260+
for i := range s {
261+
// get an object bypassing the metabase. We can miss deletion or expiration mark. Get behaves like this, so here too.
262+
hdr, err := s[i].shardIface.Head(oid.NewAddress(cnr, partID), true)
263+
switch {
264+
case err == nil:
265+
return *hdr, nil
266+
case errors.Is(err, apistatus.ErrObjectNotFound):
267+
default:
268+
e.log.Info("failed to get EC part header from shard bypassing metabase, ignore error",
269+
zap.Stringer("container", cnr), zap.Stringer("parent", parent),
270+
zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
271+
zap.Stringer("partID", partID),
272+
zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
273+
}
274+
}
275+
276+
return object.Object{}, apistatus.ErrObjectNotFound
277+
}

0 commit comments

Comments
 (0)