Skip to content

Commit

Permalink
Merge branch 'master' into test-importer-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
FilippoULIVO authored Jul 1, 2023
2 parents 3725e0e + 1136476 commit 63ecc8e
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.datatransferproject.datatransfer.google.videos.GoogleVideosImporter;
import org.datatransferproject.spi.cloud.storage.AppCredentialStore;
import org.datatransferproject.spi.cloud.storage.JobStore;
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor;
import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutorExtension;
import org.datatransferproject.types.common.models.DataVertical;
import org.datatransferproject.spi.transfer.extension.TransferExtension;
import org.datatransferproject.spi.transfer.provider.Exporter;
Expand Down Expand Up @@ -107,6 +109,10 @@ public void initialize(ExtensionContext context) {
GoogleCredentialFactory credentialFactory =
new GoogleCredentialFactory(httpTransport, jsonFactory, appCredentials, monitor);

IdempotentImportExecutor idempotentImportExecutor = context.getService(
IdempotentImportExecutorExtension.class).getRetryingIdempotentImportExecutor(context);
boolean enableRetrying = context.getSetting("enableRetrying", false);

ImmutableMap.Builder<DataVertical, Importer> importerBuilder = ImmutableMap.builder();
importerBuilder.put(BLOBS, new DriveImporter(credentialFactory, jobStore, monitor));
importerBuilder.put(CONTACTS, new GoogleContactsImporter(credentialFactory));
Expand All @@ -120,7 +126,9 @@ public void initialize(ExtensionContext context) {
jobStore,
jsonFactory,
monitor,
context.getSetting("googleWritesPerSecond", 1.0)));
context.getSetting("googleWritesPerSecond", 1.0),
idempotentImportExecutor,
enableRetrying));
importerBuilder.put(VIDEOS, new GoogleVideosImporter(appCredentials, jobStore, monitor));
importerMap = importerBuilder.build();

