@@ -13,7 +13,8 @@ const {
1313 MultipleBackendInitiateMPUCommand,
1414 MultipleBackendPutMPUPartCommand,
1515 MultipleBackendCompleteMPUCommand,
16- MultipleBackendAbortMPUCommand
16+ MultipleBackendAbortMPUCommand,
17+ addContentLengthMiddleware,
1718} = require ( '@scality/cloudserverclient' ) ;
1819const { LifecycleMetrics } = require ( '../../lifecycle/LifecycleMetrics' ) ;
1920const ReplicationMetric = require ( '../ReplicationMetric' ) ;
@@ -223,6 +224,54 @@ class CopyLocationTask extends BackbeatTask {
223224 log . debug ( 'getting object data' , actionEntry . getLogInfo ( ) ) ;
224225 const doneOnce = jsutil . once ( done ) ;
225226 const size = objMD . getContentLength ( ) ;
227+
228+ // Use abort controller to cancel requests on error
229+ const abortController = new AbortController ( ) ;
230+ let sourceStreamAborted = false ;
231+
232+ const performPutObject = incomingMsg => {
233+ log . debug ( 'putting data' , actionEntry . getLogInfo ( ) ) ;
234+
235+ // Set up stream error handler if we have a stream
236+ if ( incomingMsg ) {
237+ incomingMsg . on ( 'error' , err => {
238+ if ( ! sourceStreamAborted ) {
239+ if ( err . $metadata ?. httpStatusCode === 404 ) {
240+ log . error ( 'the source object was not found' , Object . assign ( {
241+ method : 'CopyLocationTask._getAndPutObjectOnce' ,
242+ peer : this . sourceConfig . s3 ,
243+ error : err . message ,
244+ httpStatus : err . $metadata ?. httpStatusCode ,
245+ } , actionEntry . getLogInfo ( ) ) ) ;
246+ return doneOnce ( errors . ObjNotFound ) ;
247+ }
248+ log . error ( 'an error occurred when streaming data from S3' ,
249+ Object . assign ( {
250+ method : 'CopyLocationTask._getAndPutObjectOnce' ,
251+ peer : this . sourceConfig . s3 ,
252+ error : err . message ,
253+ } , actionEntry . getLogInfo ( ) ) ) ;
254+ return doneOnce ( err ) ;
255+ }
256+ return undefined ;
257+ } ) ;
258+ }
259+
260+ const putDone = err => {
261+ if ( err && incomingMsg && ! sourceStreamAborted ) {
262+ // Abort the source stream on PUT error
263+ sourceStreamAborted = true ;
264+ if ( incomingMsg . destroy ) {
265+ incomingMsg . destroy ( ) ;
266+ }
267+ }
268+ return doneOnce ( err ) ;
269+ } ;
270+
271+ return this . _sendMultipleBackendPutObject (
272+ actionEntry , objMD , size , incomingMsg , log , putDone ) ;
273+ } ;
274+
226275 if ( size !== 0 ) {
227276 const { bucket, key, version } = actionEntry . getAttribute ( 'target' ) ;
228277 const getObjectCommand = new GetObjectCommand ( {
@@ -233,13 +282,12 @@ class CopyLocationTask extends BackbeatTask {
233282 RequestUids : log . getSerializedUids ( ) ,
234283 } ) ;
235284
236- return this . backbeatClient . send ( getObjectCommand )
237- . then ( response => {
238- log . debug ( 'putting data' , actionEntry . getLogInfo ( ) ) ;
239- return this . _sendMultipleBackendPutObject (
240- actionEntry , objMD , size , response . Body , log , doneOnce ) ;
241- } )
285+ return this . backbeatClient . send ( getObjectCommand , { abortSignal : abortController . signal } )
286+ . then ( response => performPutObject ( response . Body ) )
242287 . catch ( err => {
288+ if ( ! sourceStreamAborted ) {
289+ sourceStreamAborted = true ;
290+ }
243291 if ( err . $metadata ?. httpStatusCode === 404 ) {
244292 log . error ( 'the source object was not found' , Object . assign ( {
245293 method : 'CopyLocationTask._getAndPutObjectOnce' ,
@@ -259,9 +307,7 @@ class CopyLocationTask extends BackbeatTask {
259307 return doneOnce ( err ) ;
260308 } ) ;
261309 }
262- log . debug ( 'putting data' , actionEntry . getLogInfo ( ) ) ;
263- return this . _sendMultipleBackendPutObject (
264- actionEntry , objMD , size , null , log , doneOnce ) ;
310+ return performPutObject ( undefined ) ;
265311 }
266312
267313 /**
@@ -295,6 +341,7 @@ class CopyLocationTask extends BackbeatTask {
295341 Body : incomingMsg ,
296342 RequestUids : log . getSerializedUids ( ) ,
297343 } ) ;
344+ addContentLengthMiddleware ( command , size ) ;
298345
299346 return this . backbeatClient . send ( command )
300347 . then ( data => {
@@ -362,6 +409,9 @@ class CopyLocationTask extends BackbeatTask {
362409 } , actionEntry . getLogInfo ( ) ) ) ;
363410 // A 0-byte object has no range, otherwise range is inclusive.
364411 const size = range ? range . end - range . start + 1 : 0 ;
412+ // Create AbortController even for 0-byte parts for consistency
413+ const abortController = new AbortController ( ) ;
414+
365415 const { bucket, key, version } = actionEntry . getAttribute ( 'target' ) ;
366416 if ( size !== 0 ) {
367417 const getObjectCommand = new GetObjectCommand ( {
@@ -373,9 +423,9 @@ class CopyLocationTask extends BackbeatTask {
373423 RequestUids : log . getSerializedUids ( ) ,
374424 } ) ;
375425
376- return this . backbeatClient . send ( getObjectCommand )
426+ return this . backbeatClient . send ( getObjectCommand , { abortSignal : abortController . signal } )
377427 . then ( response => this . _putMPUPart ( actionEntry , objMD , response . Body , size ,
378- uploadId , partNumber , log , done ) )
428+ uploadId , partNumber , log , abortController , done ) )
379429 . catch ( err => {
380430 if ( err . $metadata ?. httpStatusCode === 404 ) {
381431 return done ( err ) ;
@@ -389,8 +439,9 @@ class CopyLocationTask extends BackbeatTask {
389439 return done ( err ) ;
390440 } ) ;
391441 }
392- return this . _putMPUPart ( actionEntry , objMD , null , size ,
393- uploadId , partNumber , log , done ) ;
442+ // For 0-byte parts, pass undefined body but still provide abortController
443+ return this . _putMPUPart ( actionEntry , objMD , undefined , size ,
444+ uploadId , partNumber , log , abortController , done ) ;
394445 }
395446
396447 /**
@@ -513,45 +564,94 @@ class CopyLocationTask extends BackbeatTask {
513564 * @param {String } uploadId - The upload ID of the initiated MPU
514565 * @param {Number } partNumber - The part number of the part
515566 * @param {Werelogs } log - The logger instance
567+ * @param {AbortController } abortController - The abort controller for the source GET request
516568 * @param {Function } cb - The callback to call
517569 * @return {undefined }
518570 */
519571 _putMPUPart ( actionEntry , objMD , incomingMsg , size , uploadId , partNumber ,
520- log , cb ) {
572+ log , abortController , cb ) {
521573 const doneOnce = jsutil . once ( cb ) ;
522- log . debug ( 'putting data' , actionEntry . getLogInfo ( ) ) ;
574+ let sourceStreamAborted = false ;
575+ let destRequestAborted = false ;
523576
524- const { bucket, key } = actionEntry . getAttribute ( 'target' ) ;
525- const command = new MultipleBackendPutMPUPartCommand ( {
526- Bucket : bucket ,
527- Key : key ,
528- ContentLength : size ,
529- StorageType : this . destType ,
530- StorageClass : this . site ,
531- PartNumber : partNumber ,
532- UploadId : uploadId ,
533- Body : incomingMsg ,
534- RequestUids : log . getSerializedUids ( ) ,
535- } ) ;
536-
537- return this . backbeatClient . send ( command )
538- . then ( data => {
539- this . _replicationMetric
540- . withEntry ( actionEntry )
541- . withMetricType ( metricsTypeCompleted )
542- . withObjectSize ( size )
543- . publish ( ) ;
544- return doneOnce ( null , data ) ;
545- } )
546- . catch ( err => {
547- log . error ( 'an error occurred on putting MPU part to S3' ,
548- Object . assign ( {
549- method : 'CopyLocationTask._putMPUPart' ,
550- error : err . message ,
551- httpStatus : err . $metadata ?. httpStatusCode ,
552- } , actionEntry . getLogInfo ( ) ) ) ;
553- return doneOnce ( err ) ;
577+ const performPutMPUPart = ( ) => {
578+ if ( incomingMsg ) {
579+ log . debug ( 'putting data' , actionEntry . getLogInfo ( ) ) ;
580+ }
581+
582+ const { bucket, key } = actionEntry . getAttribute ( 'target' ) ;
583+ const command = new MultipleBackendPutMPUPartCommand ( {
584+ Bucket : bucket ,
585+ Key : key ,
586+ ContentLength : size ,
587+ StorageType : this . destType ,
588+ StorageClass : this . site ,
589+ PartNumber : partNumber ,
590+ UploadId : uploadId ,
591+ Body : incomingMsg ,
592+ RequestUids : log . getSerializedUids ( ) ,
554593 } ) ;
594+ addContentLengthMiddleware ( command , size ) ;
595+
596+ return this . backbeatClient . send ( command )
597+ . then ( data => {
598+ this . _replicationMetric
599+ . withEntry ( actionEntry )
600+ . withMetricType ( metricsTypeCompleted )
601+ . withObjectSize ( size )
602+ . publish ( ) ;
603+ return doneOnce ( null , data ) ;
604+ } )
605+ . catch ( err => {
606+ if ( ! destRequestAborted ) {
607+ destRequestAborted = true ;
608+ if ( abortController ) {
609+ abortController . abort ( ) ;
610+ sourceStreamAborted = true ;
611+ }
612+ if ( incomingMsg && incomingMsg . destroy ) {
613+ incomingMsg . destroy ( ) ;
614+ }
615+ log . error ( 'an error occurred on putting MPU part to S3' ,
616+ Object . assign ( {
617+ method : 'CopyLocationTask._putMPUPart' ,
618+ error : err . message ,
619+ httpStatus : err . $metadata ?. httpStatusCode ,
620+ } , actionEntry . getLogInfo ( ) ) ) ;
621+ }
622+ return doneOnce ( err ) ;
623+ } ) ;
624+ } ;
625+
626+ if ( incomingMsg ) {
627+ incomingMsg . on ( 'error' , err => {
628+ if ( ! sourceStreamAborted ) {
629+ sourceStreamAborted = true ;
630+ if ( abortController ) {
631+ abortController . abort ( ) ;
632+ }
633+ destRequestAborted = true ;
634+ if ( err . $metadata ?. httpStatusCode === 404 ) {
635+ log . error ( 'source object not found when streaming MPU part' ,
636+ Object . assign ( {
637+ method : 'CopyLocationTask._putMPUPart' ,
638+ error : err . message ,
639+ httpStatus : err . $metadata ?. httpStatusCode ,
640+ } , actionEntry . getLogInfo ( ) ) ) ;
641+ return doneOnce ( errors . ObjNotFound ) ;
642+ }
643+ log . error ( 'an error occurred when streaming MPU part data from S3' ,
644+ Object . assign ( {
645+ method : 'CopyLocationTask._putMPUPart' ,
646+ error : err . message ,
647+ } , actionEntry . getLogInfo ( ) ) ) ;
648+ return doneOnce ( err ) ;
649+ }
650+ return undefined ;
651+ } ) ;
652+ }
653+
654+ return performPutMPUPart ( ) ;
555655 }
556656
557657 _getAndPutMultipartUpload ( actionEntry , objMD , log , cb ) {
0 commit comments