
Commit

post reviews fixups
benzekrimaha committed Jul 24, 2024
1 parent 8258211 commit 8dbcf11
Showing 3 changed files with 127 additions and 28 deletions.
61 changes: 35 additions & 26 deletions extensions/gc/tasks/GarbageCollectorTask.js
@@ -134,7 +134,7 @@ class GarbageCollectorTask extends BackbeatTask {
});
}

_executeDeleteData(entry, log, done) {
_executeDeleteDataOnce(entry, log, done) {
const { locations } = entry.getAttribute('target');
const ruleType = entry.getContextAttribute('ruleType');
const params = {
@@ -184,18 +184,29 @@ class GarbageCollectorTask extends BackbeatTask {
});
}

_deleteArchivedSourceData(entry, log, done) {
_executeDeleteData(entry, log, done) {
const logFields = {
bucket: entry.getAttribute('source.bucket'),
objectKey: entry.getAttribute('source.objectKey'),
storageClass: entry.getAttribute('source.storageClass'),
ruleType: entry.getContextAttribute('ruleType'),
};

this.retry({
actionDesc: 'execute delete data',
logFields,
maxRetries: 2,
actionFunc: cb => this._executeDeleteDataOnce(entry, log, cb),
shouldRetryFunc: err => err.retryable,
log,
}, done);
}

_deleteArchivedSourceDataOnce(entry, log, done) {
const { bucket, key, version, oldLocation, newLocation } = entry.getAttribute('target');

async.waterfall([
next => this.retry({
actionDesc: 'get metadata',
logFields: { bucket, key, version },
maxRetries: 2,
actionFunc: cb => this._getMetadata(entry, log, cb),
shouldRetryFunc: err => err.retryable,
log,
}, (err, objMD) => {
next => this._getMetadata(entry, log, (err, objMD) => {
GarbageCollectorMetrics.onS3Request(log, 'getMetadata', 'archive', err);
return next(err, objMD);
}),
@@ -218,14 +229,7 @@ class GarbageCollectorTask extends BackbeatTask {
}),
};

this.retry({
actionDesc: 'batch delete data',
logFields: { bucket, key, version },
maxRetries: 2,
actionFunc: cb => this._batchDeleteData(params, entry, log, cb),
shouldRetryFunc: err => err.retryable,
log,
}, err => {
this._batchDeleteData(params, entry, log, err => {
GarbageCollectorMetrics.onS3Request(log, 'batchdelete', 'archive', err);
entry.setEnd(err);
log.info('action execution ended', entry.getLogInfo());
@@ -268,14 +272,7 @@ class GarbageCollectorTask extends BackbeatTask {
.setUserMetadata({
'x-amz-meta-scal-s3-transition-attempt': undefined,
});
return this.retry({
actionDesc: 'put metadata',
logFields: { bucket, key, version },
maxRetries: 2,
actionFunc: cb => this._putMetadata(entry, objMD, log, cb),
shouldRetryFunc: err => err.retryable,
log,
}, err => {
this._putMetadata(entry, objMD, log, err => {
GarbageCollectorMetrics.onS3Request(log, 'putMetadata', 'archive', err);
if (!err) {
log.end().info('completed expiration of archived data',
@@ -301,6 +298,18 @@ class GarbageCollectorTask extends BackbeatTask {
});
}

_deleteArchivedSourceData(entry, log, done) {
this.retry({
actionDesc: 'delete archived source data',
logFields: { bucket: entry.getAttribute('target').bucket,
key: entry.getAttribute('target').key, version: entry.getAttribute('target').version },
maxRetries: 2,
actionFunc: cb => this._deleteArchivedSourceDataOnce(entry, log, cb),
shouldRetryFunc: err => err.retryable,
log,
}, done);
}

/**
* Execute the action specified in kafka queue entry
*
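A minimal sketch of the pattern applied in this file: each `*Once` method makes a single attempt, and a thin wrapper retries the whole operation through `this.retry(...)`, instead of wrapping each inner call (`_getMetadata`, `_batchDeleteData`, `_putMetadata`) in its own retry. Everything below except the retry options is a hypothetical name chosen for illustration.

// Hypothetical illustration of the *Once / retry-wrapper split.
// The require path, class, method, and client names are assumptions;
// only the retry options mirror the diff above.
const BackbeatTask = require('lib/tasks/BackbeatTask');

class ExampleTask extends BackbeatTask {
    _doActionOnce(entry, log, done) {
        // single attempt: no retry logic at this level
        this.client.doAction(entry.getAttribute('target'), done);
    }

    _doAction(entry, log, done) {
        this.retry({
            actionDesc: 'do action',
            logFields: { bucket: entry.getAttribute('target').bucket },
            maxRetries: 2,                          // initial attempt + up to 2 retries
            actionFunc: cb => this._doActionOnce(entry, log, cb),
            shouldRetryFunc: err => err.retryable,  // only retry retryable errors
            log,
        }, done);
    }
}

Retrying at this level means a transient failure partway through the sequence (for example, the batch delete after a successful metadata get) re-runs the whole `*Once` attempt rather than only the failing call.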
4 changes: 2 additions & 2 deletions lib/tasks/BackbeatTask.js
@@ -59,7 +59,7 @@ class BackbeatTask {
}

if (!shouldRetryFunc(err)) {
return doneOnce(err);
return doneOnce(...args);
}

const now = Date.now();
@@ -78,7 +78,7 @@ class BackbeatTask {
retriesMaxedOut,
timeoutReached,
}, logFields || {}), log);
return doneOnce(err);
return doneOnce(...args);
}

if (onRetryFunc) {
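The two-line change above makes the retry helper forward every callback argument when it stops retrying (non-retryable error, or retries/timeout exhausted), rather than only the error. A simplified sketch of that behaviour, not the actual `BackbeatTask.retry` implementation:

// Simplified, hypothetical retry helper: on give-up, forward all of the
// action's callback arguments instead of only the error.
function retryAction(actionFunc, shouldRetryFunc, maxRetries, done) {
    let attempts = 0;
    const attempt = () => actionFunc((...args) => {
        const [err] = args;
        attempts += 1;
        if (err && shouldRetryFunc(err) && attempts <= maxRetries) {
            return attempt();     // retry the whole action
        }
        return done(...args);     // previously done(err): any extra results were dropped
    });
    attempt();
}

With `maxRetries: 2`, a persistently failing retryable action runs three times in total, which matches the spy count asserted in the tests below.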
90 changes: 90 additions & 0 deletions tests/unit/gc/GarbageCollectorTask.spec.js
@@ -266,4 +266,94 @@ describe('GarbageCollectorTask', () => {
done();
});
});

it('should delete data', done => {
backbeatClient.batchDeleteResponse = { error: null, res: null };

const entry = ActionQueueEntry.create('deleteData')
.addContext({
origin: 'lifecycle',
ruleType: 'expiration',
bucketName: bucket,
objectKey: key,
versionId: version,
})
.setAttribute('serviceName', 'lifecycle-expiration')
.setAttribute('target', {
bucket,
key: version,
version: key,
accountId,
owner,
locations: [{
key: 'locationKey',
dataStoreName: 'dataStoreName',
size: 'size',
dataStoreVersionId: 'dataStoreVersionId',
}],
});
mdObj.setLocation(loc)
.setDataStoreName('locationName')
.setAmzStorageClass('STANDARD');

backbeatMetadataProxyClient.setMdObj(mdObj);

const batchDeleteDataSpy = sinon.spy(gcTask, '_batchDeleteData');

gcTask.processActionEntry(entry, (err, commitInfo) => {
assert.ifError(err);
assert.strictEqual(commitInfo, undefined);

const updatedMD = backbeatMetadataProxyClient.mdObj;
assert.deepStrictEqual(updatedMD.getLocation(), loc);
assert.strictEqual(updatedMD.getDataStoreName(), 'locationName');
assert.strictEqual(updatedMD.getAmzStorageClass(), 'STANDARD');

assert.strictEqual(batchDeleteDataSpy.callCount, 1);
batchDeleteDataSpy.restore();
done();
});
});

it('should retry delete operation if gc failed with retryable error', done => {
backbeatClient.batchDeleteResponse = { error: { statusCode: 500, retryable: true }, res: null };

const entry = ActionQueueEntry.create('deleteData')
.addContext({
origin: 'lifecycle',
ruleType: 'expiration',
bucketName: bucket,
objectKey: key,
versionId: version,
})
.setAttribute('serviceName', 'lifecycle-expiration')
.setAttribute('target', {
bucket,
key: version,
version: key,
accountId,
owner,
locations: [{
key: 'locationKey',
dataStoreName: 'dataStoreName',
size: 'size',
dataStoreVersionId: 'dataStoreVersionId',
}],
});
mdObj.setLocation(loc)
.setDataStoreName('locationName')
.setAmzStorageClass('STANDARD');

backbeatMetadataProxyClient.setMdObj(mdObj);

const batchDeleteDataSpy = sinon.spy(gcTask, '_batchDeleteData');

gcTask.processActionEntry(entry, err => {
assert.strictEqual(batchDeleteDataSpy.callCount, 3);
assert.strictEqual(err.statusCode, 500);
batchDeleteDataSpy.restore();
done();
});
});

});
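The retryable-error test expects `_batchDeleteData` to be called three times because `maxRetries: 2` allows one initial attempt plus up to two retries. A hypothetical complementary case, not part of this commit and reusing the same fixtures (`backbeatClient`, `gcTask`, `mdObj`, `loc`, etc.), could check that a non-retryable error surfaces after a single attempt:

// Hypothetical sketch only, mirroring the spec fixtures above.
it('should not retry delete operation if gc failed with a non-retryable error', done => {
    backbeatClient.batchDeleteResponse = { error: { statusCode: 403, retryable: false }, res: null };

    const entry = ActionQueueEntry.create('deleteData')
        .addContext({
            origin: 'lifecycle',
            ruleType: 'expiration',
            bucketName: bucket,
            objectKey: key,
            versionId: version,
        })
        .setAttribute('serviceName', 'lifecycle-expiration')
        .setAttribute('target', {
            bucket,
            key,
            version,
            accountId,
            owner,
            locations: [{
                key: 'locationKey',
                dataStoreName: 'dataStoreName',
                size: 'size',
                dataStoreVersionId: 'dataStoreVersionId',
            }],
        });
    mdObj.setLocation(loc)
        .setDataStoreName('locationName')
        .setAmzStorageClass('STANDARD');

    backbeatMetadataProxyClient.setMdObj(mdObj);

    const batchDeleteDataSpy = sinon.spy(gcTask, '_batchDeleteData');

    gcTask.processActionEntry(entry, err => {
        assert.strictEqual(batchDeleteDataSpy.callCount, 1); // no retries for non-retryable errors
        assert.strictEqual(err.statusCode, 403);
        batchDeleteDataSpy.restore();
        done();
    });
});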
