From 1b2a413a723ec34739d4e78842fc11fc106aea8e Mon Sep 17 00:00:00 2001 From: Israel Herraiz Date: Sat, 21 Dec 2024 13:18:25 +0100 Subject: [PATCH] Use a combiner instead of group by key to calculate the session --- .../google/cloud/pso/transforms/Session.java | 145 ++++++++++++------ 1 file changed, 96 insertions(+), 49 deletions(-) diff --git a/src/main/java/com/google/cloud/pso/transforms/Session.java b/src/main/java/com/google/cloud/pso/transforms/Session.java index f5e5c05..985d83d 100644 --- a/src/main/java/com/google/cloud/pso/transforms/Session.java +++ b/src/main/java/com/google/cloud/pso/transforms/Session.java @@ -19,9 +19,11 @@ import com.google.auto.value.AutoValue; import com.google.cloud.pso.data.CustomDataTypes.RideEvent; import com.google.cloud.pso.data.CustomDataTypes.RideSession; -import java.util.Optional; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; @@ -96,75 +98,120 @@ public PCollection expand(PCollection rideEvents) { .accumulatingFiredPanes() .withAllowedLateness(Duration.standardSeconds(lateDataWaitSeconds()))); - PCollection>> grouped = - sessions.apply("Group by ride id", GroupByKey.create()); + PCollection> accumulated = + sessions.apply("Combine events", Combine.perKey(new SessionPropertiesCombinerFn())); - return grouped.apply("Session properties", ParDo.of(new SessionPropertiesDoFn())); + return accumulated.apply("Session properties", ParDo.of(new SessionPropertiesDoFn())); + } + } + + private static class SessionPropertiesCombinerFn + extends CombineFn { + @Override + public RideAccumulator createAccumulator() { + return new RideAccumulator(); + } + + @Override + public RideAccumulator addInput(RideAccumulator accumulator, RideEvent input) { + Instant sessionBeginTimestamp = accumulator.beginTimestamp; + Instant sessionEndTimestamp = accumulator.endTimestamp; + + if (sessionBeginTimestamp == null || input.getTimestamp().isBefore(sessionBeginTimestamp)) { + accumulator.beginTimestamp = input.getTimestamp(); + accumulator.beginStatus = input.getRideStatus(); + } + if (sessionEndTimestamp == null || input.getTimestamp().isAfter(sessionEndTimestamp)) { + accumulator.endTimestamp = input.getTimestamp(); + accumulator.endStatus = input.getRideStatus(); + } + + accumulator.countEvents += 1; + + return accumulator; + } + + @Override + public RideAccumulator mergeAccumulators(Iterable accumulators) { + Instant begin = null; + Instant end = null; + String beginStatus = ""; + String endStatus = ""; + int count = 0; + + for (RideAccumulator accumulator : accumulators) { + count += accumulator.countEvents; + if (begin == null || accumulator.beginTimestamp.isBefore(begin)) { + begin = accumulator.beginTimestamp; + beginStatus = accumulator.beginStatus; + } + if (end == null || accumulator.endTimestamp.isAfter(end)) { + end = accumulator.endTimestamp; + endStatus = accumulator.endStatus; + } + } + + RideAccumulator merged = new RideAccumulator(); + merged.beginTimestamp = begin; + merged.endTimestamp = end; + merged.beginStatus = beginStatus; + merged.endStatus = endStatus; + merged.countEvents = count; + + return merged; + } + + @Override + public RideAccumulator extractOutput(RideAccumulator accumulator) { + return accumulator; } } private static class SessionPropertiesDoFn - extends DoFn>, RideSession> { + extends DoFn, RideSession> { // For the full list of processElement parameters, see: // https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html @ProcessElement public void processElement( - @Element KV> element, + @Element KV element, BoundedWindow window, PaneInfo pane, OutputReceiver receiver) { - // Traverse the iterable String sessionId = element.getKey(); - Iterable events = element.getValue(); + RideAccumulator accumulator = element.getValue(); String windowId = window.toString(); String paneInfo = pane.getTiming().toString(); - RideSession session = populateSessionProperties(events, sessionId, windowId, paneInfo); + + RideSession session = + RideSession.builder() + .setSessionId(sessionId) + .setSessionBeginTimestamp(accumulator.beginTimestamp) + .setSessionEndTimestamp(accumulator.endTimestamp) + .setCountEvents(accumulator.countEvents) + .setBeginStatus(accumulator.beginStatus) + .setEndStatus(accumulator.endStatus) + .setWindowId(windowId) + .setTriggerInfo(paneInfo) + .build(); + receiver.output(session); } + } - private RideSession populateSessionProperties( - Iterable events, String sessionId, String window, String paneInfo) { - Optional startTime = Optional.empty(); - Optional endTime = Optional.empty(); - String beginStatus = ""; - String endStatus = ""; - int count = 0; - for (RideEvent event : events) { - Instant currentTime = event.getTimestamp(); - String status = event.getRideStatus(); - // Initialize min and max variables - if (count == 0) { - startTime = Optional.of(currentTime); - beginStatus = status; - - endTime = Optional.of(currentTime); - endStatus = status; - } + @DefaultCoder(AvroCoder.class) + public static class RideAccumulator { + public Integer countEvents = 0; - if (currentTime.isBefore(startTime.get())) { - startTime = Optional.of(currentTime); - beginStatus = status; - } - if (currentTime.isAfter(endTime.get())) { - endTime = Optional.of(currentTime); - endStatus = status; - } + public Instant beginTimestamp = null; - count++; - } + public Instant endTimestamp = null; - return RideSession.builder() - .setSessionId(sessionId) - .setSessionBeginTimestamp(startTime.get()) - .setSessionEndTimestamp(endTime.get()) - .setCountEvents(count) - .setBeginStatus(beginStatus) - .setEndStatus(endStatus) - .setWindowId(window) - .setTriggerInfo(paneInfo) - .build(); - } + public String beginStatus = ""; + + public String endStatus = ""; + + public RideAccumulator() {} } private Session() {}