Skip to content

Commit

Permalink
Use a combiner instead of group by key to calculate the session
Browse files Browse the repository at this point in the history
  • Loading branch information
iht committed Dec 21, 2024
1 parent 8606bfc commit 1b2a413
Showing 1 changed file with 96 additions and 49 deletions.
145 changes: 96 additions & 49 deletions src/main/java/com/google/cloud/pso/transforms/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.pso.data.CustomDataTypes.RideEvent;
import com.google.cloud.pso.data.CustomDataTypes.RideSession;
import java.util.Optional;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
Expand Down Expand Up @@ -96,75 +98,120 @@ public PCollection<RideSession> expand(PCollection<RideEvent> rideEvents) {
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(lateDataWaitSeconds())));

PCollection<KV<String, Iterable<RideEvent>>> grouped =
sessions.apply("Group by ride id", GroupByKey.create());
PCollection<KV<String, RideAccumulator>> accumulated =
sessions.apply("Combine events", Combine.perKey(new SessionPropertiesCombinerFn()));

return grouped.apply("Session properties", ParDo.of(new SessionPropertiesDoFn()));
return accumulated.apply("Session properties", ParDo.of(new SessionPropertiesDoFn()));
}
}

private static class SessionPropertiesCombinerFn
extends CombineFn<RideEvent, RideAccumulator, RideAccumulator> {
@Override
public RideAccumulator createAccumulator() {
return new RideAccumulator();
}

@Override
public RideAccumulator addInput(RideAccumulator accumulator, RideEvent input) {
Instant sessionBeginTimestamp = accumulator.beginTimestamp;
Instant sessionEndTimestamp = accumulator.endTimestamp;

if (sessionBeginTimestamp == null || input.getTimestamp().isBefore(sessionBeginTimestamp)) {
accumulator.beginTimestamp = input.getTimestamp();
accumulator.beginStatus = input.getRideStatus();
}
if (sessionEndTimestamp == null || input.getTimestamp().isAfter(sessionEndTimestamp)) {
accumulator.endTimestamp = input.getTimestamp();
accumulator.endStatus = input.getRideStatus();
}

accumulator.countEvents += 1;

return accumulator;
}

@Override
public RideAccumulator mergeAccumulators(Iterable<RideAccumulator> accumulators) {
Instant begin = null;
Instant end = null;
String beginStatus = "";
String endStatus = "";
int count = 0;

for (RideAccumulator accumulator : accumulators) {
count += accumulator.countEvents;
if (begin == null || accumulator.beginTimestamp.isBefore(begin)) {
begin = accumulator.beginTimestamp;
beginStatus = accumulator.beginStatus;
}
if (end == null || accumulator.endTimestamp.isAfter(end)) {
end = accumulator.endTimestamp;
endStatus = accumulator.endStatus;
}
}

RideAccumulator merged = new RideAccumulator();
merged.beginTimestamp = begin;
merged.endTimestamp = end;
merged.beginStatus = beginStatus;
merged.endStatus = endStatus;
merged.countEvents = count;

return merged;
}

@Override
public RideAccumulator extractOutput(RideAccumulator accumulator) {
return accumulator;
}
}

private static class SessionPropertiesDoFn
extends DoFn<KV<String, Iterable<RideEvent>>, RideSession> {
extends DoFn<KV<String, RideAccumulator>, RideSession> {

// For the full list of processElement parameters, see:
// https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.ProcessElement.html
@ProcessElement
public void processElement(
@Element KV<String, Iterable<RideEvent>> element,
@Element KV<String, RideAccumulator> element,
BoundedWindow window,
PaneInfo pane,
OutputReceiver<RideSession> receiver) {
// Traverse the iterable
String sessionId = element.getKey();
Iterable<RideEvent> events = element.getValue();
RideAccumulator accumulator = element.getValue();
String windowId = window.toString();
String paneInfo = pane.getTiming().toString();
RideSession session = populateSessionProperties(events, sessionId, windowId, paneInfo);

RideSession session =
RideSession.builder()
.setSessionId(sessionId)
.setSessionBeginTimestamp(accumulator.beginTimestamp)
.setSessionEndTimestamp(accumulator.endTimestamp)
.setCountEvents(accumulator.countEvents)
.setBeginStatus(accumulator.beginStatus)
.setEndStatus(accumulator.endStatus)
.setWindowId(windowId)
.setTriggerInfo(paneInfo)
.build();

receiver.output(session);
}
}

private RideSession populateSessionProperties(
Iterable<RideEvent> events, String sessionId, String window, String paneInfo) {
Optional<Instant> startTime = Optional.empty();
Optional<Instant> endTime = Optional.empty();
String beginStatus = "";
String endStatus = "";
int count = 0;
for (RideEvent event : events) {
Instant currentTime = event.getTimestamp();
String status = event.getRideStatus();
// Initialize min and max variables
if (count == 0) {
startTime = Optional.of(currentTime);
beginStatus = status;

endTime = Optional.of(currentTime);
endStatus = status;
}
@DefaultCoder(AvroCoder.class)
public static class RideAccumulator {
public Integer countEvents = 0;

if (currentTime.isBefore(startTime.get())) {
startTime = Optional.of(currentTime);
beginStatus = status;
}
if (currentTime.isAfter(endTime.get())) {
endTime = Optional.of(currentTime);
endStatus = status;
}
public Instant beginTimestamp = null;

count++;
}
public Instant endTimestamp = null;

return RideSession.builder()
.setSessionId(sessionId)
.setSessionBeginTimestamp(startTime.get())
.setSessionEndTimestamp(endTime.get())
.setCountEvents(count)
.setBeginStatus(beginStatus)
.setEndStatus(endStatus)
.setWindowId(window)
.setTriggerInfo(paneInfo)
.build();
}
public String beginStatus = "";

public String endStatus = "";

public RideAccumulator() {}
}

private Session() {}
Expand Down

0 comments on commit 1b2a413

Please sign in to comment.