From 5ff04c135509ee49fcf3803ff6f6ee920507e1eb Mon Sep 17 00:00:00 2001 From: Israel Herraiz Date: Sat, 21 Dec 2024 16:46:38 +0100 Subject: [PATCH] Avoid setting job names with updates, ignore zero accumulators --- src/main/java/com/google/cloud/pso/RunPipeline.java | 11 ++++++----- .../java/com/google/cloud/pso/transforms/Session.java | 4 ++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/google/cloud/pso/RunPipeline.java b/src/main/java/com/google/cloud/pso/RunPipeline.java index 639e5c3..b5ef28e 100644 --- a/src/main/java/com/google/cloud/pso/RunPipeline.java +++ b/src/main/java/com/google/cloud/pso/RunPipeline.java @@ -19,6 +19,7 @@ import com.google.cloud.pso.options.TaxiSessionsOptions; import com.google.cloud.pso.pipelines.TaxiSessionsPipeline; import java.time.Instant; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -43,11 +44,11 @@ public static void main(String[] args) { } if (p != null) { - String jobName = options.getJobName(); - // Set name only if not empty. - // If jobName is already set, it might be because we are updating the job. - if (jobName == null || jobName.isEmpty()) jobName = getJobName(pipelineToRun); - p.getOptions().setJobName(jobName); + DataflowPipelineOptions dataflow = options.as(DataflowPipelineOptions.class); + // Set name only if we are not updating a Dataflow pipeline + if (!dataflow.isUpdate()) { + dataflow.setJobName(getJobName(pipelineToRun)); + } p.run().waitUntilFinish(); } else { System.out.println("Unrecognized pipeline type " + pipelineToRun); diff --git a/src/main/java/com/google/cloud/pso/transforms/Session.java b/src/main/java/com/google/cloud/pso/transforms/Session.java index 3e8b83f..cf1c62e 100644 --- a/src/main/java/com/google/cloud/pso/transforms/Session.java +++ b/src/main/java/com/google/cloud/pso/transforms/Session.java @@ -140,6 +140,10 @@ public RideAccumulator mergeAccumulators(Iterable accumulators) int count = 0; for (RideAccumulator accumulator : accumulators) { + if (accumulator.countEvents == 0) { + // Ignoring empty accumlator ("zeros") + continue; + } count += accumulator.countEvents; if (begin == null || accumulator.beginTimestamp.isBefore(begin)) { begin = accumulator.beginTimestamp;