Expand All @@ -141,4 +149,4 @@ public void initialize(ExtensionContext context) {

initialized = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,29 @@ public class GooglePhotosImporter
private final Map<UUID, GooglePhotosInterface> photosInterfacesMap;
private final GooglePhotosInterface photosInterface;
private final HashMap<UUID, BaseMultilingualDictionary> multilingualStrings = new HashMap<>();
private IdempotentImportExecutor retryingIdempotentExecutor;
private Boolean enableRetrying;

public GooglePhotosImporter(
GoogleCredentialFactory credentialFactory,
JobStore jobStore,
JsonFactory jsonFactory,
Monitor monitor,
double writesPerSecond,
IdempotentImportExecutor retryingIdempotentExecutor,
boolean enableRetrying) {
this(
credentialFactory,
jobStore,
jsonFactory,
new HashMap<>(),
null,
new ConnectionProvider(jobStore),
monitor,
writesPerSecond,
retryingIdempotentExecutor,
enableRetrying);
}

public GooglePhotosImporter(
GoogleCredentialFactory credentialFactory,
Expand Down Expand Up @@ -106,6 +129,30 @@ public GooglePhotosImporter(
ConnectionProvider connectionProvider,
Monitor monitor,
double writesPerSecond) {
this(
credentialFactory,
jobStore,
jsonFactory,
photosInterfacesMap,
photosInterface,
connectionProvider,
monitor,
writesPerSecond,
null,
false);
}

GooglePhotosImporter(
GoogleCredentialFactory credentialFactory,
JobStore jobStore,
JsonFactory jsonFactory,
Map<UUID, GooglePhotosInterface> photosInterfacesMap,
GooglePhotosInterface photosInterface,
ConnectionProvider connectionProvider,
Monitor monitor,
double writesPerSecond,
IdempotentImportExecutor retryingIdempotentExecutor,
boolean enableRetrying) {
this.credentialFactory = credentialFactory;
this.jobStore = jobStore;
this.jsonFactory = jsonFactory;
Expand All @@ -114,6 +161,8 @@ public GooglePhotosImporter(
this.connectionProvider = connectionProvider;
this.monitor = monitor;
this.writesPerSecond = writesPerSecond;
this.retryingIdempotentExecutor = retryingIdempotentExecutor;
this.enableRetrying = enableRetrying;
}

// TODO(aksingh737) WARNING: stop maintaining this code here; this needs to be reconciled against
Expand All @@ -131,10 +180,12 @@ public ImportResult importItem(
// Nothing to do
return ImportResult.OK;
}
GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, idempotentImportExecutor, authData);
IdempotentImportExecutor executor =
(retryingIdempotentExecutor != null && enableRetrying) ? retryingIdempotentExecutor : idempotentImportExecutor;
GPhotosUpload gPhotosUpload = new GPhotosUpload(jobId, executor, authData);

for (PhotoAlbum album : data.getAlbums()) {
idempotentImportExecutor.executeAndSwallowIOExceptions(
executor.executeAndSwallowIOExceptions(
album.getId(), album.getName(), () -> importSingleAlbum(jobId, authData, album));
}
long bytes = importPhotos(data.getPhotos(), gPhotosUpload);
Expand All @@ -157,7 +208,7 @@ String importSingleAlbum(UUID jobId, TokensAndUrlAuthData authData, PhotoAlbum i

@VisibleForTesting // TODO(aksingh737,jzacsh) stop exposing this to unit tests
public long importPhotos(Collection<PhotoModel> photos, GPhotosUpload gPhotosUpload)
throws Exception {
throws Exception {
return gPhotosUpload.uploadItemsViaBatching(photos, this::importPhotoBatch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ImportResult call() throws Exception {
Collection<ErrorDetail> errors = idempotentImportExecutor.getRecentErrors();
success = result.getType() == ImportResult.ResultType.OK && errors.isEmpty();

if (!success) {
if (!success && errors.iterator().hasNext() && !errors.iterator().next().canSkip()) {
throw new IOException(
"Problem with importer, forcing a retry, "
+ "first error: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public abstract class ErrorDetail {
private static final String DATA_KEY = "Data";

public static ErrorDetail.Builder builder() {
return new org.datatransferproject.types.transfer.errors.AutoValue_ErrorDetail.Builder();
return new org.datatransferproject.types.transfer.errors.AutoValue_ErrorDetail.Builder()
.setCanSkip(false);
}

@JsonProperty("id")
Expand All @@ -48,6 +49,9 @@ public static ErrorDetail.Builder builder() {
@JsonProperty("exception")
public abstract String exception();

@JsonProperty("canSkip")
public abstract boolean canSkip();

@AutoValue.Builder
public abstract static class Builder {
@JsonCreator
Expand All @@ -65,5 +69,8 @@ private static ErrorDetail.Builder create() {

@JsonProperty("exception")
public abstract Builder setException(String exception);

@JsonProperty("canSkip")
public abstract Builder setCanSkip(boolean canSkip);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@
public class RetryException extends Exception {

private final int triesSoFar;
private final boolean canSkip;

RetryException(int triesSoFar, Exception exception) {
this(triesSoFar, exception, false);
}

RetryException(int triesSoFar, Exception exception, boolean canSkip) {
super(exception);
this.triesSoFar = triesSoFar;
this.canSkip = canSkip;
}

@Override
Expand All @@ -36,4 +42,8 @@ public Exception getCause() {
public int getTriesSoFar() {
return triesSoFar;
}

public boolean canSkip() {
return canSkip;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
@JsonSubTypes({
@JsonSubTypes.Type(value = UniformRetryStrategy.class, name = "Uniform"),
@JsonSubTypes.Type(value = ExponentialBackoffStrategy.class, name = "Exponential"),
@JsonSubTypes.Type(value = NoRetryStrategy.class, name = "Fatal")
@JsonSubTypes.Type(value = NoRetryStrategy.class, name = "Fatal"),
@JsonSubTypes.Type(value = SkipRetryStrategy.class, name = "Skip")
})
public interface RetryStrategy {

Expand All @@ -52,4 +53,9 @@ public interface RetryStrategy {
* Gets milliseconds until the next retry, given elapsed time so far
*/
long getRemainingIntervalMillis(int tries, long elapsedMillis);

/** Shows whether exception should be skipped */
default boolean canSkip() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ public T call() throws RetryException {
monitor.debug(
() ->
String.format("Strategy canTryAgain returned false after %d retries", attempts));
throw new RetryException(attempts, mostRecentException);
if (strategy.canSkip()) {
throw new RetryException(attempts, mostRecentException, true);
} else {
throw new RetryException(attempts, mostRecentException);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2023 The Data Transfer Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.datatransferproject.types.transfer.retry;

/** {@link RetryStrategy} that allows exception to be skipped. Useful for known non fatal errors. */
public class SkipRetryStrategy implements RetryStrategy {

public SkipRetryStrategy() {}

@Override
public boolean canTryAgain(int tries) {
return false;
}

@Override
public long getNextIntervalMillis(int tries) {
return -1L;
}

@Override
public long getRemainingIntervalMillis(int tries, long elapsedMillis) {
return -1L;
}

@Override
public boolean canSkip() {
return true;
}

@Override
public String toString() {
return "SkipRetryStrategy{}";
}
}

0 comments on commit 63ecc8e

Please sign in to comment.