1+ const { constants : { HTTP_STATUS_CONFLICT } } = require ( 'http2' ) ;
12const url = require ( 'url' ) ;
23const async = require ( 'async' ) ;
34const httpProxy = require ( 'http-proxy' ) ;
@@ -7,7 +8,13 @@ const joi = require('@hapi/joi');
78const backbeatProxy = httpProxy . createProxyServer ( {
89 ignorePath : true ,
910} ) ;
10- const { auth, errors, errorInstances, s3middleware, s3routes, models, storage } = require ( 'arsenal' ) ;
11+ const { auth, errors, errorInstances, s3middleware, s3routes, models, storage, versioning } = require ( 'arsenal' ) ;
12+ const { decode, encode, compare : compareMicroVersionId , Ordering } = versioning . VersionID ;
13+ const {
14+ VersionIdCollisionException,
15+ StaleMicroVersionIdException,
16+ MicroVersionIdAlreadyStoredException,
17+ } = require ( '@scality/cloudserverclient' ) ;
1118
1219const { responseJSONBody } = s3routes . routesUtils ;
1320const { getSubPartIds } = s3middleware . azureHelper . mpuUtils ;
@@ -21,6 +28,7 @@ const locationStorageCheck = require('../api/apiUtils/object/locationStorageChec
2128const { dataStore } = require ( '../api/apiUtils/object/storeObject' ) ;
2229const prepareRequestContexts = require ( '../api/apiUtils/authorization/prepareRequestContexts' ) ;
2330const { decodeVersionId } = require ( '../api/apiUtils/object/versioning' ) ;
31+ const getReplicationInfo = require ( '../api/apiUtils/object/getReplicationInfo' ) ;
2432const locationKeysHaveChanged = require ( '../api/apiUtils/object/locationKeysHaveChanged' ) ;
2533const { standardMetadataValidateBucketAndObj, metadataGetObject } = require ( '../metadata/metadataUtils' ) ;
2634const { config } = require ( '../Config' ) ;
@@ -32,6 +40,7 @@ const {
3240} = require ( '../api/apiUtils/integrity/validateChecksums' ) ;
3341const { BackendInfo } = models ;
3442const { pushReplicationMetric } = require ( './utilities/pushReplicationMetric' ) ;
43+ const writeContinue = require ( '../utilities/writeContinue' ) ;
3544const kms = require ( '../kms/wrapper' ) ;
3645const { listLifecycleCurrents } = require ( '../api/backbeat/listLifecycleCurrents' ) ;
3746const { listLifecycleNonCurrents } = require ( '../api/backbeat/listLifecycleNonCurrents' ) ;
@@ -93,7 +102,7 @@ function _isObjectRequest(req) {
93102 return [ 'data' , 'metadata' , 'multiplebackenddata' , 'multiplebackendmetadata' ] . includes ( req . resourceType ) ;
94103}
95104
96- function _respondWithHeaders ( response , payload , extraHeaders , log , callback ) {
105+ function _respondWithHeaders ( response , payload , extraHeaders , log , callback , statusCode = 200 ) {
97106 let body = '' ;
98107 if ( typeof payload === 'string' ) {
99108 body = payload ;
@@ -115,10 +124,10 @@ function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
115124 // eslint-disable-next-line no-param-reassign
116125 response . serverAccessLog . endTurnAroundTime = process . hrtime . bigint ( ) ;
117126 }
118- response . writeHead ( 200 , httpHeaders ) ;
127+ response . writeHead ( statusCode , httpHeaders ) ;
119128 response . end ( body , 'utf8' , ( ) => {
120129 log . end ( ) . info ( 'responded with payload' , {
121- httpCode : 200 ,
130+ httpCode : statusCode ,
122131 contentLength : Buffer . byteLength ( body ) ,
123132 } ) ;
124133 callback ( ) ;
@@ -129,6 +138,15 @@ function _respond(response, payload, log, callback) {
129138 _respondWithHeaders ( response , payload , { } , log , callback ) ;
130139}
131140
141+ function _respondWithHeaderCrrConflict ( response , log , callback , code , message , mvId ) {
142+ return _respondWithHeaders (
143+ response ,
144+ { code, message } ,
145+ mvId ? { 'x-scal-micro-version-id' : encode ( mvId ) } : { } ,
146+ log , callback , HTTP_STATUS_CONFLICT ,
147+ ) ;
148+ }
149+
132150function _getRequestPayload ( req , cb ) {
133151 const payload = [ ] ;
134152 let payloadLen = 0 ;
@@ -414,6 +432,30 @@ function putData(request, response, bucketInfo, objMd, log, callback) {
414432 log . error ( errMessage ) ;
415433 return callback ( errorInstances . BadRequest . customizeDescription ( errMessage ) ) ;
416434 }
435+
436+ const incomingVersionIdEncoded = request . headers [ 'x-scal-version-id' ] ;
437+ const decoded = incomingVersionIdEncoded ? decode ( incomingVersionIdEncoded ) : null ;
438+ const incomingVersionIdDecoded = decoded instanceof Error ? null : decoded ;
439+ if ( incomingVersionIdDecoded && objMd && objMd . versionId === incomingVersionIdDecoded ) {
440+ // Skip the write if data is already at destination for this version id
441+ // Return 409 with the existing microVersionId so backbeat can
442+ // decide if putMetadata is still needed
443+ log . debug ( 'crr cascade putData: version already at destination' , {
444+ method : 'putData' ,
445+ bucketName : request . bucketName ,
446+ objectKey : request . objectKey ,
447+ hasMicroVersionId : ! ! objMd . microVersionId ,
448+ } ) ;
449+ request . resume ( ) ;
450+ return _respondWithHeaderCrrConflict (
451+ response , log , callback ,
452+ VersionIdCollisionException . name ,
453+ 'version id already at destination' ,
454+ objMd . microVersionId ,
455+ ) ;
456+ }
457+
458+ writeContinue ( request , response ) ;
417459 const context = {
418460 bucketName : request . bucketName ,
419461 owner : canonicalID ,
@@ -539,6 +581,42 @@ function getCanonicalIdsByAccountId(accountId, log, cb) {
539581}
540582
541583function putMetadata ( request , response , bucketInfo , objMd , log , callback ) {
584+ const { bucketName, objectKey } = request ;
585+
586+ const encodedMicroVersionId = request . headers [ 'x-scal-micro-version-id' ] ;
587+ const decoded = encodedMicroVersionId ? decode ( encodedMicroVersionId ) : null ;
588+ const incomingMicroVersionId = decoded instanceof Error ? null : decoded ;
589+ if ( incomingMicroVersionId ) {
590+ const cmp = compareMicroVersionId ( incomingMicroVersionId , objMd ?. microVersionId ) ;
591+ if ( cmp === Ordering . EQUAL ) {
592+ log . debug ( 'crr cascade putMetadata: loop detected, skipping write' , {
593+ method : 'putMetadata' ,
594+ bucketName,
595+ objectKey,
596+ } ) ;
597+ request . resume ( ) ;
598+ return _respondWithHeaderCrrConflict (
599+ response , log , callback ,
600+ MicroVersionIdAlreadyStoredException . name ,
601+ 'incoming microVersionId already at destination' ,
602+ ) ;
603+ }
604+ if ( cmp === Ordering . OLDER ) {
605+ log . debug ( 'crr cascade putMetadata: stale event, rejecting' , {
606+ method : 'putMetadata' ,
607+ bucketName,
608+ objectKey,
609+ } ) ;
610+ request . resume ( ) ;
611+ return _respondWithHeaderCrrConflict (
612+ response , log , callback ,
613+ StaleMicroVersionIdException . name ,
614+ 'incoming revision is older than destination' ,
615+ objMd ?. microVersionId ,
616+ ) ;
617+ }
618+ }
619+
542620 return _getRequestPayload ( request , ( err , payload ) => {
543621 if ( err ) {
544622 return callback ( err ) ;
@@ -552,15 +630,15 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
552630 return callback ( errors . MalformedPOSTRequest ) ;
553631 }
554632
555- const { headers, bucketName , objectKey } = request ;
633+ const { headers } = request ;
556634
557635 // Destination-side delete-marker replication.
558636 // We need the REPLICA status to distinguish from
559637 // source-side replication status updates that also carry isDeleteMarker=true.
560638 if (
561639 omVal . isDeleteMarker &&
562640 omVal . replicationInfo &&
563- omVal . replicationInfo . status === 'REPLICA' &&
641+ ( omVal . replicationInfo . isReplica === true || omVal . replicationInfo . status === 'REPLICA' ) &&
564642 request . serverAccessLog
565643 ) {
566644 // eslint-disable-next-line no-param-reassign
@@ -576,7 +654,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
576654 // The REPLICA status excludes source-side replication-status updates.
577655 if (
578656 omVal . replicationInfo &&
579- omVal . replicationInfo . status === 'REPLICA' &&
657+ ( omVal . replicationInfo . isReplica === true || omVal . replicationInfo . status === 'REPLICA' ) &&
580658 ( omVal . originOp === 's3:ObjectTagging:Put' || omVal . originOp === 's3:ObjectTagging:Delete' ) &&
581659 request . serverAccessLog
582660 ) {
@@ -593,7 +671,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
593671 // The REPLICA status excludes source-side replication-status updates.
594672 if (
595673 omVal . replicationInfo &&
596- omVal . replicationInfo . status === 'REPLICA' &&
674+ ( omVal . replicationInfo . isReplica === true || omVal . replicationInfo . status === 'REPLICA' ) &&
597675 omVal . originOp === 's3:ObjectAcl:Put' &&
598676 request . serverAccessLog
599677 ) {
@@ -672,7 +750,8 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
672750 // then we want to create a version for the replica object even though
673751 // none was provided in the object metadata value.
674752 if ( omVal . replicationInfo . isNFS ) {
675- const { isReplica } = omVal . replicationInfo ;
753+ const isReplica = omVal . replicationInfo . isReplica === true
754+ || omVal . replicationInfo . status === 'REPLICA' ;
676755 versioning = isReplica ;
677756 omVal . replicationInfo . isNFS = ! isReplica ;
678757 }
@@ -724,6 +803,48 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
724803 options . isNull = isNull ;
725804 }
726805
806+ // Cascade triggering
807+ // If the bucket receiving this replica has its own CRR rules, set
808+ // status to PENDING so the queue populator here picks it up for the
809+ // next hop. If not, clear the source-side replicationInfo fields
810+ // Always mark isReplica=true.
811+ if ( incomingMicroVersionId ) {
812+ const isMDOnly = headers [ 'x-scal-replication-content' ] === 'METADATA' ;
813+ const objSize = omVal [ 'content-length' ] || 0 ;
814+
815+ // These S3-compatible Scality locations are excluded
816+ // as cascade targets because they use the MultiBackend S3 path which
817+ // bypasses the putData/putMetadata routes, so loop detection cannot fire
818+ // on those destinations.
819+ const BLOCKED_LOCATION_TYPES = [ 'location-scality-ring-s3-v1' , 'location-scality-artesca-s3-v1' ] ;
820+
821+ const nextReplInfo = getReplicationInfo ( config , objectKey , bucketInfo , isMDOnly , objSize , null , null , null ) ;
822+
823+ if ( nextReplInfo ) {
824+ nextReplInfo . backends = nextReplInfo . backends . filter ( b => {
825+ const loc = config . locationConstraints [ b . site ] ;
826+ return ! loc || ! BLOCKED_LOCATION_TYPES . includes ( loc . type ) ;
827+ } ) ;
828+ }
829+
830+ if ( nextReplInfo && nextReplInfo . backends . length > 0 ) {
831+ omVal . replicationInfo = nextReplInfo ;
832+ } else {
833+ omVal . replicationInfo = {
834+ status : '' ,
835+ backends : [ ] ,
836+ content : [ ] ,
837+ destination : '' ,
838+ storageClass : '' ,
839+ role : '' ,
840+ storageType : '' ,
841+ dataStoreVersionId : '' ,
842+ } ;
843+ }
844+
845+ omVal . replicationInfo . isReplica = true ;
846+ }
847+
727848 return async . series (
728849 [
729850 // Zenko's CRR delegates replacing the account
0 commit comments