Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<!-- just rename the version and copy over -->
<!-- follow semver, with suggested guidance for versioning fork of OSS -->
<!-- see https://gofore.com/en/best-practices-for-forking-a-git-repo/ -->
<version>0.3+worklytics.10</version>
<version>0.3+worklytics.11</version>
<packaging>jar</packaging>
<licenses>
<license>
Expand Down
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<url>https://github.com/Worklytics/appengine-pipelines/</url>
<!-- follow semver, with suggested guidance for versioning fork of OSS -->
<!-- see https://gofore.com/en/best-practices-for-forking-a-git-repo/ -->
<version>0.3+worklytics.10</version>
<version>0.3+worklytics.11</version>
<packaging>jar</packaging>
<licenses>
<license>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.PromisedValue;
import com.google.appengine.tools.pipeline.Value;
import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -69,10 +70,10 @@ public Value<MapReduceResult<R>> run() {
ShardedJobAbstractSettings settings = this.settings;
if (settings.getWorkerQueueName() == null) {
String queue = getOnQueue();
if (queue == null) {
if (queue == null || DEFAULT_QUEUE_NAME.equals(queue)) {
queue = PipelineManager.ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME);
log.warning("workerQueueName is null and current queue is not available in the pipeline"
+ " job, using 'default'");
queue = DEFAULT_QUEUE_NAME;
+ " job, using " + queue);
}
if (settings instanceof MapSettings) {
settings = ((MapSettings) settings).withWorkerQueueName(queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.google.appengine.tools.mapreduce.inputs.GoogleCloudStorageLineInput;
import com.google.appengine.tools.mapreduce.outputs.GoogleCloudStorageFileOutput;
import com.google.appengine.tools.pipeline.*;
import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.appengine.tools.pipeline.impl.backend.AppEngineEnvironment;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreOptions;
Expand All @@ -34,6 +35,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.google.appengine.tools.pipeline.impl.PipelineManager.DEFAULT_QUEUE_NAME;


/**
* A Pipeline job that runs a MapReduce.
Expand Down Expand Up @@ -568,9 +571,9 @@ public Value<MapReduceResult<R>> run() {
if (settings.getWorkerQueueName() == null) {
String queue = getOnQueue();
if (queue == null) {
queue = PipelineManager.ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME);
log.warning("workerQueueName is null and current queue is not available in the pipeline"
+ " job, using 'default'");
queue = "default";
+ " job, using " + queue);
}
settings = settings.toBuilder().workerQueueName(queue).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;

import javax.inject.Inject;
Expand Down Expand Up @@ -81,6 +83,14 @@ public class PipelineManager implements PipelineRunner, PipelineOrchestrator {

public static final String DEFAULT_QUEUE_NAME = "default";

@RequiredArgsConstructor
@Getter
public enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty {
INCREMENTAL_TASK_DEFAULT_QUEUE("com.google.appengine.tools.pipeline.INCREMENTAL_TASK_DEFAULT_QUEUE");

final String propertyName;
}

final Provider<PipelineService> pipelineServiceProvider;
final ShardedJobRunner shardedJobRunner;
final PipelineBackEnd backEnd;
Expand Down Expand Up @@ -420,8 +430,8 @@ public void stopJob(@NonNull JobRunId jobHandle) throws NoSuchObjectException {

@Override
public <I, O, R> JobRunId start(MapSpecification<I, O, R> specification, MapSettings settings) {
if (settings.getWorkerQueueName() == null) {
settings = settings.toBuilder().workerQueueName(DEFAULT_QUEUE_NAME).build();
if (settings.getWorkerQueueName() == null || DEFAULT_QUEUE_NAME.equals(settings.getWorkerQueueName())) {
settings = settings.toBuilder().workerQueueName(shardedJobDefaultQueueName()).build();
}
return startNewPipeline(settings.toJobSettings(), new MapJob<>(specification, settings));
}
Expand All @@ -430,12 +440,14 @@ public <I, O, R> JobRunId start(MapSpecification<I, O, R> specification, MapSett
public <I, K, V, O, R> JobRunId start(@NonNull MapReduceSpecification<I, K, V, O, R> specification,
@NonNull MapReduceSettings settings) {
if (settings.getWorkerQueueName() == null) {
settings = settings.toBuilder().workerQueueName(DEFAULT_QUEUE_NAME).build();
settings = settings.toBuilder().workerQueueName(shardedJobDefaultQueueName()).build();
}
return startNewPipeline(settings.toJobSettings(), new MapReduceJob<>(specification, settings));
}


private String shardedJobDefaultQueueName() {
return ConfigProperty.INCREMENTAL_TASK_DEFAULT_QUEUE.getValue().orElse(DEFAULT_QUEUE_NAME);
}

public void cancelJob(@NonNull JobRunId jobHandle) throws NoSuchObjectException {
Key key = JobRecord.keyFromPipelineHandle(jobHandle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigPr
*/
DEFAULT_PIPELINES_SERVICE,
;

@Override
public String getPropertyName() {
return this.name();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public class CloudTasksTaskQueue implements PipelineTaskQueue {
enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty {
CLOUDTASKS_QUEUE_LOCATION,
;

@Override
public String getPropertyName() {
return name();
}
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import com.google.appengine.tools.pipeline.di.JobRunServiceComponentContainer;
import com.google.appengine.tools.pipeline.util.Pair;
import com.google.common.annotations.VisibleForTesting;
import lombok.Setter;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
Expand All @@ -39,7 +43,13 @@
*/
public class PipelineServlet extends HttpServlet {

public static final String BASE_URL_PROPERTY = "com.google.appengine.tools.pipeline.BASE_URL";
@RequiredArgsConstructor
@Getter
enum ConfigProperty implements com.google.appengine.tools.pipeline.util.ConfigProperty {
BASE_URL_PROPERTY("com.google.appengine.tools.pipeline.BASE_URL");

final String propertyName;
}

@Setter(onMethod_ = @VisibleForTesting)
JobRunServiceComponent component;
Expand Down Expand Up @@ -81,7 +91,7 @@ public void doGet(HttpServletRequest req, HttpServletResponse resp)
* This must match the URL in web.xml
*/
public static String baseUrl() {
String baseURL = System.getProperty(BASE_URL_PROPERTY, "/_ah/pipeline/");
String baseURL = ConfigProperty.BASE_URL_PROPERTY.getValue().orElse( "/_ah/pipeline/");
if (!baseURL.endsWith("/")) {
baseURL += "/";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@ enum ConfigProperty implements ConfigProperty {
*/
public interface ConfigProperty {

String name();
String getPropertyName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

advantage of name() is that you get implementation for free from an enum


default Optional<String> getValue() {
return Optional.ofNullable(System.getProperty(name(), System.getenv(name())));
return Optional.ofNullable(System.getProperty(getPropertyName(), System.getenv(getPropertyName())));
}

default Optional<Boolean> getBoolean() {
return getValue().map(Boolean::parseBoolean);
}
}
Loading