diff --git a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java index 072679ae..2d4bea25 100644 --- a/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java +++ b/java/src/main/java/com/google/appengine/tools/mapreduce/impl/shardedjob/ShardedJobRunner.java @@ -871,13 +871,4 @@ public boolean cleanupJob(ShardedJobRunId jobId) { return true; } - - private String getWorkerServiceHostName(ShardedJobSettings settings) { - String version = Optional.ofNullable(settings.getVersion()).orElseGet(() -> - appEngineServicesService.getDefaultVersion(settings.getModule()) - ); - - return appEngineServicesService.getWorkerServiceHostName(settings.getModule(), version); - } - } diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java index bfad22bf..d06606ab 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/AppEngineServicesServiceImpl.java @@ -12,6 +12,7 @@ import javax.inject.Provider; import javax.inject.Singleton; import java.io.IOException; +import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -110,17 +111,19 @@ static VersionsClient getVersionsClientProvider() { } Cache defaultVersionCache = CacheBuilder.newBuilder() - .maximumSize(10) - .build(); + .maximumSize(10) // avoid unbounded growth leading to OOM + .expireAfterWrite(Duration.ofMinutes(10)) + .build(); + @Deprecated // shouldn't really be needed in prod deployments using CloudTasks rather than GAE Tasks API Cache hostnameCache = CacheBuilder.newBuilder() - .maximumSize(10) + .maximumSize(10) // avoid unbounded growth leading to OOM + .expireAfterWrite(Duration.ofMinutes(10)) .build(); // would only change on re-deployment volatile String location; - @Override public String getDefaultService() { return ConfigProperty.DEFAULT_PIPELINES_SERVICE.getValue() @@ -134,6 +137,7 @@ public String getDefaultVersion(String service) { return defaultVersionCache.get(nonNullService, () -> getDefaultVersionInternal(nonNullService)); } + @Deprecated // only for AppEngineTaskQueue, used in testing / local dev emulation @SneakyThrows @Override public String getWorkerServiceHostName(@NonNull String service, @NonNull String version) { @@ -171,7 +175,6 @@ private synchronized void fillLocation() { } } - private String getDefaultVersionInternal(@NonNull String service) { if (Objects.equals(service, appEngineEnvironment.getService())) { return appEngineEnvironment.getVersion(); diff --git a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java index 3312acc7..e81f452e 100644 --- a/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java +++ b/java/src/main/java/com/google/appengine/tools/pipeline/impl/backend/CloudTasksTaskQueue.java @@ -226,12 +226,12 @@ Task toCloudTask(QueueName queue, TaskSpec taskSpec) { AppEngineHttpRequest.Builder callbackRequest = AppEngineHttpRequest.newBuilder() .putAllHeaders(taskSpec.getHeaders()); - callbackRequest.setAppEngineRouting(AppEngineRouting.newBuilder() - .setService("jobs") - .setVersion("v871a")); - - - // .ifPresent(callbackRequest::setAppEngineRouting); + AppEngineRouting.Builder routing = AppEngineRouting.newBuilder(); + Optional.ofNullable(taskSpec.getService()) + .ifPresent(routing::setService); + Optional.ofNullable(taskSpec.getVersion()) + .ifPresent(routing::setVersion); + callbackRequest.setAppEngineRouting(routing.build()); if (taskSpec.getMethod() == TaskSpec.Method.POST) { callbackRequest.setHttpMethod(HttpMethod.POST);