diff --git a/java/appengine-pipeline-0.3+worklytics.10-pom.xml b/java/appengine-pipeline-0.3+worklytics.11-pom.xml similarity index 99% rename from java/appengine-pipeline-0.3+worklytics.10-pom.xml rename to java/appengine-pipeline-0.3+worklytics.11-pom.xml index ef279700..d8def549 100644 --- a/java/appengine-pipeline-0.3+worklytics.10-pom.xml +++ b/java/appengine-pipeline-0.3+worklytics.11-pom.xml @@ -10,7 +10,7 @@ - 0.3+worklytics.10 + 0.3+worklytics.11 jar diff --git a/java/pom.xml b/java/pom.xml index 5b4028a4..f3be4caf 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -8,7 +8,7 @@ https://github.com/Worklytics/appengine-pipelines/ - 0.3+worklytics.10 + 0.3+worklytics.11 jar diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java index 677e17ad..d90e1422 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java @@ -17,6 +17,7 @@ import com.google.appengine.tools.pipeline.JobSetting; import com.google.appengine.tools.pipeline.PromisedValue; import com.google.appengine.tools.pipeline.Value; +import com.google.appengine.tools.pipeline.impl.PipelineManager; import com.google.cloud.datastore.DatastoreOptions; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -69,10 +70,10 @@ public Value> run() { ShardedJobAbstractSettings settings = this.settings; if (settings.getWorkerQueueName() == null) { String queue = getOnQueue(); - if (queue == null) { + if (queue == null || DEFAULT_QUEUE_NAME.equals(queue)) { + queue = PipelineManager.ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME); log.warning("workerQueueName is null and current queue is not available in the pipeline" - + " job, using 'default'"); - queue = DEFAULT_QUEUE_NAME; + + " job, using " + queue); } if (settings instanceof MapSettings) { settings = ((MapSettings) settings).withWorkerQueueName(queue); diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java index 49aea3a6..2aef9a53 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java @@ -13,6 +13,7 @@ import com.google.appengine.tools.mapreduce.inputs.GoogleCloudStorageLineInput; import com.google.appengine.tools.mapreduce.outputs.GoogleCloudStorageFileOutput; import com.google.appengine.tools.pipeline.*; +import com.google.appengine.tools.pipeline.impl.PipelineManager; import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment; import com.google.cloud.datastore.Datastore; import com.google.cloud.datastore.DatastoreOptions; @@ -34,6 +35,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import static com.google.appengine.tools.pipeline.impl.PipelineManager.DEFAULT_QUEUE_NAME; + /** * A Pipeline job that runs a MapReduce. @@ -568,9 +571,9 @@ public Value> run() { if (settings.getWorkerQueueName() == null) { String queue = getOnQueue(); if (queue == null) { + queue = PipelineManager.ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME); log.warning("workerQueueName is null and current queue is not available in the pipeline" - + " job, using 'default'"); - queue = "default"; + + " job, using " + queue); } settings = settings.toBuilder().workerQueueName(queue).build(); } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java index 67242939..5d393f55 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java @@ -53,7 +53,9 @@ import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Uninterruptibles; import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.extern.java.Log; import javax.inject.Inject; @@ -81,6 +83,14 @@ public class PipelineManager implements PipelineRunner, PipelineOrchestrator { public static final String DEFAULT_QUEUE_NAME = "default"; + @RequiredArgsConstructor + @Getter + public enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty { + INCREMENTAL_TASK_DEFAULT_QUEUE("com.google.appengine.tools.pipeline.INCREMENTAL_TASK_DEFAULT_QUEUE"); + + final String propertyName; + } + final Provider pipelineServiceProvider; final ShardedJobRunner shardedJobRunner; final PipelineBackEnd backEnd; @@ -420,8 +430,8 @@ public void stopJob(@NonNull JobRunId jobHandle) throws NoSuchObjectException { @Override public JobRunId start(MapSpecification specification, MapSettings settings) { - if (settings.getWorkerQueueName() == null) { - settings = settings.toBuilder().workerQueueName(DEFAULT_QUEUE_NAME).build(); + if (settings.getWorkerQueueName() == null || DEFAULT_QUEUE_NAME.equals(settings.getWorkerQueueName())) { + settings = settings.toBuilder().workerQueueName(shardedJobDefaultQueueName()).build(); } return startNewPipeline(settings.toJobSettings(), new MapJob<>(specification, settings)); } @@ -430,12 +440,14 @@ public JobRunId start(MapSpecification specification, MapSett public JobRunId start(@NonNull MapReduceSpecification specification, @NonNull MapReduceSettings settings) { if (settings.getWorkerQueueName() == null) { - settings = settings.toBuilder().workerQueueName(DEFAULT_QUEUE_NAME).build(); + settings = settings.toBuilder().workerQueueName(shardedJobDefaultQueueName()).build(); } return startNewPipeline(settings.toJobSettings(), new MapReduceJob<>(specification, settings)); } - + private String shardedJobDefaultQueueName() { + return ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME); + } public void cancelJob(@NonNull JobRunId jobHandle) throws NoSuchObjectException { Key key = JobRecord.keyFromPipelineHandle(jobHandle); diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java index d06606ab..d356a762 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java @@ -48,6 +48,11 @@ enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigPr */ DEFAULT_PIPELINES_SERVICE, ; + + @Override + public String getPropertyName() { + return this.name(); + } } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java index e81f452e..6f36dba0 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java @@ -41,6 +41,11 @@ public class CloudTasksTaskQueue implements PipelineTaskQueue { enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty { CLOUDTASKS_QUEUE_LOCATION, ; + + @Override + public String getPropertyName() { + return name(); + } } @NonNull diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java index f9fef35f..fb642968 100755 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java @@ -20,12 +20,16 @@ import com.google.appengine.tools.pipeline.di.JobRunServiceComponentContainer; import com.google.appengine.tools.pipeline.util.Pair; import com.google.common.annotations.VisibleForTesting; -import lombok.Setter; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + import java.io.IOException; import java.util.Arrays; import java.util.Optional; @@ -39,7 +43,13 @@ */ public class PipelineServlet extends HttpServlet { - public static final String BASE_URL_PROPERTY = "com.google.appengine.tools.pipeline.BASE_URL"; + @RequiredArgsConstructor + @Getter + enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty { + BASE_URL_PROPERTY("com.google.appengine.tools.pipeline.BASE_URL"); + + final String propertyName; + } @Setter(onMethod_ = @VisibleForTesting) JobRunServiceComponent component; @@ -81,7 +91,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp) * This must match the URL in web.xml */ public static String baseUrl() { - String baseURL = System.getProperty(BASE_URL_PROPERTY, "/_ah/pipeline/"); + String baseURL = ConfigProperty.BASE_URL_PROPERTY.getValue().orElse( "/_ah/pipeline/"); if (!baseURL.endsWith("/")) { baseURL += "/"; } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java b/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java index 9e2a6004..c01c09d6 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java @@ -15,13 +15,10 @@ enum ConfigProperty implements ConfigProperty { */ public interface ConfigProperty { - String name(); + String getPropertyName(); default Optional getValue() { - return Optional.ofNullable(System.getProperty(name(), System.getenv(name()))); + return Optional.ofNullable(System.getProperty(getPropertyName(), System.getenv(getPropertyName()))); } - default Optional getBoolean() { - return getValue().map(Boolean::parseBoolean); - } }