Skip to content

Commit 404c946

Browse files
finished multi backend apis
1 parent 033938e commit 404c946

File tree

10 files changed

+269
-265
lines changed

10 files changed

+269
-265
lines changed

conf/config.json

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,13 @@
3434
"host": "127.0.0.1",
3535
"port": 9990
3636
},
37-
3837
"mongo": {
39-
"replicaSetHosts": "localhost:27017",
40-
"authCredentials": {
41-
"password": "password",
42-
"username": "root"
43-
},
44-
"writeConcern": "majority",
45-
"replicaSet": "rs0",
46-
"readPreference": "primary",
47-
"database": "metadata"
38+
"replicaSetHosts":
39+
"localhost:27017,localhost:27018,localhost:27019",
40+
"writeConcern": "majority",
41+
"replicaSet": "rs0",
42+
"readPreference": "primary",
43+
"database": "metadata"
4844
},
4945
"kafka": {
5046
"topic": "backbeat-oplog",

extensions/gc/tasks/GarbageCollectorTask.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ const async = require('async');
22
const errors = require('arsenal').errors;
33
const { ObjectMD } = require('arsenal').models;
44

5-
65
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
76
const { BatchDeleteCommand } = require('../../../lib/clients/smithy/build/smithy/source/typescript-codegen');
87
const { GarbageCollectorMetrics } = require('../GarbageCollectorMetrics');
@@ -122,9 +121,9 @@ class GarbageCollectorTask extends BackbeatTask {
122121
return done(err);
123122
}
124123

125-
const cloudserverClient = this.getCloudserverClient(accountId);
126-
if (!cloudserverClient) {
127-
log.error('failed to get cloudserver client', { accountId });
124+
const backbeatClient = this.getBackbeatClient(accountId);
125+
if (!backbeatClient) {
126+
log.error('failed to get backbeat client', { accountId });
128127
return done(errors.InternalError
129128
.customizeDescription('Unable to obtain client'));
130129
}
@@ -134,7 +133,7 @@ class GarbageCollectorTask extends BackbeatTask {
134133
RequestUids: log.getSerializedUids(),
135134
});
136135

137-
return cloudserverClient.send(command)
136+
return backbeatClient.send(command)
138137
.then(() => done())
139138
.catch(err => done(err));
140139
});

extensions/replication/tasks/CopyLocationTask.js

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,8 @@ class CopyLocationTask extends BackbeatTask {
8080
// Disable retries, use our own retry policy (mandatory for
8181
// putObject route in order to fetch data again from source).
8282
const { transport, s3, auth } = this.sourceConfig;
83-
const requestHandler = transport === 'https' ? {
84-
httpsAgent: this.sourceHTTPAgent,
85-
requestTimeout: TIMEOUT_MS,
86-
} : {
87-
httpAgent: this.sourceHTTPAgent,
83+
const requestHandler = {
84+
[transport === 'https' ? 'httpsAgent' : 'httpAgent']: this.sourceHTTPAgent,
8885
requestTimeout: TIMEOUT_MS,
8986
};
9087
const config = new CloudserverClientConfig({
@@ -98,7 +95,7 @@ class CopyLocationTask extends BackbeatTask {
9895
maxAttempts: 1,
9996
requestHandler: requestHandler,
10097
});
101-
this.cloudserverClient = new CloudserverClient({ config });
98+
this.backbeatClient = new CloudserverClient({ config });
10299
this.backbeatMetadataProxy = new BackbeatMetadataProxy(
103100
`${transport}://${s3.host}:${s3.port}`, auth, this.sourceHTTPAgent);
104101
this.backbeatMetadataProxy
@@ -244,7 +241,7 @@ class CopyLocationTask extends BackbeatTask {
244241
RequestUids: log.getSerializedUids(),
245242
});
246243

247-
return this.cloudserverClient.send(getObjectCommand)
244+
return this.backbeatClient.send(getObjectCommand)
248245
.then(response => {
249246
incomingMsg = response.Body;
250247
log.debug('putting data', actionEntry.getLogInfo());
@@ -293,7 +290,6 @@ class CopyLocationTask extends BackbeatTask {
293290
Bucket: bucket,
294291
Key: key,
295292
CanonicalID: objMD.getOwnerId(),
296-
ContentLength: size,
297293
ContentMD5: objMD.getContentMd5(),
298294
StorageType: this.destType,
299295
StorageClass: this.site,
@@ -309,7 +305,7 @@ class CopyLocationTask extends BackbeatTask {
309305
RequestUids: log.getSerializedUids(),
310306
});
311307

312-
return this.cloudserverClient.send(command)
308+
return this.backbeatClient.send(command)
313309
.then(data => {
314310
actionEntry.setSuccess({
315311
location: data.location,
@@ -387,7 +383,7 @@ class CopyLocationTask extends BackbeatTask {
387383
RequestUids: log.getSerializedUids(),
388384
});
389385

390-
return this.cloudserverClient.send(getObjectCommand)
386+
return this.backbeatClient.send(getObjectCommand)
391387
.then(response => {
392388
return this._putMPUPart(actionEntry, objMD, response.Body, size,
393389
uploadId, partNumber, log, done);
@@ -455,7 +451,7 @@ class CopyLocationTask extends BackbeatTask {
455451
RequestUids: log.getSerializedUids(),
456452
});
457453

458-
return this.cloudserverClient.send(command)
454+
return this.backbeatClient.send(command)
459455
.then(() => {
460456
return cb();
461457
})
@@ -500,7 +496,7 @@ class CopyLocationTask extends BackbeatTask {
500496
RequestUids: log.getSerializedUids(),
501497
});
502498

503-
return this.cloudserverClient.send(command)
499+
return this.backbeatClient.send(command)
504500
.then(data => {
505501
actionEntry.setSuccess({
506502
location: data.location,
@@ -552,7 +548,7 @@ class CopyLocationTask extends BackbeatTask {
552548
RequestUids: log.getSerializedUids(),
553549
});
554550

555-
return this.cloudserverClient.send(command)
551+
return this.backbeatClient.send(command)
556552
.then(data => {
557553
this._replicationMetric
558554
.withEntry(actionEntry)
@@ -606,7 +602,7 @@ class CopyLocationTask extends BackbeatTask {
606602
RequestUids: log.getSerializedUids(),
607603
});
608604

609-
return this.cloudserverClient.send(command)
605+
return this.backbeatClient.send(command)
610606
.then(data => {
611607
return cb(null, data.uploadId);
612608
})

extensions/replication/tasks/MultipleBackendTask.js

Lines changed: 72 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
99
const ReplicateObject = require('./ReplicateObject');
1010
const {
1111
GetObjectCommand,
12-
MultipleBackendPutObjectCommand
12+
MultipleBackendPutObjectCommand,
13+
MultipleBackendDeleteObjectCommand,
14+
MultipleBackendInitiateMPUCommand,
15+
MultipleBackendAbortMPUCommand,
16+
MultipleBackendPutObjectTaggingCommand,
17+
MultipleBackendDeleteObjectTaggingCommand
1318
} = require('../../../lib/clients/smithy/build/smithy/source/typescript-codegen');
1419
const { attachReqUids } = require('../../../lib/clients/utils');
1520
const getExtMetrics = require('../utils/getExtMetrics');
@@ -304,16 +309,20 @@ class MultipleBackendTask extends ReplicateObject {
304309
uploadId,
305310
});
306311
const doneOnce = jsutil.once(cb);
307-
const destReq = this.backbeatSource.multipleBackendAbortMPU({
312+
const command = new MultipleBackendAbortMPUCommand({
308313
Bucket: sourceEntry.getBucket(),
309314
Key: sourceEntry.getObjectKey(),
310315
StorageType: sourceEntry.getReplicationStorageType(),
311316
StorageClass: this.site,
312317
UploadId: uploadId,
318+
RequestUids: log.getSerializedUids(),
313319
});
314-
attachReqUids(destReq, log);
315-
return destReq.send(err => {
316-
if (err) {
320+
321+
return this.backbeatSource.send(command)
322+
.then(() => {
323+
return doneOnce();
324+
})
325+
.catch(err => {
317326
// eslint-disable-next-line no-param-reassign
318327
err.origin = 'source';
319328
log.error('an error occurred aborting multipart upload', {
@@ -324,9 +333,7 @@ class MultipleBackendTask extends ReplicateObject {
324333
error: err.message,
325334
});
326335
return doneOnce(err);
327-
}
328-
return doneOnce();
329-
});
336+
});
330337
}
331338

332339
/**
@@ -450,7 +457,7 @@ class MultipleBackendTask extends ReplicateObject {
450457
const uploadId = uuid().replace(/-/g, '');
451458
return setImmediate(() => cb(null, uploadId));
452459
}
453-
const destReq = this.backbeatSource.multipleBackendInitiateMPU({
460+
const command = new MultipleBackendInitiateMPUCommand({
454461
Bucket: sourceEntry.getBucket(),
455462
Key: sourceEntry.getObjectKey(),
456463
StorageType: sourceEntry.getReplicationStorageType(),
@@ -463,10 +470,14 @@ class MultipleBackendTask extends ReplicateObject {
463470
sourceEntry.getContentDisposition() || undefined,
464471
ContentEncoding: sourceEntry.getContentEncoding() || undefined,
465472
Tags: JSON.stringify(sourceEntry.getTags()),
473+
RequestUids: log.getSerializedUids(),
466474
});
467-
attachReqUids(destReq, log);
468-
return destReq.send((err, data) => {
469-
if (err) {
475+
476+
return this.backbeatSource.send(command)
477+
.then(data => {
478+
return cb(null, data.uploadId);
479+
})
480+
.catch(err => {
470481
// eslint-disable-next-line no-param-reassign
471482
err.origin = 'source';
472483
log.error('an error occurred on initating MPU to S3', {
@@ -477,9 +488,7 @@ class MultipleBackendTask extends ReplicateObject {
477488
error: err.message,
478489
});
479490
return cb(err);
480-
}
481-
return cb(null, data.uploadId);
482-
});
491+
});
483492
}
484493

485494
/**
@@ -907,36 +916,38 @@ class MultipleBackendTask extends ReplicateObject {
907916
log.debug('replicating object tags', {
908917
entry: sourceEntry.getLogInfo(),
909918
});
910-
const destReq = this.backbeatSource
911-
.multipleBackendPutObjectTagging({
912-
Bucket: sourceEntry.getBucket(),
913-
Key: sourceEntry.getObjectKey(),
914-
StorageType: sourceEntry.getReplicationStorageType(),
915-
StorageClass: this.site,
916-
DataStoreVersionId:
917-
sourceEntry.getReplicationSiteDataStoreVersionId(this.site),
918-
Tags: JSON.stringify(sourceEntry.getTags()),
919-
SourceBucket: sourceEntry.getBucket(),
920-
SourceVersionId: sourceEntry.getVersionId(),
921-
ReplicationEndpointSite: this.site,
922-
});
923-
attachReqUids(destReq, log);
919+
const command = new MultipleBackendPutObjectTaggingCommand({
920+
Bucket: sourceEntry.getBucket(),
921+
Key: sourceEntry.getObjectKey(),
922+
StorageType: sourceEntry.getReplicationStorageType(),
923+
StorageClass: this.site,
924+
DataStoreVersionId:
925+
sourceEntry.getReplicationSiteDataStoreVersionId(this.site),
926+
Tags: JSON.stringify(sourceEntry.getTags()),
927+
SourceBucket: sourceEntry.getBucket(),
928+
SourceVersionId: sourceEntry.getVersionId(),
929+
ReplicationEndpointSite: this.site,
930+
RequestUids: log.getSerializedUids(),
931+
});
932+
924933
const writeStartTime = Date.now();
925-
return destReq.send((err, data) => {
926-
if (err) {
934+
return this.backbeatSource.send(command)
935+
.then(data => {
936+
sourceEntry.setReplicationSiteDataStoreVersionId(this.site,
937+
data.versionId);
938+
// TODO : review metadata metrics
939+
this._publishMetadataWriteMetrics(JSON.stringify(command.input), writeStartTime);
940+
return doneOnce();
941+
})
942+
.catch(err => {
927943
log.error('an error occurred putting object tagging to S3', {
928944
method: 'MultipleBackendTask._putObjectTaggingOnce',
929945
entry: sourceEntry.getLogInfo(),
930946
origin: 'target',
931947
error: err.message,
932948
});
933949
return doneOnce(err);
934-
}
935-
sourceEntry.setReplicationSiteDataStoreVersionId(this.site,
936-
data.versionId);
937-
this._publishMetadataWriteMetrics(destReq.httpRequest.body, writeStartTime);
938-
return doneOnce();
939-
});
950+
});
940951
}
941952

942953
_deleteObjectTagging(sourceEntry, log, cb) {
@@ -961,7 +972,7 @@ class MultipleBackendTask extends ReplicateObject {
961972
log.debug('replicating delete object tagging', {
962973
entry: sourceEntry.getLogInfo(),
963974
});
964-
const destReq = this.backbeatSource.multipleBackendDeleteObjectTagging({
975+
const command = new MultipleBackendDeleteObjectTaggingCommand({
965976
Bucket: sourceEntry.getBucket(),
966977
Key: sourceEntry.getObjectKey(),
967978
StorageType: sourceEntry.getReplicationStorageType(),
@@ -971,11 +982,19 @@ class MultipleBackendTask extends ReplicateObject {
971982
SourceBucket: sourceEntry.getBucket(),
972983
SourceVersionId: sourceEntry.getVersionId(),
973984
ReplicationEndpointSite: this.site,
985+
RequestUids: log.getSerializedUids(),
974986
});
975-
attachReqUids(destReq, log);
987+
976988
const writeStartTime = Date.now();
977-
return destReq.send((err, data) => {
978-
if (err) {
989+
return this.backbeatSource.send(command)
990+
.then(data => {
991+
sourceEntry.setReplicationSiteDataStoreVersionId(this.site,
992+
data.versionId);
993+
// TODO : review metadata metrics
994+
this._publishMetadataWriteMetrics(JSON.stringify(command.input), writeStartTime);
995+
return doneOnce();
996+
})
997+
.catch(err => {
979998
log.error('an error occurred on deleting object tagging', {
980999
method: 'MultipleBackendTask._deleteObjectTaggingOnce',
9811000
entry: sourceEntry.getLogInfo(),
@@ -984,12 +1003,7 @@ class MultipleBackendTask extends ReplicateObject {
9841003
error: err.message,
9851004
});
9861005
return doneOnce(err);
987-
}
988-
sourceEntry.setReplicationSiteDataStoreVersionId(this.site,
989-
data.versionId);
990-
this._publishMetadataWriteMetrics(destReq.httpRequest.body, writeStartTime);
991-
return doneOnce();
992-
});
1006+
});
9931007
}
9941008

9951009
_putDeleteMarker(sourceEntry, log, cb) {
@@ -1038,16 +1052,22 @@ class MultipleBackendTask extends ReplicateObject {
10381052
* @return {undefined}
10391053
*/
10401054
_sendMultipleBackendDeleteObject(sourceEntry, log, doneOnce) {
1041-
const destReq = this.backbeatSource.multipleBackendDeleteObject({
1055+
const command = new MultipleBackendDeleteObjectCommand({
10421056
Bucket: sourceEntry.getBucket(),
10431057
Key: sourceEntry.getObjectKey(),
10441058
StorageType: sourceEntry.getReplicationStorageType(),
10451059
StorageClass: this.site,
1060+
RequestUids: log.getSerializedUids(),
10461061
});
1047-
attachReqUids(destReq, log);
1062+
10481063
const writeStartTime = Date.now();
1049-
return destReq.send(err => {
1050-
if (err) {
1064+
return this.backbeatSource.send(command)
1065+
.then(() => {
1066+
// TODO : This metric is changed, needs double checking
1067+
this._publishMetadataWriteMetrics(JSON.stringify(command.input), writeStartTime);
1068+
return doneOnce();
1069+
})
1070+
.catch(err => {
10511071
// eslint-disable-next-line no-param-reassign
10521072
err.origin = 'source';
10531073
log.error('an error occurred on putting delete marker to S3', {
@@ -1058,10 +1078,7 @@ class MultipleBackendTask extends ReplicateObject {
10581078
error: err.message,
10591079
});
10601080
return doneOnce(err);
1061-
}
1062-
this._publishMetadataWriteMetrics(destReq.httpRequest.body, writeStartTime);
1063-
return doneOnce();
1064-
});
1081+
});
10651082
}
10661083

10671084
/**

0 commit comments

Comments
 (0)