diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java index 7f7004df..cb393c5c 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineBackEnd.java @@ -748,11 +748,33 @@ public void deletePipeline(JobRunId pipelineRunId, boolean force) } } - deleteAll(JobRecord.DATA_STORE_KIND, pipelineKey); - deleteAll(Slot.DATA_STORE_KIND, pipelineKey); - deleteAll(ShardedValue.DATA_STORE_KIND, pipelineKey); - deleteAll(Barrier.DATA_STORE_KIND, pipelineKey); - deleteAll(JobInstanceRecord.DATA_STORE_KIND, pipelineKey); - deleteAll(FanoutTaskRecord.DATA_STORE_KIND, pipelineKey); + List kindsToDelete = Arrays.asList( + JobRecord.DATA_STORE_KIND, + Slot.DATA_STORE_KIND, + ShardedValue.DATA_STORE_KIND, + Barrier.DATA_STORE_KIND, + JobInstanceRecord.DATA_STORE_KIND, + ExceptionRecord.DATA_STORE_KIND + ); + + kindsToDelete.parallelStream() + .forEach(kind -> deleteAll(kind, pipelineKey)); + } + + private R attemptWithRetries(Retryer retryer, final Operation operation) { + try { + return retryer.call(operation); + } catch (ExecutionException e) { + log.log(Level.WARNING, "Non-retryable exception during " + operation.getName(), e.getCause()); + throw new RuntimeException(e.getCause()); + } catch (RetryException e) { + if (e.getCause() instanceof RuntimeException) { + log.warning(e.getCause().getMessage() + " during " + operation.getName() + + " throwing after 5 multiple attempts "); + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } } }