diff --git a/java/appengine-pipeline-0.3+worklytics.10-pom.xml b/java/appengine-pipeline-0.3+worklytics.11-pom.xml
similarity index 99%
rename from java/appengine-pipeline-0.3+worklytics.10-pom.xml
rename to java/appengine-pipeline-0.3+worklytics.11-pom.xml
index ef279700..d8def549 100644
--- a/java/appengine-pipeline-0.3+worklytics.10-pom.xml
+++ b/java/appengine-pipeline-0.3+worklytics.11-pom.xml
@@ -10,7 +10,7 @@
- 0.3+worklytics.10
+ 0.3+worklytics.11
jar
diff --git a/java/pom.xml b/java/pom.xml
index 5b4028a4..f3be4caf 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -8,7 +8,7 @@
https://github.com/Worklytics/appengine-pipelines/
- 0.3+worklytics.10
+ 0.3+worklytics.11
jar
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java
index 677e17ad..d90e1422 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapJob.java
@@ -17,6 +17,7 @@
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.PromisedValue;
import com.google.appengine.tools.pipeline.Value;
+import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -69,10 +70,10 @@ public Value> run() {
ShardedJobAbstractSettings settings = this.settings;
if (settings.getWorkerQueueName() == null) {
String queue = getOnQueue();
- if (queue == null) {
+ if (queue == null || DEFAULT_QUEUE_NAME.equals(queue)) {
+ queue = PipelineManager.ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME);
log.warning("workerQueueName is null and current queue is not available in the pipeline"
- + " job, using 'default'");
- queue = DEFAULT_QUEUE_NAME;
+ + " job, using " + queue);
}
if (settings instanceof MapSettings) {
settings = ((MapSettings) settings).withWorkerQueueName(queue);
diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java
index 49aea3a6..2aef9a53 100644
--- a/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java
+++ b/java/src/main/java/com/google/appengine/tools/mapreduce/MapReduceJob.java
@@ -13,6 +13,7 @@
import com.google.appengine.tools.mapreduce.inputs.GoogleCloudStorageLineInput;
import com.google.appengine.tools.mapreduce.outputs.GoogleCloudStorageFileOutput;
import com.google.appengine.tools.pipeline.*;
+import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
@@ -34,6 +35,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import static com.google.appengine.tools.pipeline.impl.PipelineManager.DEFAULT_QUEUE_NAME;
+
/**
* A Pipeline job that runs a MapReduce.
@@ -568,9 +571,9 @@ public Value> run() {
if (settings.getWorkerQueueName() == null) {
String queue = getOnQueue();
if (queue == null) {
+ queue = PipelineManager.ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME);
log.warning("workerQueueName is null and current queue is not available in the pipeline"
- + " job, using 'default'");
- queue = "default";
+ + " job, using " + queue);
}
settings = settings.toBuilder().workerQueueName(queue).build();
}
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
index 67242939..5d393f55 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/PipelineManager.java
@@ -53,7 +53,9 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import lombok.AllArgsConstructor;
+import lombok.Getter;
import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import javax.inject.Inject;
@@ -81,6 +83,14 @@ public class PipelineManager implements PipelineRunner, PipelineOrchestrator {
public static final String DEFAULT_QUEUE_NAME = "default";
+ @RequiredArgsConstructor
+ @Getter
+ public enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty {
+ INCREMENTAL_TASK_DEFAULT_QUEUE("com.google.appengine.tools.pipeline.INCREMENTAL_TASK_DEFAULT_QUEUE");
+
+ final String propertyName;
+ }
+
final Provider pipelineServiceProvider;
final ShardedJobRunner shardedJobRunner;
final PipelineBackEnd backEnd;
@@ -420,8 +430,8 @@ public void stopJob(@NonNull JobRunId jobHandle) throws NoSuchObjectException {
@Override
public JobRunId start(MapSpecification specification, MapSettings settings) {
- if (settings.getWorkerQueueName() == null) {
- settings = settings.toBuilder().workerQueueName(DEFAULT_QUEUE_NAME).build();
+ if (settings.getWorkerQueueName() == null || DEFAULT_QUEUE_NAME.equals(settings.getWorkerQueueName())) {
+ settings = settings.toBuilder().workerQueueName(shardedJobDefaultQueueName()).build();
}
return startNewPipeline(settings.toJobSettings(), new MapJob<>(specification, settings));
}
@@ -430,12 +440,14 @@ public JobRunId start(MapSpecification specification, MapSett
public JobRunId start(@NonNull MapReduceSpecification specification,
@NonNull MapReduceSettings settings) {
if (settings.getWorkerQueueName() == null) {
- settings = settings.toBuilder().workerQueueName(DEFAULT_QUEUE_NAME).build();
+ settings = settings.toBuilder().workerQueueName(shardedJobDefaultQueueName()).build();
}
return startNewPipeline(settings.toJobSettings(), new MapReduceJob<>(specification, settings));
}
-
+ private String shardedJobDefaultQueueName() {
+ return ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME);
+ }
public void cancelJob(@NonNull JobRunId jobHandle) throws NoSuchObjectException {
Key key = JobRecord.keyFromPipelineHandle(jobHandle);
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java
index d06606ab..d356a762 100644
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java
@@ -48,6 +48,11 @@ enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigPr
*/
DEFAULT_PIPELINES_SERVICE,
;
+
+ @Override
+ public String getPropertyName() {
+ return this.name();
+ }
}
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java
index e81f452e..6f36dba0 100644
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java
@@ -41,6 +41,11 @@ public class CloudTasksTaskQueue implements PipelineTaskQueue {
enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty {
CLOUDTASKS_QUEUE_LOCATION,
;
+
+ @Override
+ public String getPropertyName() {
+ return name();
+ }
}
@NonNull
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
index f9fef35f..fb642968 100755
--- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/servlets/PipelineServlet.java
@@ -20,12 +20,16 @@
import com.google.appengine.tools.pipeline.di.JobRunServiceComponentContainer;
import com.google.appengine.tools.pipeline.util.Pair;
import com.google.common.annotations.VisibleForTesting;
-import lombok.Setter;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
@@ -39,7 +43,13 @@
*/
public class PipelineServlet extends HttpServlet {
- public static final String BASE_URL_PROPERTY = "com.google.appengine.tools.pipeline.BASE_URL";
+ @RequiredArgsConstructor
+ @Getter
+ enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty {
+ BASE_URL_PROPERTY("com.google.appengine.tools.pipeline.BASE_URL");
+
+ final String propertyName;
+ }
@Setter(onMethod_ = @VisibleForTesting)
JobRunServiceComponent component;
@@ -81,7 +91,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp)
* This must match the URL in web.xml
*/
public static String baseUrl() {
- String baseURL = System.getProperty(BASE_URL_PROPERTY, "/_ah/pipeline/");
+ String baseURL = ConfigProperty.BASE_URL_PROPERTY.getValue().orElse( "/_ah/pipeline/");
if (!baseURL.endsWith("/")) {
baseURL += "/";
}
diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java b/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java
index 9e2a6004..c01c09d6 100644
--- a/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java
+++ b/java/src/main/java/com/google/appengine/tools/pipeline/util/ConfigProperty.java
@@ -15,13 +15,10 @@ enum ConfigProperty implements ConfigProperty {
*/
public interface ConfigProperty {
- String name();
+ String getPropertyName();
default Optional getValue() {
- return Optional.ofNullable(System.getProperty(name(), System.getenv(name())));
+ return Optional.ofNullable(System.getProperty(getPropertyName(), System.getenv(getPropertyName())));
}
- default Optional getBoolean() {
- return getValue().map(Boolean::parseBoolean);
- }
}