diff --git a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java index 2a5f85e524ed..9b2c21a9e0da 100644 --- a/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java +++ b/examples/java/iceberg/src/main/java/org/apache/beam/examples/iceberg/IcebergBatchWriteExample.java @@ -28,6 +28,8 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -74,9 +76,9 @@ private static Row flattenAnalyticsRow(Row row) { static class ExtractBrowserTransactionsFn extends DoFn> { @ProcessElement - public void processElement(ProcessContext c) { - Row row = c.element(); - c.output( + public void processElement(@Element Row element, OutputReceiver> receiver) { + Row row = element; + receiver.output( KV.of( Preconditions.checkStateNotNull(row.getString("browser")), Preconditions.checkStateNotNull(row.getInt64("transactions")))); @@ -85,13 +87,13 @@ public void processElement(ProcessContext c) { static class FormatCountsFn extends DoFn, Row> { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement(@Element KV element, OutputReceiver receiver) { Row row = Row.withSchema(AGGREGATED_SCHEMA) - .withFieldValue("browser", c.element().getKey()) - .withFieldValue("transaction_count", c.element().getValue()) + .withFieldValue("browser", element.getKey()) + .withFieldValue("transaction_count", element.getValue()) .build(); - c.output(row); + receiver.output(row); } } diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java index 5b8e4a34cf2c..84155683e9cc 100644 --- a/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java +++ b/examples/java/sql/src/main/java/org/apache/beam/examples/SchemaTransformExample.java @@ -42,6 +42,8 @@ import org.apache.beam.sdk.schemas.transforms.Select; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.ParDo; @@ -101,9 +103,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java b/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java index 9c2302c3de2f..9ca2dda313c6 100644 --- a/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java +++ b/examples/java/sql/src/main/java/org/apache/beam/examples/SqlTransformExample.java @@ -41,6 +41,8 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -95,9 +97,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java b/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java index 9e2a96b1eca9..0f968e88d392 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/ApproximateQuantilesExample.java @@ -24,6 +24,8 @@ import org.apache.beam.sdk.transforms.ApproximateQuantiles; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; @@ -70,9 +72,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java index 0ee7d7bcac89..1aafd50a1621 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CoCombineTransformExample.java @@ -46,6 +46,8 @@ import org.apache.beam.sdk.transforms.CombineFns; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.ParDo; @@ -185,13 +187,16 @@ public Long apply(Long input) { new DoFn< KV, KV>>>() { @ProcessElement - public void processElement(ProcessContext c) throws Exception { - CombineFns.CoCombineResult e = c.element().getValue(); + public void processElement( + @Element KV element, + OutputReceiver>>> receiver) + throws Exception { + CombineFns.CoCombineResult e = element.getValue(); ArrayList> o = new ArrayList>(); o.add(KV.of(minTag.getId(), e.get(minTag))); o.add(KV.of(maxTag.getId(), e.get(maxTag))); o.add(KV.of(sumTag.getId(), e.get(sumTag))); - c.output(KV.of(c.element().getKey(), o)); + receiver.output(KV.of(element.getKey(), o)); } })); @@ -210,9 +215,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java index c77708b5de20..46905ad71daf 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CoGroupByKeyExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -84,9 +86,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java b/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java index 24bed27c2360..5f6901ea5ec4 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CombineExample.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PCollection; @@ -68,9 +70,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/CountExample.java b/examples/java/src/main/java/org/apache/beam/examples/CountExample.java index cb0bd0ecf943..f95ec3fe4759 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CountExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CountExample.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; @@ -63,9 +65,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java index 9bf9bf1ef00f..f02c262ccbab 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CountPerKeyExample.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -67,9 +69,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java b/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java index 5943ffa489d4..b5eb5ba3e9de 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/CreateExample.java @@ -27,6 +27,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -79,9 +81,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 7c54e238da33..07c5be76d753 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -46,6 +46,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -115,18 +117,19 @@ public FilterTextFn(String pattern) { private final Counter unmatchedWords = Metrics.counter(FilterTextFn.class, "unmatchedWords"); @ProcessElement - public void processElement(ProcessContext c) { - if (filter.matcher(c.element().getKey()).matches()) { + public void processElement( + @Element KV element, OutputReceiver> receiver) { + if (filter.matcher(element.getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline // these log lines will appear only if the log level is set to "DEBUG" or lower. - LOG.debug("Matched: {}", c.element().getKey()); + LOG.debug("Matched: {}", element.getKey()); matchedWords.inc(); - c.output(c.element()); + receiver.output(element); } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. - LOG.trace("Did not match: {}", c.element().getKey()); + LOG.trace("Did not match: {}", element.getKey()); unmatchedWords.inc(); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java index d3ff9d663f44..28fd4af87066 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DistinctExample.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Distinct; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; @@ -68,9 +70,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java index 71d05ac7ade3..4e9493ac1cf2 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/FlatMapElementsExample.java @@ -24,6 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.InferableFunction; import org.apache.beam.sdk.transforms.ParDo; @@ -78,9 +80,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java b/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java index 78e898b9c173..212079579ee0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -74,9 +76,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java index 20f70232ae94..5b26455e3c5f 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java +++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaPassengerCountJson.java @@ -55,6 +55,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.Values; @@ -109,10 +111,13 @@ public static void main(String[] args) { ParDo.of( new DoFn>() { @ProcessElement - public void processElement(ProcessContext c) throws JsonProcessingException { + public void processElement( + @Element String element, OutputReceiver> receiver) + throws JsonProcessingException { final VendorToPassengerDTO result = - om.readValue(c.element(), new TypeReference() {}); - c.output(KV.of(result.getVendorIdField(), result.getPassengerCountField())); + om.readValue(element, new TypeReference() {}); + receiver.output( + KV.of(result.getVendorIdField(), result.getPassengerCountField())); } })) .apply( @@ -124,11 +129,11 @@ public void processElement(ProcessContext c) throws JsonProcessingException { new DoFn, KV>() { @ProcessElement public void processElement( - ProcessContext c, OutputReceiver> out) { + OutputReceiver> out, + @Element KV element) { System.out.printf( - "Vendor: %s, Passengers: %s%n", - c.element().getKey(), c.element().getValue()); - out.output(c.element()); + "Vendor: %s, Passengers: %s%n", element.getKey(), element.getValue()); + out.output(element); } })); p.run().waitUntilFinish(); diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java index f0b09226865a..cacce131750c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java +++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaStreaming.java @@ -49,6 +49,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -175,8 +177,9 @@ static class RandomUserScoreGeneratorFn extends DoFn private static final Random RANDOM = new Random(); @ProcessElement - public void processElement(ProcessContext c) { - c.output(generate()); + public void processElement( + @Element Object element, OutputReceiver> receiver) { + receiver.output(generate()); } public KV generate() { @@ -290,17 +293,22 @@ public Map extractOutput(Map accumulator) { static class LogResults extends DoFn, Map> { @ProcessElement - public void processElement(ProcessContext c, IntervalWindow w) throws Exception { - Map map = c.element(); + public void processElement( + PaneInfo pane, + IntervalWindow w, + @Element Map element, + OutputReceiver> receiver) + throws Exception { + Map map = element; if (map == null) { - c.output(c.element()); + receiver.output(element); return; } String startTime = w.start().toString(dateTimeFormatter); String endTime = w.end().toString(dateTimeFormatter); - PaneInfo.Timing timing = c.pane().getTiming(); + PaneInfo.Timing timing = pane.getTiming(); switch (timing) { case EARLY: @@ -326,7 +334,7 @@ public void processElement(ProcessContext c, IntervalWindow w) throws Exception System.out.println(); } - c.output(c.element()); + receiver.output(element); } } @@ -340,9 +348,9 @@ public PCollection expand(PCollection input) { static class LogErrorFn extends DoFn { @ProcessElement - public void processElement(@Element BadRecord record, OutputReceiver receiver) { - System.out.println(record); - receiver.output(record); + public void processElement(@Element BadRecord element, OutputReceiver receiver) { + System.out.println(element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java index 9e2da248017d..fad5668945e7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java +++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountAvro.java @@ -51,6 +51,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -103,10 +105,11 @@ public static void main(String[] args) { ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) { - for (String word : c.element().split(TOKENIZER_PATTERN, 0)) { + public void processElement( + @Element String element, OutputReceiver receiver) { + for (String word : element.split(TOKENIZER_PATTERN, 0)) { if (!word.isEmpty()) { - c.output(word); + receiver.output(word); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java index 355614b43869..036cc6702123 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java +++ b/examples/java/src/main/java/org/apache/beam/examples/KafkaWordCountJson.java @@ -52,6 +52,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -104,10 +106,11 @@ public static void main(String[] args) { ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) { - for (String word : c.element().split(TOKENIZER_PATTERN, 0)) { + public void processElement( + @Element String element, OutputReceiver receiver) { + for (String word : element.split(TOKENIZER_PATTERN, 0)) { if (!word.isEmpty()) { - c.output(word); + receiver.output(word); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java b/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java index 155834bc0a43..c9dc40170a57 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/KeysExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -70,9 +72,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java b/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java index 090779de7c36..192dc893795b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/KvSwapExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.KvSwap; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -69,9 +71,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java b/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java index 5e9662a2f1ab..ef4c50d24c73 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/LatestExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Latest; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithTimestamps; @@ -84,9 +86,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java b/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java index 93d885750ae8..60908cf83a48 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MapElementsExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -76,9 +78,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java b/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java index 9173d11754a3..845b107a2214 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MaxExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -63,9 +65,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java index f5eda8179929..5ad156a343e1 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MaxPerKeyExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -67,9 +69,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java b/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java index a31907977dfb..fbb51636253a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MeanExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Mean; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -63,9 +65,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java index aecccee067e4..97f598d4abb4 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MeanPerKeyExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Mean; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -67,9 +69,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinExample.java b/examples/java/src/main/java/org/apache/beam/examples/MinExample.java index a76bcdc5ee3f..db46f1323121 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; @@ -63,9 +65,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java index d3c0312feaf3..e8c3c51d3c23 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinPerKeyExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -67,9 +69,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("LogOutput: {} {}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("LogOutput: {} {}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java b/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java index b34f2bdd16bf..c202aedf76c0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/PartitionExample.java @@ -32,6 +32,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Partition; import org.apache.beam.sdk.values.PCollection; @@ -192,9 +194,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java index 7bd35b101960..2534039227ac 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java @@ -31,6 +31,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; @@ -97,8 +99,8 @@ public void teardown() { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - String element = c.element(); + public void processElement(@Element String element, OutputReceiver receiver) + throws Exception { try { Preconditions.checkNotNull(rateLimiter).allow(1); } catch (Exception e) { @@ -108,7 +110,7 @@ public void processElement(ProcessContext c) throws Exception { // Simulate external API call LOG.info("Processing: {}", element); Thread.sleep(100); - c.output("Processed: " + element); + receiver.output("Processed: " + element); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java b/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java index a0d467718bed..d80bc9fedbaa 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/RegexExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Regex; import org.apache.beam.sdk.values.PCollection; @@ -75,9 +77,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java b/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java index ed1d90606d3b..8d9db0cf3a86 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/SampleExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sample; import org.apache.beam.sdk.values.KV; @@ -76,9 +78,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/SumExample.java b/examples/java/src/main/java/org/apache/beam/examples/SumExample.java index 00fcc8697926..a571d8dc5596 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/SumExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/SumExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PCollection; @@ -63,9 +65,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java b/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java index 45d4a9ffd852..a2c92ef0c80e 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/SumPerKeyExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; @@ -67,9 +69,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java b/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java index 23e3db6cfd96..54e783231908 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/ToStringExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.values.KV; @@ -74,9 +76,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/TopExample.java b/examples/java/src/main/java/org/apache/beam/examples/TopExample.java index 520af0f66550..f602aa345569 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/TopExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/TopExample.java @@ -23,6 +23,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Top; import org.apache.beam.sdk.values.PCollection; @@ -64,9 +66,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java b/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java index 3fc9e84fcb39..1b9839ac03f0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/ValuesExample.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.values.KV; @@ -70,9 +72,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java b/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java index 01c1b1c3f31d..d40d6cc333ef 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/ViewExample.java @@ -23,6 +23,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.SideInput; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.KV; @@ -83,10 +86,8 @@ public static void main(String[] args) { @ProcessElement public void processElement( @Element KV person, - OutputReceiver> out, - ProcessContext context) { - Map citiesToCountries = - context.sideInput(citiesToCountriesView); + @SideInput("citiesToCountries") Map citiesToCountries, + OutputReceiver> out) { String city = person.getValue(); String country = citiesToCountries.get(city); if (country == null) { @@ -95,7 +96,7 @@ public void processElement( out.output(KV.of(person.getKey(), country)); } }) - .withSideInputs(citiesToCountriesView)); + .withSideInput("citiesToCountries", citiesToCountriesView)); // [END main_section] output.apply("Log", ParDo.of(new LogOutput<>("Output: "))); @@ -112,9 +113,11 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement( + @Element KV element, OutputReceiver> receiver) + throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java b/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java index 244a42bc9613..d6a3dff2fa04 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowExample.java @@ -25,6 +25,8 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -87,9 +89,9 @@ public LogOutput(String prefix) { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) throws Exception { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index 11114043e830..d92aabfe23cb 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -52,6 +52,8 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; @@ -131,10 +133,11 @@ public PCollection>> expand(PCollection, CompletionCandidate>() { @ProcessElement - public void processElement(ProcessContext c) { - c.output( - new CompletionCandidate( - c.element().getKey(), c.element().getValue())); + public void processElement( + @Element KV element, + OutputReceiver receiver) { + receiver.output( + new CompletionCandidate(element.getKey(), element.getValue())); } })); @@ -210,9 +213,11 @@ public int partitionFor(KV> elem, int numParti private static class FlattenTops extends DoFn>, CompletionCandidate> { @ProcessElement - public void processElement(ProcessContext c) { - for (CompletionCandidate cc : c.element().getValue()) { - c.output(cc); + public void processElement( + @Element KV> element, + OutputReceiver receiver) { + for (CompletionCandidate cc : element.getValue()) { + receiver.output(cc); } } } @@ -267,10 +272,12 @@ public AllPrefixes(int minPrefix, int maxPrefix) { } @ProcessElement - public void processElement(ProcessContext c) { - String word = c.element().value; + public void processElement( + @Element CompletionCandidate element, + OutputReceiver> receiver) { + String word = element.value; for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { - c.output(KV.of(word.substring(0, i), c.element())); + receiver.output(KV.of(word.substring(0, i), element)); } } } @@ -332,23 +339,24 @@ public String toString() { /** Takes as input a set of strings, and emits each #hashtag found therein. */ static class ExtractHashtags extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - Matcher m = Pattern.compile("#\\S+").matcher(c.element()); + public void processElement(@Element String element, OutputReceiver receiver) { + Matcher m = Pattern.compile("#\\S+").matcher(element); while (m.find()) { - c.output(m.group().substring(1)); + receiver.output(m.group().substring(1)); } } } static class FormatForBigquery extends DoFn>, TableRow> { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV> element, OutputReceiver receiver) { List completions = new ArrayList<>(); - for (CompletionCandidate cc : c.element().getValue()) { + for (CompletionCandidate cc : element.getValue()) { completions.add(new TableRow().set("count", cc.getCount()).set("tag", cc.getValue())); } - TableRow row = new TableRow().set("prefix", c.element().getKey()).set("tags", completions); - c.output(row); + TableRow row = new TableRow().set("prefix", element.getKey()).set("tags", completions); + receiver.output(row); } /** Defines the BigQuery schema used for the output. */ @@ -386,15 +394,16 @@ public FormatForDatastore(String kind, String ancestorKey) { } @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV> element, OutputReceiver receiver) { Entity.Builder entityBuilder = Entity.newBuilder(); com.google.datastore.v1.Key key = - makeKey(makeKey(kind, ancestorKey).build(), kind, c.element().getKey()).build(); + makeKey(makeKey(kind, ancestorKey).build(), kind, element.getKey()).build(); entityBuilder.setKey(key); List candidates = new ArrayList<>(); Map properties = new HashMap<>(); - for (CompletionCandidate tag : c.element().getValue()) { + for (CompletionCandidate tag : element.getValue()) { Entity.Builder tagEntity = Entity.newBuilder(); properties.put("tag", makeValue(tag.value).build()); properties.put("count", makeValue(tag.count).build()); @@ -402,7 +411,7 @@ public void processElement(ProcessContext c) { } properties.put("candidates", makeValue(candidates).build()); entityBuilder.putAllProperties(properties); - c.output(entityBuilder.build()); + receiver.output(entityBuilder.build()); } } @@ -527,11 +536,13 @@ public static void runAutocompletePipeline(Options options) throws IOException { ParDo.of( new DoFn>, Long>() { @ProcessElement - public void process(ProcessContext c) { - KV> elm = c.element(); + public void process( + @Element KV> element, + OutputReceiver receiver) { + KV> elm = element; Long listHash = - c.element().getValue().stream().mapToLong(cc -> cc.hashCode()).sum(); - c.output(Long.valueOf(elm.getKey().hashCode()) + listHash); + element.getValue().stream().mapToLong(cc -> cc.hashCode()).sum(); + receiver.output(Long.valueOf(elm.getKey().hashCode()) + listHash); } })) .apply(Sum.longsGlobally()); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java index e4ce5e3eb17e..ea5f2e5399e7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java @@ -34,6 +34,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; /** @@ -55,12 +57,12 @@ public class StreamingWordExtract { /** A {@link DoFn} that tokenizes lines of text into individual words. */ static class ExtractWords extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - String[] words = c.element().split(ExampleUtils.TOKENIZER_PATTERN, -1); + public void processElement(@Element String element, OutputReceiver receiver) { + String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1); for (String word : words) { if (!word.isEmpty()) { - c.output(word); + receiver.output(word); } } } @@ -69,8 +71,8 @@ public void processElement(ProcessContext c) { /** A {@link DoFn} that uppercases a word. */ static class Uppercase extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().toUpperCase()); + public void processElement(@Element String element, OutputReceiver receiver) { + receiver.output(element.toUpperCase()); } } @@ -78,8 +80,8 @@ public void processElement(ProcessContext c) { static class StringToRowConverter extends DoFn { /** In this example, put the whole string into single BigQuery field. */ @ProcessElement - public void processElement(ProcessContext c) { - c.output(new TableRow().set("string_field", c.element())); + public void processElement(@Element String element, OutputReceiver receiver) { + receiver.output(new TableRow().set("string_field", element)); } static TableSchema getSchema() { diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index b3e5fd04fa7e..a61bd4c88651 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -56,6 +56,9 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Distinct; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.SideInput; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Keys; import org.apache.beam.sdk.transforms.PTransform; @@ -236,9 +239,11 @@ public PCollection>> expand( ParDo.of( new DoFn, KV>() { @ProcessElement - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - String line = c.element().getValue(); + public void processElement( + @Element KV element, + OutputReceiver> receiver) { + URI uri = element.getKey(); + String line = element.getValue(); for (String word : line.split("\\W+", -1)) { // Log INFO messages when the word “love” is found. if ("love".equalsIgnoreCase(word)) { @@ -246,7 +251,7 @@ public void processElement(ProcessContext c) { } if (!word.isEmpty()) { - c.output(KV.of(uri, word.toLowerCase())); + receiver.output(KV.of(uri, word.toLowerCase())); } } } @@ -281,11 +286,13 @@ public void processElement(ProcessContext c) { ParDo.of( new DoFn, Long>, KV>>() { @ProcessElement - public void processElement(ProcessContext c) { - URI uri = c.element().getKey().getKey(); - String word = c.element().getKey().getValue(); - Long occurrences = c.element().getValue(); - c.output(KV.of(uri, KV.of(word, occurrences))); + public void processElement( + @Element KV, Long> element, + OutputReceiver>> receiver) { + URI uri = element.getKey().getKey(); + String word = element.getKey().getValue(); + Long occurrences = element.getValue(); + receiver.output(KV.of(uri, KV.of(word, occurrences))); } })); @@ -322,16 +329,18 @@ public void processElement(ProcessContext c) { ParDo.of( new DoFn, KV>>() { @ProcessElement - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); + public void processElement( + @Element KV element, + OutputReceiver>> receiver) { + URI uri = element.getKey(); + Long wordTotal = element.getValue().getOnly(wordTotalsTag); for (KV wordAndCount : - c.element().getValue().getAll(wordCountsTag)) { + element.getValue().getAll(wordCountsTag)) { String word = wordAndCount.getKey(); Long wordCount = wordAndCount.getValue(); Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); - c.output(KV.of(word, KV.of(uri, termFrequency))); + receiver.output(KV.of(word, KV.of(uri, termFrequency))); } } })); @@ -348,17 +357,19 @@ public void processElement(ProcessContext c) { ParDo.of( new DoFn, KV>() { @ProcessElement - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Long documentCount = c.element().getValue(); - Long documentTotal = c.sideInput(totalDocuments); + public void processElement( + @SideInput("totalDocuments") Long documentTotal, + @Element KV element, + OutputReceiver> receiver) { + String word = element.getKey(); + Long documentCount = element.getValue(); Double documentFrequency = documentCount.doubleValue() / documentTotal.doubleValue(); - c.output(KV.of(word, documentFrequency)); + receiver.output(KV.of(word, documentFrequency)); } }) - .withSideInputs(totalDocuments)); + .withSideInput("totalDocuments", totalDocuments)); // Join the term frequency and document frequency // collections, each keyed on the word. @@ -380,15 +391,17 @@ public void processElement(ProcessContext c) { ParDo.of( new DoFn, KV>>() { @ProcessElement - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Double df = c.element().getValue().getOnly(dfTag); + public void processElement( + @Element KV element, + OutputReceiver>> receiver) { + String word = element.getKey(); + Double df = element.getValue().getOnly(dfTag); - for (KV uriAndTf : c.element().getValue().getAll(tfTag)) { + for (KV uriAndTf : element.getValue().getAll(tfTag)) { URI uri = uriAndTf.getKey(); Double tf = uriAndTf.getValue(); Double tfIdf = tf * Math.log(1 / df); - c.output(KV.of(word, KV.of(uri, tfIdf))); + receiver.output(KV.of(word, KV.of(uri, tfIdf))); } } })); @@ -419,13 +432,15 @@ public PDone expand(PCollection>> wordToUriAndTfIdf) ParDo.of( new DoFn>, String>() { @ProcessElement - public void processElement(ProcessContext c) { - c.output( + public void processElement( + @Element KV> element, + OutputReceiver receiver) { + receiver.output( String.format( "%s,\t%s,\t%f", - c.element().getKey(), - c.element().getValue().getKey(), - c.element().getValue().getValue())); + element.getKey(), + element.getValue().getKey(), + element.getValue().getValue())); } })) .apply(TextIO.write().to(output).withSuffix(".csv")); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index b06cd8da9d43..5d4668049c4c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -48,6 +48,8 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -93,8 +95,8 @@ public class TopWikipediaSessions { /** Extracts user and timestamp from a TableRow representing a Wikipedia edit. */ static class ExtractUserAndTimestamp extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) { + TableRow row = element; int timestamp; // TODO(BEAM-5390): Avoid this workaround. try { @@ -105,7 +107,7 @@ public void processElement(ProcessContext c) { String userName = (String) row.get("contributor_username"); if (userName != null) { // Sets the implicit timestamp field to be used in windowing. - c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); + receiver.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); } } } @@ -143,18 +145,24 @@ public PCollection>> expand(PCollection> static class SessionsToStringsDoFn extends DoFn, KV> { @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - c.output(KV.of(c.element().getKey() + " : " + window, c.element().getValue())); + public void processElement( + BoundedWindow window, + @Element KV element, + OutputReceiver> receiver) { + receiver.output(KV.of(element.getKey() + " : " + window, element.getValue())); } } static class FormatOutputDoFn extends DoFn>, String> { @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - for (KV item : c.element()) { + public void processElement( + BoundedWindow window, + @Element List> element, + OutputReceiver receiver) { + for (KV item : element) { String session = item.getKey(); long count = item.getValue(); - c.output(session + " : " + count + " : " + ((IntervalWindow) window).start()); + receiver.output(session + " : " + count + " : " + ((IntervalWindow) window).start()); } } } @@ -187,10 +195,11 @@ public PCollection expand(PCollection input) { ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) { - if (Math.abs((long) c.element().hashCode()) + public void processElement( + @Element String element, OutputReceiver receiver) { + if (Math.abs((long) element.hashCode()) <= Integer.MAX_VALUE * samplingThreshold) { - c.output(c.element()); + receiver.output(element); } } })) diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java index 6f75e2e03d99..b22e3fed6698 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java @@ -58,6 +58,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -182,13 +185,14 @@ static class ExtractTimestamps extends DoFn { DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss"); @ProcessElement - public void processElement(DoFn.ProcessContext c) throws Exception { - String[] items = c.element().split(",", -1); + public void processElement(@Element String element, OutputReceiver receiver) + throws Exception { + String[] items = element.split(",", -1); if (items.length > 0) { try { String timestamp = items[0]; - c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp))); + receiver.outputWithTimestamp(element, new Instant(dateTimeFormat.parseMillis(timestamp))); } catch (IllegalArgumentException e) { // Skip the invalid input. } @@ -206,8 +210,9 @@ public void processElement(DoFn.ProcessContext c) throws Excepti static class ExtractFlowInfoFn extends DoFn> { @ProcessElement - public void processElement(ProcessContext c) { - String[] items = c.element().split(",", -1); + public void processElement( + @Element String element, OutputReceiver> receiver) { + String[] items = element.split(",", -1); if (items.length < 48) { // Skip the invalid input. return; @@ -236,7 +241,7 @@ public void processElement(ProcessContext c) { laneAvgOccupancy, laneAvgSpeed, totalFlow); - c.output(KV.of(stationId, laneInfo)); + receiver.output(KV.of(stationId, laneInfo)); } } } @@ -270,12 +275,15 @@ public LaneInfo apply(Iterable input) { */ static class FormatMaxesFn extends DoFn, TableRow> { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV element, + @Timestamp Instant timestamp, + OutputReceiver receiver) { - LaneInfo laneInfo = c.element().getValue(); + LaneInfo laneInfo = element.getValue(); TableRow row = new TableRow() - .set("station_id", c.element().getKey()) + .set("station_id", element.getKey()) .set("direction", laneInfo.getDirection()) .set("freeway", laneInfo.getFreeway()) .set("lane_max_flow", laneInfo.getLaneFlow()) @@ -284,8 +292,8 @@ public void processElement(ProcessContext c) { .set("avg_speed", laneInfo.getLaneAS()) .set("total_flow", laneInfo.getTotalFlow()) .set("recorded_timestamp", laneInfo.getRecordedTimestamp()) - .set("window_timestamp", c.timestamp().toString()); - c.output(row); + .set("window_timestamp", timestamp.toString()); + receiver.output(row); } /** Defines the BigQuery schema used for the output. */ diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java index 958415626863..38ba4322f3e6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java @@ -63,6 +63,9 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -187,12 +190,13 @@ static class ExtractTimestamps extends DoFn { DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss"); @ProcessElement - public void processElement(DoFn.ProcessContext c) throws Exception { - String[] items = c.element().split(","); + public void processElement(@Element String element, OutputReceiver receiver) + throws Exception { + String[] items = element.split(","); String timestamp = tryParseTimestamp(items); if (timestamp != null) { try { - c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp))); + receiver.outputWithTimestamp(element, new Instant(dateTimeFormat.parseMillis(timestamp))); } catch (IllegalArgumentException e) { // Skip the invalid input. } @@ -207,8 +211,11 @@ public void processElement(DoFn.ProcessContext c) throws Excepti static class ExtractStationSpeedFn extends DoFn> { @ProcessElement - public void processElement(ProcessContext c) { - String[] items = c.element().split(","); + public void processElement( + @Timestamp Instant timestamp, + @Element String element, + OutputReceiver> receiver) { + String[] items = element.split(","); String stationType = tryParseStationType(items); // For this analysis, use only 'main line' station types if ("ML".equals(stationType)) { @@ -216,11 +223,10 @@ public void processElement(ProcessContext c) { String stationId = tryParseStationId(items); // For this simple example, filter out everything but some hardwired routes. if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) { - StationSpeed stationSpeed = - new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis()); + StationSpeed stationSpeed = new StationSpeed(stationId, avgSpeed, timestamp.getMillis()); // The tuple key is the 'route' name stored in the 'sdStations' hash. KV outputValue = KV.of(sdStations.get(stationId), stationSpeed); - c.output(outputValue); + receiver.output(outputValue); } } } @@ -234,13 +240,16 @@ public void processElement(ProcessContext c) { */ static class GatherStats extends DoFn>, KV> { @ProcessElement - public void processElement(ProcessContext c) throws IOException { - String route = c.element().getKey(); + public void processElement( + @Element KV> element, + OutputReceiver> receiver) + throws IOException { + String route = element.getKey(); double speedSum = 0.0; int speedCount = 0; int speedups = 0; int slowdowns = 0; - List infoList = Lists.newArrayList(c.element().getValue()); + List infoList = Lists.newArrayList(element.getValue()); // StationSpeeds sort by embedded timestamp. Collections.sort(infoList); Map prevSpeeds = new HashMap<>(); @@ -268,22 +277,25 @@ public void processElement(ProcessContext c) throws IOException { double speedAvg = speedSum / speedCount; boolean slowdownEvent = slowdowns >= 2 * speedups; RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent); - c.output(KV.of(route, routeInfo)); + receiver.output(KV.of(route, routeInfo)); } } /** Format the results of the slowdown calculations to a TableRow, to save to BigQuery. */ static class FormatStatsFn extends DoFn, TableRow> { @ProcessElement - public void processElement(ProcessContext c) { - RouteInfo routeInfo = c.element().getValue(); + public void processElement( + @Element KV element, + @Timestamp Instant timestamp, + OutputReceiver receiver) { + RouteInfo routeInfo = element.getValue(); TableRow row = new TableRow() .set("avg_speed", routeInfo.getAvgSpeed()) .set("slowdown_event", routeInfo.getSlowdownEvent()) - .set("route", c.element().getKey()) - .set("window_timestamp", c.timestamp().toString()); - c.output(row); + .set("route", element.getKey()) + .set("window_timestamp", timestamp.toString()); + receiver.output(row); } /** Defines the BigQuery schema used for the output. */ diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java index cf097c8ea979..6e5321dc54ab 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java @@ -37,6 +37,9 @@ import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -199,20 +202,24 @@ public void close() { @ProcessElement @SuppressWarnings("argument") - public void process(@Element KV> element, ProcessContext context) { + public void process( + @Element KV> element, + OutputReceiver mainReceiver, + MultiOutputReceiver multiReceiver) { Iterable rows = element.getValue(); try { for (Row outputRow : getTokenizedRow(rows)) { - context.output(outputRow); + mainReceiver.output(outputRow); } } catch (Exception e) { for (Row outputRow : rows) { - context.output( - failureTag, - FailsafeElement.of(outputRow, outputRow) - .setErrorMessage(e.getMessage()) - .setStacktrace(Throwables.getStackTraceAsString(e))); + multiReceiver + .get(failureTag) + .output( + FailsafeElement.of(outputRow, outputRow) + .setErrorMessage(e.getMessage()) + .setStacktrace(Throwables.getStackTraceAsString(e))); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java index fe8f4c1afad8..4e17a72ec3ba 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigQueryIO.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -90,9 +92,9 @@ public static FailsafeElement wrapBigQueryInsertError( public static class RowToTableRowFn extends DoFn { @ProcessElement - public void processElement(ProcessContext context) { - Row row = context.element(); - context.output(BigQueryUtils.toTableRow(row)); + public void processElement(@Element Row element, OutputReceiver receiver) { + Row row = element; + receiver.output(BigQueryUtils.toTableRow(row)); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java index d7d1c3e97232..435620a4b011 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/TokenizationBigTableIO.java @@ -75,8 +75,10 @@ static class TransformToBigTableFormat extends DoFn>> out, ProcessContext c) { - DataTokenizationOptions options = c.getPipelineOptions().as(DataTokenizationOptions.class); + @Element Row in, + OutputReceiver>> out, + PipelineOptions pipelineOptions) { + DataTokenizationOptions options = pipelineOptions.as(DataTokenizationOptions.class); // Mapping every field in provided Row to Mutation.SetCell, which will create/update // cell content with provided data Set mutations = diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java index d827e4b30cb3..3d25bf78e937 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/CsvConverters.java @@ -42,6 +42,10 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.SideInput; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sample; @@ -264,8 +268,6 @@ public static Builder newBuilder() { @Override public PCollectionTuple expand(PCollectionTuple lines) { - PCollectionView headersView = null; - // Convert csv lines into Failsafe elements so that we can recover over multiple transforms. PCollection> lineFailsafeElements = lines @@ -285,16 +287,14 @@ public PCollectionTuple expand(PCollectionTuple lines) { return lineFailsafeElements.apply( "LineToDocumentUsingSchema", - ParDo.of( - new FailsafeElementToJsonFn( - headersView, schema, delimiter(), udfDeadletterTag())) + ParDo.of(new FailsafeElementToJsonFn(schema, delimiter(), udfDeadletterTag())) .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag()))); } // Run if using headers - headersView = lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton()); + PCollectionView headersView = + lines.get(headerTag()).apply(Sample.any(1)).apply(View.asSingleton()); - PCollectionView finalHeadersView = headersView; lines .get(headerTag()) .apply( @@ -302,23 +302,24 @@ headersView, schema, delimiter(), udfDeadletterTag())) ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) { - String headers = c.sideInput(finalHeadersView); - if (!c.element().equals(headers)) { + public void processElement( + @SideInput("finalHeadersView") String headers, + @Element String element, + OutputReceiver receiver) { + if (!element.equals(headers)) { LOG.error("Headers do not match, consistency cannot be guaranteed"); throw new RuntimeException( "Headers do not match, consistency cannot be guaranteed"); } + receiver.output(element); } }) - .withSideInputs(finalHeadersView)); + .withSideInput("finalHeadersView", headersView)); return lineFailsafeElements.apply( "LineToDocumentWithHeaders", - ParDo.of( - new FailsafeElementToJsonFn( - headersView, jsonSchemaPath(), delimiter(), udfDeadletterTag())) - .withSideInputs(headersView) + ParDo.of(new FailsafeElementToJsonFn(jsonSchemaPath(), delimiter(), udfDeadletterTag())) + .withSideInput("headersView", headersView) .withOutputTags(udfOutputTag(), TupleTagList.of(udfDeadletterTag()))); } @@ -348,8 +349,7 @@ public abstract Builder setUdfDeadletterTag( /** * The {@link FailsafeElementToJsonFn} class creates a Json string from a failsafe element. * - *

{@link FailsafeElementToJsonFn#FailsafeElementToJsonFn(PCollectionView, String, String, - * TupleTag)} + *

{@link FailsafeElementToJsonFn#FailsafeElementToJsonFn(String, String, TupleTag)} */ public static class FailsafeElementToJsonFn extends DoFn, FailsafeElement> { @@ -357,45 +357,46 @@ public static class FailsafeElementToJsonFn @Nullable public final String jsonSchema; public final String delimiter; public final TupleTag> udfDeadletterTag; - @Nullable private final PCollectionView headersView; private Counter successCounter = Metrics.counter(FailsafeElementToJsonFn.class, SUCCESSFUL_TO_JSON_COUNTER); private Counter failedCounter = Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER); FailsafeElementToJsonFn( - PCollectionView headersView, String jsonSchema, String delimiter, TupleTag> udfDeadletterTag) { - this.headersView = headersView; this.jsonSchema = jsonSchema; this.delimiter = delimiter; this.udfDeadletterTag = udfDeadletterTag; } @ProcessElement - public void processElement(ProcessContext context) { - FailsafeElement element = context.element(); + public void processElement( + @Element FailsafeElement element, + @SideInput("headersView") String headersStr, + OutputReceiver> receiver, + MultiOutputReceiver multiReceiver) { List header = null; - if (this.headersView != null) { - header = Arrays.asList(context.sideInput(this.headersView).split(this.delimiter)); + if (headersStr != null) { + header = Arrays.asList(headersStr.split(this.delimiter)); } List record = Arrays.asList(element.getOriginalPayload().split(this.delimiter)); try { String json = buildJsonString(header, record, this.jsonSchema); - context.output(FailsafeElement.of(element.getOriginalPayload(), json)); + receiver.output(FailsafeElement.of(element.getOriginalPayload(), json)); successCounter.inc(); } catch (Exception e) { failedCounter.inc(); - context.output( - this.udfDeadletterTag, - FailsafeElement.of(element) - .setErrorMessage(e.getMessage()) - .setStacktrace(Throwables.getStackTraceAsString(e))); + multiReceiver + .get(this.udfDeadletterTag) + .output( + FailsafeElement.of(element) + .setErrorMessage(e.getMessage()) + .setStacktrace(Throwables.getStackTraceAsString(e))); } } } @@ -407,9 +408,9 @@ public void processElement(ProcessContext context) { static class LineToFailsafeElementFn extends DoFn> { @ProcessElement - public void processElement(ProcessContext context) { - String message = context.element(); - context.output(FailsafeElement.of(message, message)); + public void processElement( + @Element String message, OutputReceiver> receiver) { + receiver.output(FailsafeElement.of(message, message)); } } @@ -510,8 +511,8 @@ static class GetCsvHeadersFn extends DoFn { } @ProcessElement - public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) { - ReadableFile f = context.element(); + public void processElement(MultiOutputReceiver outputReceiver, @Element ReadableFile element) { + ReadableFile f = element; String headers; List records = null; String delimiter = String.valueOf(this.csvFormat.getDelimiter()); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java index 01377add0858..4c4307571ef0 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java @@ -29,6 +29,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -123,16 +126,17 @@ public FailedStringToCsvRowFn() { } @ProcessElement - public void processElement(ProcessContext context) { - FailsafeElement failsafeElement = context.element(); + public void processElement( + @Element FailsafeElement failsafeElement, + @Timestamp org.joda.time.Instant timestamp, + OutputReceiver receiver) { ArrayList outputRow = new ArrayList<>(); final String message = failsafeElement.getOriginalPayload(); // Format the timestamp for insertion - String timestamp = - TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC)); + String timestampStr = TIMESTAMP_FORMATTER.print(timestamp.toDateTime(DateTimeZone.UTC)); - outputRow.add(timestamp); + outputRow.add(timestampStr); outputRow.add(MoreObjects.firstNonNull(failsafeElement.getErrorMessage(), "")); // Only set the payload if it's populated on the message. @@ -140,7 +144,7 @@ public void processElement(ProcessContext context) { outputRow.add(message); } - context.output(String.join(csvDelimiter, outputRow)); + receiver.output(String.join(csvDelimiter, outputRow)); } } @@ -198,19 +202,21 @@ public static class FailedStringToTableRowFn DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); @ProcessElement - public void processElement(ProcessContext context) { - FailsafeElement failsafeElement = context.element(); + public void processElement( + @Timestamp org.joda.time.Instant timestamp, + @Element FailsafeElement element, + OutputReceiver receiver) { + FailsafeElement failsafeElement = element; final String message = failsafeElement.getOriginalPayload(); // Format the timestamp for insertion - String timestamp = - TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC)); + String timestampStr = TIMESTAMP_FORMATTER.print(timestamp.toDateTime(DateTimeZone.UTC)); // Build the table row @SuppressWarnings("nullness") // TableRow.set not annotated but does accept nulls final TableRow failedRow = new TableRow() - .set("timestamp", timestamp) + .set("timestamp", timestampStr) .set("errorMessage", failsafeElement.getErrorMessage()) .set("stacktrace", failsafeElement.getStacktrace()); @@ -221,7 +227,7 @@ public void processElement(ProcessContext context) { .set("payloadBytes", message.getBytes(StandardCharsets.UTF_8)); } - context.output(failedRow); + receiver.output(failedRow); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index a3ed04bb1c48..0018201f3686 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -34,6 +34,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.SideInput; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.Mean; import org.apache.beam.sdk.transforms.PTransform; @@ -126,21 +129,23 @@ public PCollection> expand(PCollection> Metrics.counter("main", "SpammerUsers"); @ProcessElement - public void processElement(ProcessContext c) { - Integer score = c.element().getValue(); - Double gmc = c.sideInput(globalMeanScore); + public void processElement( + @SideInput("globalMeanScore") Double gmc, + @Element KV element, + OutputReceiver> receiver) { + Integer score = element.getValue(); if (score > (gmc * SCORE_WEIGHT)) { LOG.info( "user {} spammer score {} with mean {}", - c.element().getKey(), + element.getKey(), score, gmc); numSpammerUsers.inc(); - c.output(c.element()); + receiver.output(element); } } }) - .withSideInputs(globalMeanScore)); + .withSideInput("globalMeanScore", globalMeanScore)); return filtered; } } @@ -149,10 +154,10 @@ public void processElement(ProcessContext c) { /** Calculate and output an element's session duration. */ private static class UserSessionInfoFn extends DoFn, Integer> { @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { + public void processElement(BoundedWindow window, OutputReceiver receiver) { IntervalWindow w = (IntervalWindow) window; int duration = new Duration(w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes(); - c.output(duration); + receiver.output(duration); } } @@ -192,22 +197,21 @@ public interface Options extends LeaderBoard.Options { configureWindowedWrite() { Map>> tableConfigure = new HashMap<>(); tableConfigure.put( - "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); + "team", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey())); tableConfigure.put( - "total_score", - new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); + "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue())); tableConfigure.put( "window_start", new WriteToBigQuery.FieldInfo<>( "STRING", - (c, w) -> { + (e, w, t, p) -> { IntervalWindow window = (IntervalWindow) w; return GameConstants.DATE_TIME_FORMATTER.print(window.start()); })); tableConfigure.put( "processing_time", new WriteToBigQuery.FieldInfo<>( - "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); + "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); return tableConfigure; } @@ -222,12 +226,12 @@ protected static Map> configureSession "window_start", new WriteToBigQuery.FieldInfo<>( "STRING", - (c, w) -> { + (e, w, t, p) -> { IntervalWindow window = (IntervalWindow) w; return GameConstants.DATE_TIME_FORMATTER.print(window.start()); })); tableConfigure.put( - "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (c, w) -> c.element())); + "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (e, w, t, p) -> e)); return tableConfigure; } @@ -288,14 +292,17 @@ public static void main(String[] args) throws Exception { ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @SideInput("spammersView") Map spammers, + @Element GameActionInfo element, + OutputReceiver receiver) { // If the user is not in the spammers Map, output the data element. - if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) { - c.output(c.element()); + if (spammers.get(element.getUser().trim()) == null) { + receiver.output(element); } } }) - .withSideInputs(spammersView)) + .withSideInput("spammersView", spammersView)) // Extract and sum teamname/score pairs from the event data. .apply("ExtractTeamScore", new ExtractAndSumScore("team")) // [END DocInclude_FilterAndCalc] diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index c57d9ba6b8c8..a94f699282a8 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -112,11 +112,11 @@ public interface Options extends UserScore.Options { */ protected static Map>> configureOutput() { Map>> config = new HashMap<>(); - config.put("team", (c, w) -> c.element().getKey()); - config.put("total_score", (c, w) -> c.element().getValue()); + config.put("team", (e, w, t, p) -> e.getKey()); + config.put("total_score", (e, w, t, p) -> e.getValue()); config.put( "window_start", - (c, w) -> { + (e, w, t, p) -> { IntervalWindow window = (IntervalWindow) w; return GameConstants.DATE_TIME_FORMATTER.print(window.start()); }); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index 832c0ad79e76..3750b33aa791 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -135,25 +135,24 @@ public interface Options extends ExampleOptions, StreamingOptions { Map>> tableConfigure = new HashMap<>(); tableConfigure.put( - "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); + "team", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey())); tableConfigure.put( - "total_score", - new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); + "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue())); tableConfigure.put( "window_start", new WriteToBigQuery.FieldInfo<>( "STRING", - (c, w) -> { + (e, w, t, p) -> { IntervalWindow window = (IntervalWindow) w; return GameConstants.DATE_TIME_FORMATTER.print(window.start()); })); tableConfigure.put( "processing_time", new WriteToBigQuery.FieldInfo<>( - "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); + "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); tableConfigure.put( "timing", - new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.pane().getTiming().toString())); + new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> p.getTiming().toString())); return tableConfigure; } @@ -165,10 +164,9 @@ public interface Options extends ExampleOptions, StreamingOptions { configureBigQueryWrite() { Map>> tableConfigure = new HashMap<>(); tableConfigure.put( - "user", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); + "user", new WriteToBigQuery.FieldInfo<>("STRING", (e, w, t, p) -> e.getKey())); tableConfigure.put( - "total_score", - new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); + "total_score", new WriteToBigQuery.FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue())); return tableConfigure; } @@ -184,7 +182,7 @@ public interface Options extends ExampleOptions, StreamingOptions { tableConfigure.put( "processing_time", new WriteToBigQuery.FieldInfo<>( - "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); + "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); return tableConfigure; } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java index b28db261ab0e..3caa1e619526 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java @@ -37,6 +37,8 @@ import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -101,12 +103,12 @@ public interface Options extends LeaderBoard.Options { private static Map>> configureCompleteWindowedTableWrite() { Map>> tableConfigure = new HashMap<>(); - tableConfigure.put("team", new FieldInfo<>("STRING", (c, w) -> c.element().getKey())); - tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); + tableConfigure.put("team", new FieldInfo<>("STRING", (e, w, t, p) -> e.getKey())); + tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (e, w, t, p) -> e.getValue())); tableConfigure.put( "processing_time", new FieldInfo<>( - "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); + "STRING", (e, w, t, p) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); return tableConfigure; } @@ -203,9 +205,11 @@ public UpdateTeamScoreFn(int thresholdScore) { */ @ProcessElement public void processElement( - ProcessContext c, @StateId(TOTAL_SCORE) ValueState totalScore) { - String teamName = c.element().getKey(); - GameActionInfo gInfo = c.element().getValue(); + @Element KV element, + @StateId(TOTAL_SCORE) ValueState totalScore, + OutputReceiver> receiver) { + String teamName = element.getKey(); + GameActionInfo gInfo = element.getValue(); // ValueState cells do not contain a default value. If the state is possibly not written, make // sure to check for null on read. @@ -218,7 +222,7 @@ public void processElement( // the new total is 2002, and the threshold is 1000, 1999 / 1000 = 1, 2002 / 1000 = 2. // Therefore, this team passed the threshold. if (oldTotalScore / this.thresholdScore < totalScore.read() / this.thresholdScore) { - c.output(KV.of(teamName, totalScore.read())); + receiver.output(KV.of(teamName, totalScore.read())); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index b30b4665d265..054ce7a52935 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -34,6 +34,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -161,18 +163,18 @@ static class ParseEventFn extends DoFn { private final Counter numParseErrors = Metrics.counter("main", "ParseErrors"); @ProcessElement - public void processElement(ProcessContext c) { - String[] components = c.element().split(",", -1); + public void processElement(@Element String element, OutputReceiver receiver) { + String[] components = element.split(",", -1); try { String user = components[0].trim(); String team = components[1].trim(); Integer score = Integer.parseInt(components[2].trim()); Long timestamp = Long.parseLong(components[3].trim()); GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp); - c.output(gInfo); + receiver.output(gInfo); } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) { numParseErrors.inc(); - LOG.info("Parse error on {}", c.element(), e); + LOG.info("Parse error on {}", element, e); } } } @@ -232,8 +234,8 @@ public interface Options extends PipelineOptions { */ protected static Map>> configureOutput() { Map>> config = new HashMap<>(); - config.put("user", (c, w) -> c.element().getKey()); - config.put("total_score", (c, w) -> c.element().getValue()); + config.put("user", (e, w, t, p) -> e.getKey()); + config.put("total_score", (e, w, t, p) -> e.getValue()); return config; } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index eef4bc932682..a6438bedccad 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -30,6 +30,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -64,11 +67,15 @@ public WriteToBigQuery( } /** - * A {@link Serializable} function from a {@link DoFn.ProcessContext} and {@link BoundedWindow} to - * the value for that field. + * A {@link Serializable} function from an element and {@link BoundedWindow} to the value for that + * field. */ public interface FieldFn extends Serializable { - Object apply(DoFn.ProcessContext context, BoundedWindow window); + Object apply( + InputT element, + BoundedWindow window, + org.joda.time.Instant timestamp, + org.apache.beam.sdk.transforms.windowing.PaneInfo pane); } /** Define a class to hold information about output table field definitions. */ @@ -96,16 +103,21 @@ FieldFn getFieldFn() { protected class BuildRowFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { + public void processElement( + @Element InputT element, + @Timestamp org.joda.time.Instant timestamp, + org.apache.beam.sdk.transforms.windowing.PaneInfo pane, + BoundedWindow window, + OutputReceiver receiver) { TableRow row = new TableRow(); for (Map.Entry> entry : fieldInfo.entrySet()) { String key = entry.getKey(); FieldInfo fcnInfo = entry.getValue(); FieldFn fcn = fcnInfo.getFieldFn(); - row.set(key, fcn.apply(c, window)); + row.set(key, fcn.apply(element, window, timestamp, pane)); } - c.output(row); + receiver.output(row); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java index 330769e0c79e..5dcb88eb2432 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java @@ -32,6 +32,9 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -70,26 +73,35 @@ public WriteToText( } /** - * A {@link Serializable} function from a {@link DoFn.ProcessContext} and {@link BoundedWindow} to - * the value for that field. + * A {@link Serializable} function from an element and {@link BoundedWindow} to the value for that + * field. */ public interface FieldFn extends Serializable { - Object apply(DoFn.ProcessContext context, BoundedWindow window); + Object apply( + InputT element, + BoundedWindow window, + org.joda.time.Instant timestamp, + org.apache.beam.sdk.transforms.windowing.PaneInfo pane); } /** Convert each key/score pair into a row as specified by fieldFn. */ protected class BuildRowFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { + public void processElement( + @Element InputT element, + @Timestamp org.joda.time.Instant timestamp, + org.apache.beam.sdk.transforms.windowing.PaneInfo pane, + BoundedWindow window, + OutputReceiver receiver) { List fields = new ArrayList<>(); for (Map.Entry> entry : fieldFn.entrySet()) { String key = entry.getKey(); FieldFn fcn = entry.getValue(); - fields.add(key + ": " + fcn.apply(c, window)); + fields.add(key + ": " + fcn.apply(element, window, timestamp, pane)); } String result = fields.stream().collect(Collectors.joining(", ")); - c.output(result); + receiver.output(result); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index 36fa18a34e0d..a99254c328b8 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -24,6 +24,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; @@ -44,15 +47,20 @@ public WriteWindowedToBigQuery( /** Convert each key/score pair into a BigQuery TableRow. */ protected class BuildRowFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { + public void processElement( + @Element T element, + @Timestamp org.joda.time.Instant timestamp, + org.apache.beam.sdk.transforms.windowing.PaneInfo pane, + BoundedWindow window, + OutputReceiver receiver) { TableRow row = new TableRow(); for (Map.Entry> entry : fieldInfo.entrySet()) { String key = entry.getKey(); FieldInfo fcnInfo = entry.getValue(); - row.set(key, fcnInfo.getFieldFn().apply(c, window)); + row.set(key, fcnInfo.getFieldFn().apply(element, window, timestamp, pane)); } - c.output(row); + receiver.output(row); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java index 3b2653b7601e..4ea3902605b3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java @@ -32,6 +32,9 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -85,10 +88,10 @@ public class BigQueryStreamingTornadoes { */ static class ExtractTornadoesFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) { + TableRow row = element; if (Boolean.TRUE.equals(row.get("tornado"))) { - c.output(Integer.parseInt((String) row.get("month"))); + receiver.output(Integer.parseInt((String) row.get("month"))); } } } @@ -99,13 +102,16 @@ public void processElement(ProcessContext c) { */ static class FormatCountsFn extends DoFn, TableRow> { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV element, + @Timestamp Instant timestamp, + OutputReceiver receiver) { TableRow row = new TableRow() - .set("ts", c.timestamp().toString()) - .set("month", c.element().getKey()) - .set("tornado_count", c.element().getValue()); - c.output(row); + .set("ts", timestamp.toString()) + .set("month", element.getKey()) + .set("tornado_count", element.getValue()); + receiver.output(row); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java index 43d720e35268..e2e564169a9a 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java @@ -32,6 +32,8 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -79,10 +81,10 @@ public class BigQueryTornadoes { */ static class ExtractTornadoesFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) { + TableRow row = element; if ((Boolean) row.get("tornado")) { - c.output(Integer.parseInt((String) row.get("month"))); + receiver.output(Integer.parseInt((String) row.get("month"))); } } } @@ -93,12 +95,11 @@ public void processElement(ProcessContext c) { */ static class FormatCountsFn extends DoFn, TableRow> { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV element, OutputReceiver receiver) { TableRow row = - new TableRow() - .set("month", c.element().getKey()) - .set("tornado_count", c.element().getValue()); - c.output(row); + new TableRow().set("month", element.getKey()).set("tornado_count", element.getValue()); + receiver.output(row); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java index 2a581d769dc7..5f5b5d06b1f8 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java @@ -33,6 +33,8 @@ import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -78,12 +80,11 @@ static class ExtractLargeWordsFn extends DoFn> { private final Counter smallerWords = Metrics.counter(ExtractLargeWordsFn.class, "smallerWords"); @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow row, OutputReceiver> receiver) { String playName = (String) row.get("corpus"); String word = (String) row.get("word"); if (word.length() >= MIN_WORD_LENGTH) { - c.output(KV.of(word, playName)); + receiver.output(KV.of(word, playName)); } else { // Track how many smaller words we're not including. This information will be // visible in the Monitoring UI. @@ -98,10 +99,11 @@ public void processElement(ProcessContext c) { */ static class FormatShakespeareOutputFn extends DoFn, TableRow> { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV element, OutputReceiver receiver) { TableRow row = - new TableRow().set("word", c.element().getKey()).set("all_plays", c.element().getValue()); - c.output(row); + new TableRow().set("word", element.getKey()).set("all_plays", element.getValue()); + receiver.output(row); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index 9187bb83d7da..e683ac0fa8c5 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -31,6 +31,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.SideInput; import org.apache.beam.sdk.transforms.Mean; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -87,8 +90,8 @@ public class FilterExamples { */ static class ProjectionFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) { + TableRow row = element; // Grab year, month, day, mean_temp from the row Integer year = Integer.parseInt((String) row.get("year")); Integer month = Integer.parseInt((String) row.get("month")); @@ -101,7 +104,7 @@ public void processElement(ProcessContext c) { .set("month", month) .set("day", day) .set("mean_temp", meanTemp); - c.output(outRow); + receiver.output(outRow); } } @@ -119,12 +122,12 @@ public FilterSingleMonthDataFn(Integer monthFilter) { } @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) { + TableRow row = element; Integer month; month = (Integer) row.get("month"); if (month.equals(this.monthFilter)) { - c.output(row); + receiver.output(row); } } } @@ -135,10 +138,10 @@ public void processElement(ProcessContext c) { */ static class ExtractTempFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) { + TableRow row = element; Double meanTemp = Double.parseDouble(row.get("mean_temp").toString()); - c.output(meanTemp); + receiver.output(meanTemp); } } @@ -178,16 +181,17 @@ public PCollection expand(PCollection rows) { ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) { - Double meanTemp = - Double.parseDouble(c.element().get("mean_temp").toString()); - Double gTemp = c.sideInput(globalMeanTemp); + public void processElement( + @SideInput("globalMeanTemp") Double gTemp, + @Element TableRow element, + OutputReceiver receiver) { + Double meanTemp = Double.parseDouble(element.get("mean_temp").toString()); if (meanTemp < gTemp) { - c.output(c.element()); + receiver.output(element); } } }) - .withSideInputs(globalMeanTemp)); + .withSideInput("globalMeanTemp", globalMeanTemp)); return filteredRows; } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java index f78df0c09461..e6f8573705a2 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -90,13 +92,14 @@ static PCollection joinEvents( ParDo.of( new DoFn, KV>() { @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); - String countryCode = e.getKey(); - String countryName = e.getValue().getOnly(countryInfoTag); - for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { + public void processElement( + @Element KV element, + OutputReceiver> receiver) { + String countryCode = element.getKey(); + String countryName = element.getValue().getOnly(countryInfoTag); + for (String eventInfo : element.getValue().getAll(eventInfoTag)) { // Generate a string that combines information from both collection values - c.output( + receiver.output( KV.of( countryCode, "Country name: " + countryName + ", Event info: " + eventInfo)); @@ -111,10 +114,11 @@ public void processElement(ProcessContext c) { ParDo.of( new DoFn, String>() { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV element, OutputReceiver receiver) { String outputstring = - "Country code: " + c.element().getKey() + ", " + c.element().getValue(); - c.output(outputstring); + "Country code: " + element.getKey() + ", " + element.getValue(); + receiver.output(outputstring); } })); return formattedResults; @@ -126,14 +130,13 @@ public void processElement(ProcessContext c) { */ static class ExtractEventDataFn extends DoFn> { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow row, OutputReceiver> receiver) { String countryCode = (String) row.get("ActionGeo_CountryCode"); String sqlDate = (String) row.get("SQLDATE"); String actor1Name = (String) row.get("Actor1Name"); String sourceUrl = (String) row.get("SOURCEURL"); String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl; - c.output(KV.of(countryCode, eventInfo)); + receiver.output(KV.of(countryCode, eventInfo)); } } @@ -143,11 +146,10 @@ public void processElement(ProcessContext c) { */ static class ExtractCountryInfoFn extends DoFn> { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow row, OutputReceiver> receiver) { String countryCode = (String) row.get("FIPSCC"); String countryName = (String) row.get("HumanName"); - c.output(KV.of(countryCode, countryName)); + receiver.output(KV.of(countryCode, countryName)); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java index 8760d562d040..85df56a58258 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java @@ -30,6 +30,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -73,23 +75,22 @@ public class MaxPerKeyExamples { */ static class ExtractTempFn extends DoFn> { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement( + @Element TableRow row, OutputReceiver> receiver) { Integer month = Integer.parseInt((String) row.get("month")); Double meanTemp = Double.parseDouble(row.get("mean_temp").toString()); - c.output(KV.of(month, meanTemp)); + receiver.output(KV.of(month, meanTemp)); } } /** Format the results to a TableRow, to save to BigQuery. */ static class FormatMaxesFn extends DoFn, TableRow> { @ProcessElement - public void processElement(ProcessContext c) { + public void processElement( + @Element KV element, OutputReceiver receiver) { TableRow row = - new TableRow() - .set("month", c.element().getKey()) - .set("max_mean_temp", c.element().getValue()); - c.output(row); + new TableRow().set("month", element.getKey()).set("max_mean_temp", element.getValue()); + receiver.output(row); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java index 713af1d50953..585337953732 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MinimalBigQueryTornadoes.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -73,10 +75,10 @@ public class MinimalBigQueryTornadoes { */ static class ExtractTornadoesFn extends DoFn { @ProcessElement - public void processElement(ProcessContext c) { - TableRow row = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) { + TableRow row = element; if ((Boolean) row.get("tornado")) { - c.output(Integer.parseInt((String) row.get("month"))); + receiver.output(Integer.parseInt((String) row.get("month"))); } } } @@ -87,8 +89,9 @@ public void processElement(ProcessContext c) { */ static class FormatCountsFn extends DoFn, String> { @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().getKey() + ": " + c.element().getValue()); + public void processElement( + @Element KV element, OutputReceiver receiver) { + receiver.output(element.getKey() + ": " + element.getValue()); } } @@ -134,9 +137,9 @@ static class LogOutput extends DoFn { } @ProcessElement - public void processElement(ProcessContext c) { - LOG.info("{}{}", prefix, c.element()); - c.output(c.element()); + public void processElement(@Element T element, OutputReceiver receiver) { + LOG.info("{}{}", prefix, element); + receiver.output(element); } } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index cd3c6dd84157..5270077ea037 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -38,6 +38,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -46,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; @@ -364,15 +368,18 @@ public PCollection expand(PCollection> flowInfo) { new DoFn>, KV>() { @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Iterable flows = c.element().getValue(); + public void processElement( + @Element KV> element, + OutputReceiver> receiver) + throws Exception { + Iterable flows = element.getValue(); Integer sum = 0; Long numberOfRecords = 0L; for (Integer value : flows) { sum += value; numberOfRecords++; } - c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords)); + receiver.output(KV.of(element.getKey(), sum + "," + numberOfRecords)); } })); PCollection output = results.apply(ParDo.of(new FormatTotalFlow(triggerType))); @@ -392,21 +399,27 @@ public FormatTotalFlow(String triggerType) { } @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - String[] values = c.element().getValue().split(",", -1); + public void processElement( + PaneInfo pane, + @Timestamp Instant timestamp, + BoundedWindow window, + @Element KV element, + OutputReceiver receiver) + throws Exception { + String[] values = element.getValue().split(",", -1); TableRow row = new TableRow() .set("trigger_type", triggerType) - .set("freeway", c.element().getKey()) + .set("freeway", element.getKey()) .set("total_flow", Integer.parseInt(values[0])) .set("number_of_records", Long.parseLong(values[1])) .set("window", window.toString()) - .set("isFirst", c.pane().isFirst()) - .set("isLast", c.pane().isLast()) - .set("timing", c.pane().getTiming().toString()) - .set("event_time", c.timestamp().toString()) + .set("isFirst", pane.isFirst()) + .set("isLast", pane.isLast()) + .set("timing", pane.getTiming().toString()) + .set("event_time", timestamp.toString()) .set("processing_time", Instant.now().toString()); - c.output(row); + receiver.output(row); } } @@ -418,8 +431,9 @@ static class ExtractFlowInfo extends DoFn> { private static final int VALID_NUM_FIELDS = 50; @ProcessElement - public void processElement(ProcessContext c) throws Exception { - String[] laneInfo = c.element().split(",", -1); + public void processElement( + @Element String element, OutputReceiver> receiver) throws Exception { + String[] laneInfo = element.split(",", -1); if ("timestamp".equals(laneInfo[0])) { // Header row return; @@ -435,7 +449,7 @@ public void processElement(ProcessContext c) throws Exception { if (totalFlow == null || totalFlow <= 0) { return; } - c.output(KV.of(freeway, totalFlow)); + receiver.output(KV.of(freeway, totalFlow)); } } @@ -509,7 +523,8 @@ public void setup() { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement(@Element String element, OutputReceiver receiver) + throws Exception { Instant timestamp = Instant.now(); if (random.nextDouble() < THRESHOLD) { int range = MAX_DELAY - MIN_DELAY; @@ -517,7 +532,7 @@ public void processElement(ProcessContext c) throws Exception { long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes); timestamp = new Instant(timestamp.getMillis() - delayInMillis); } - c.outputWithTimestamp(c.element(), timestamp); + receiver.outputWithTimestamp(element, timestamp); } } diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java index 4f24c69f74b7..d9ab2757dd18 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java @@ -76,6 +76,10 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.SideInput; +import org.apache.beam.sdk.transforms.DoFn.Timestamp; import org.apache.beam.sdk.transforms.Latest; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; @@ -540,14 +544,15 @@ public static PCollection coGroupByKeyTuple( ParDo.of( new DoFn, String>() { @ProcessElement - public void processElement(ProcessContext c) { - KV e = c.element(); + public void processElement( + @Element KV element, OutputReceiver receiver) { + KV e = element; String name = e.getKey(); Iterable emailsIter = e.getValue().getAll(emailsTag); Iterable phonesIter = e.getValue().getAll(phonesTag); String formattedResult = Snippets.formatCoGbkResults(name, emailsIter, phonesIter); - c.output(formattedResult); + receiver.output(formattedResult); } })); // [END CoGroupByKeyTuple] @@ -642,20 +647,23 @@ public void process( new DoFn>() { @ProcessElement - public void process(ProcessContext c, @Timestamp Instant timestamp) { - Iterable> si = c.sideInput(mapIterable); + public void process( + @Timestamp Instant timestamp, + @Element Long element, + @SideInput("mapIterable") Iterable> si, + OutputReceiver> receiver) { // Take an element from the side input iterable (likely length 1) Map keyMap = si.iterator().next(); - c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now()); + receiver.outputWithTimestamp(KV.of(1L, element), Instant.now()); LOG.info( "Value is {} with timestamp {}, using key A from side input with time {}.", - c.element(), + element, timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")), keyMap.get("Key_A")); } }) - .withSideInputs(mapIterable)); + .withSideInput("mapIterable", mapIterable)); p.run(); } @@ -701,9 +709,9 @@ public static void accessingValueProviderInfoAfterRunSnip1(String[] args) { // Define the DoFn that logs the ValueProvider value. @ProcessElement - public void process(ProcessContext c) { + public void process(PipelineOptions options) { - MyOptions ops = c.getPipelineOptions().as(MyOptions.class); + MyOptions ops = options.as(MyOptions.class); // This example logs the ValueProvider value, but you could store it by // pushing it to an external database. @@ -943,11 +951,13 @@ public void process(@Element String src, OutputReceiver o) { ParDo.of( new DoFn() { @ProcessElement - public void process(ProcessContext c) { - c.output((long) c.sideInput(sideInput).size()); + public void process( + @SideInput("sideInput") List sideInputValue, + OutputReceiver receiver) { + receiver.output((long) sideInputValue.size()); } }) - .withSideInputs(sideInput)); + .withSideInput("sideInput", sideInput)); // [END PeriodicallyUpdatingSideInputs] return result; } @@ -1188,7 +1198,10 @@ private static class BundleFinalization { private static class BundleFinalizationDoFn extends DoFn { // [START BundleFinalize] @ProcessElement - public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) { + public void processElement( + @Element String element, + OutputReceiver receiver, + BundleFinalizer bundleFinalizer) { // ... produce output ... bundleFinalizer.afterBundleCommit( diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java index 4aa20fc10dfb..b7fb7b82db31 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java @@ -28,6 +28,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.slf4j.Logger; @@ -84,11 +86,13 @@ public void setUp() throws Exception { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement( + @Element KV element, OutputReceiver> receiver) + throws Exception { try { // Our Library takes a single command in position 0 which it will echo back in the result SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs(); - Command command = new Command(0, String.valueOf(c.element().getValue())); + Command command = new Command(0, String.valueOf(element.getValue())); commands.putCommand(command); // The ProcessingKernel deals with the execution of the process @@ -97,7 +101,7 @@ public void processElement(ProcessContext c) throws Exception { // Run the command and work through the results List results = kernel.exec(commands); for (String s : results) { - c.output(KV.of(c.element().getKey(), s)); + receiver.output(KV.of(element.getKey(), s)); } } catch (Exception ex) { LOG.error("Error processing element ", ex); diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java index 19c83c6eb73c..0d9e257fc6c0 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java @@ -29,6 +29,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -146,8 +148,8 @@ static String canonicalFormat(TableRow row) { static class FormatResults extends DoFn { @ProcessElement - public void processElement(ProcessContext c) throws Exception { - TableRow element = c.element(); + public void processElement(@Element TableRow element, OutputReceiver receiver) + throws Exception { TableRow row = new TableRow() .set("trigger_type", element.get("trigger_type")) @@ -158,7 +160,7 @@ public void processElement(ProcessContext c) throws Exception { .set("isLast", element.get("isLast")) .set("timing", element.get("timing")) .set("window", element.get("window")); - c.output(canonicalFormat(row)); + receiver.output(canonicalFormat(row)); } } } diff --git a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java index 01055d9658a9..9981b4040e3b 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java +++ b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java @@ -41,6 +41,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -144,11 +146,13 @@ public void setUp() throws Exception { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement( + @Element KV element, OutputReceiver> receiver) + throws Exception { try { // Our Library takes a single command in position 0 which it will echo back in the result SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs(); - Command command = new Command(0, String.valueOf(c.element().getValue())); + Command command = new Command(0, String.valueOf(element.getValue())); commands.putCommand(command); // The ProcessingKernel deals with the execution of the process @@ -157,7 +161,7 @@ public void processElement(ProcessContext c) throws Exception { // Run the command and work through the results List results = kernel.exec(commands); for (String s : results) { - c.output(KV.of(c.element().getKey(), s)); + receiver.output(KV.of(element.getKey(), s)); } } catch (Exception ex) { LOG.error("Error processing element ", ex); diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt index 05cb404ac6d2..7be4df2c8a91 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt @@ -25,6 +25,8 @@ import org.apache.beam.sdk.options.Description import org.apache.beam.sdk.options.PipelineOptionsFactory import org.apache.beam.sdk.testing.PAssert import org.apache.beam.sdk.transforms.DoFn +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver import org.apache.beam.sdk.transforms.ParDo import org.apache.beam.sdk.values.KV import org.slf4j.LoggerFactory @@ -84,18 +86,18 @@ public object DebuggingWordCount { private val unmatchedWords = Metrics.counter(FilterTextFn::class.java, "unmatchedWords") @ProcessElement - fun processElement(c: ProcessContext) { - if (filter.matcher(c.element().key).matches()) { + fun processElement(@Element element: KV, receiver: OutputReceiver>) { + if (filter.matcher(element.key).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline // these log lines will appear only if the log level is set to "DEBUG" or lower. - LOG.debug("Matched: ${c.element().key}") + LOG.debug("Matched: ${element.key}") matchedWords.inc() - c.output(c.element()) + receiver.output(element) } else { // Log at the "TRACE" level each element that is not matched. Different log levels // can be used to control the verbosity of logging providing an effective mechanism // to filter less important information. - LOG.trace("Did not match: ${c.element().key}") + LOG.trace("Did not match: ${element.key}") unmatchedWords.inc() } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt index ec56bc659970..8edc98872350 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/BigQueryTornadoes.kt @@ -27,6 +27,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteResult import org.apache.beam.sdk.options.* import org.apache.beam.sdk.transforms.Count import org.apache.beam.sdk.transforms.DoFn +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver import org.apache.beam.sdk.transforms.PTransform import org.apache.beam.sdk.transforms.ParDo import org.apache.beam.sdk.values.KV @@ -73,10 +75,9 @@ object BigQueryTornadoes { */ internal class ExtractTornadoesFn : DoFn() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver) { if (row["tornado"] as Boolean) { - c.output(Integer.parseInt(row["month"] as String)) + receiver.output(Integer.parseInt(row["month"] as String)) } } } @@ -87,11 +88,11 @@ object BigQueryTornadoes { */ internal class FormatCountsFn : DoFn, TableRow>() { @ProcessElement - fun processElement(c: ProcessContext) { + fun processElement(@Element element: KV, receiver: OutputReceiver) { val row = TableRow() - .set("month", c.element().key) - .set("tornado_count", c.element().value) - c.output(row) + .set("month", element.key) + .set("tornado_count", element.value) + receiver.output(row) } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt index 9e388031002d..4d89b692dd82 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/CombinePerKeyExamples.kt @@ -26,6 +26,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteResult import org.apache.beam.sdk.metrics.Metrics import org.apache.beam.sdk.options.* import org.apache.beam.sdk.transforms.* +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver import org.apache.beam.sdk.values.KV import org.apache.beam.sdk.values.PCollection @@ -70,12 +72,11 @@ object CombinePerKeyExamples { private val smallerWords = Metrics.counter(ExtractLargeWordsFn::class.java, "smallerWords") @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver>) { val playName = row["corpus"] as String val word = row["word"] as String if (word.length >= MIN_WORD_LENGTH) { - c.output(KV.of(word, playName)) + receiver.output(KV.of(word, playName)) } else { // Track how many smaller words we're not including. This information will be // visible in the Monitoring UI. @@ -90,9 +91,9 @@ object CombinePerKeyExamples { */ internal class FormatShakespeareOutputFn : DoFn, TableRow>() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = TableRow().set("word", c.element().key).set("all_plays", c.element().value) - c.output(row) + fun processElement(@Element element: KV, receiver: OutputReceiver) { + val row = TableRow().set("word", element.key).set("all_plays", element.value) + receiver.output(row) } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt index 2625f5bfec10..218188448965 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/FilterExamples.kt @@ -25,6 +25,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO import org.apache.beam.sdk.io.gcp.bigquery.WriteResult import org.apache.beam.sdk.options.* import org.apache.beam.sdk.transforms.* +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver +import org.apache.beam.sdk.transforms.DoFn.SideInput import org.apache.beam.sdk.values.PCollection import java.util.logging.Logger @@ -80,8 +83,7 @@ object FilterExamples { */ internal class ProjectionFn : DoFn() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver) { // Grab year, month, day, mean_temp from the row val year = Integer.parseInt(row["year"] as String) val month = Integer.parseInt(row["month"] as String) @@ -94,7 +96,7 @@ object FilterExamples { .set("month", month) .set("day", day) .set("mean_temp", meanTemp) - c.output(outRow) + receiver.output(outRow) } } @@ -108,11 +110,10 @@ object FilterExamples { internal class FilterSingleMonthDataFn(private var monthFilter: Int?) : DoFn() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver) { val month = row["month"] if (month == this.monthFilter) { - c.output(row) + receiver.output(row) } } } @@ -123,10 +124,9 @@ object FilterExamples { */ internal class ExtractTempFn : DoFn() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver) { val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString()) - c.output(meanTemp) + receiver.output(meanTemp) } } @@ -158,15 +158,17 @@ object FilterExamples { ParDo.of( object : DoFn() { @ProcessElement - fun processElement(c: ProcessContext) { - val meanTemp = java.lang.Double.parseDouble(c.element()["mean_temp"].toString()) - val gTemp = c.sideInput(globalMeanTemp) + fun processElement( + @SideInput("globalMeanTemp") gTemp: Double, + @Element element: TableRow, + receiver: OutputReceiver) { + val meanTemp = java.lang.Double.parseDouble(element["mean_temp"].toString()) if (meanTemp < gTemp) { - c.output(c.element()) + receiver.output(element) } } }) - .withSideInputs(globalMeanTemp)) + .withSideInput("globalMeanTemp", globalMeanTemp)) } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt index 2f2215e1d96a..7f81629ae463 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/JoinExamples.kt @@ -26,6 +26,8 @@ import org.apache.beam.sdk.options.PipelineOptions import org.apache.beam.sdk.options.PipelineOptionsFactory import org.apache.beam.sdk.options.Validation import org.apache.beam.sdk.transforms.DoFn +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver import org.apache.beam.sdk.transforms.ParDo import org.apache.beam.sdk.transforms.join.CoGbkResult import org.apache.beam.sdk.transforms.join.CoGroupByKey @@ -89,13 +91,14 @@ object JoinExamples { ParDo.of( object : DoFn, KV>() { @ProcessElement - fun processElement(c: ProcessContext) { - val e = c.element() - val countryCode = e.key - val countryName = e.value.getOnly(countryInfoTag) - for (ei in c.element().value.getAll(eventInfoTag)) { + fun processElement( + @Element element: KV, + receiver: OutputReceiver>) { + val countryCode = element.key + val countryName = element.value.getOnly(countryInfoTag) + for (ei in element.value.getAll(eventInfoTag)) { // Generate a string that combines information from both collection values - c.output( + receiver.output( KV.of( countryCode, "Country name: $countryName, Event info: $ei")) @@ -109,9 +112,9 @@ object JoinExamples { ParDo.of( object : DoFn, String>() { @ProcessElement - fun processElement(c: ProcessContext) { - val outputString = "Country code: ${c.element().key}, ${c.element().value}" - c.output(outputString) + fun processElement(@Element element: KV, receiver: OutputReceiver) { + val outputString = "Country code: ${element.key}, ${element.value}" + receiver.output(outputString) } })) } @@ -122,14 +125,13 @@ object JoinExamples { */ internal class ExtractEventDataFn : DoFn>() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver>) { val countryCode = row["ActionGeo_CountryCode"] as String val sqlDate = row["SQLDATE"] as String val actor1Name = row["Actor1Name"] as String val sourceUrl = row["SOURCEURL"] as String val eventInfo = "Date: $sqlDate, Actor1: $actor1Name, url: $sourceUrl" - c.output(KV.of(countryCode, eventInfo)) + receiver.output(KV.of(countryCode, eventInfo)) } } @@ -139,11 +141,10 @@ object JoinExamples { */ internal class ExtractCountryInfoFn : DoFn>() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver>) { val countryCode = row["FIPSCC"] as String val countryName = row["HumanName"] as String - c.output(KV.of(countryCode, countryName)) + receiver.output(KV.of(countryCode, countryName)) } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt index 11418d3933cf..ad16ecde9657 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/MaxPerKeyExamples.kt @@ -26,6 +26,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO import org.apache.beam.sdk.io.gcp.bigquery.WriteResult import org.apache.beam.sdk.options.* import org.apache.beam.sdk.transforms.DoFn +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver import org.apache.beam.sdk.transforms.Max import org.apache.beam.sdk.transforms.PTransform import org.apache.beam.sdk.transforms.ParDo @@ -73,22 +75,21 @@ object MaxPerKeyExamples { */ internal class ExtractTempFn : DoFn>() { @ProcessElement - fun processElement(c: ProcessContext) { - val row = c.element() + fun processElement(@Element row: TableRow, receiver: OutputReceiver>) { val month = Integer.parseInt(row["month"] as String) val meanTemp = java.lang.Double.parseDouble(row["mean_temp"].toString()) - c.output(KV.of(month, meanTemp)) + receiver.output(KV.of(month, meanTemp)) } } /** Format the results to a TableRow, to save to BigQuery. */ internal class FormatMaxesFn : DoFn, TableRow>() { @ProcessElement - fun processElement(c: ProcessContext) { + fun processElement(@Element element: KV, receiver: OutputReceiver) { val row = TableRow() - .set("month", c.element().key) - .set("max_mean_temp", c.element().value) - c.output(row) + .set("month", element.key) + .set("max_mean_temp", element.value) + receiver.output(row) } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt index 4afa7d0dfc70..bb8c0900e319 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook/TriggerExample.kt @@ -34,6 +34,9 @@ import org.apache.beam.sdk.options.Description import org.apache.beam.sdk.options.PipelineOptionsFactory import org.apache.beam.sdk.options.StreamingOptions import org.apache.beam.sdk.transforms.DoFn +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver +import org.apache.beam.sdk.transforms.DoFn.Timestamp import org.apache.beam.sdk.transforms.GroupByKey import org.apache.beam.sdk.transforms.PTransform import org.apache.beam.sdk.transforms.ParDo @@ -364,15 +367,17 @@ object TriggerExample { @ProcessElement @Throws(Exception::class) - fun processElement(c: ProcessContext) { - val flows = c.element().value + fun processElement( + @Element element: KV>, + receiver: OutputReceiver>) { + val flows = element.value var sum = 0 var numberOfRecords = 0L for (value in flows) { sum += value numberOfRecords++ } - c.output(KV.of(c.element().key, "$sum,$numberOfRecords")) + receiver.output(KV.of(element.key, "$sum,$numberOfRecords")) } })) return results.apply(ParDo.of(FormatTotalFlow(triggerType))) @@ -387,20 +392,25 @@ object TriggerExample { @ProcessElement @Throws(Exception::class) - fun processElement(c: ProcessContext, window: BoundedWindow) { - val values = c.element().value.split(",".toRegex()).toTypedArray() + fun processElement( + @Element element: KV, + window: BoundedWindow, + pane: PaneInfo, + @Timestamp timestamp: Instant, + receiver: OutputReceiver) { + val values = element.value.split(",".toRegex()).toTypedArray() val row = TableRow() .set("trigger_type", triggerType) - .set("freeway", c.element().key) + .set("freeway", element.key) .set("total_flow", Integer.parseInt(values[0])) .set("number_of_records", java.lang.Long.parseLong(values[1])) .set("window", window.toString()) - .set("isFirst", c.pane().isFirst) - .set("isLast", c.pane().isLast) - .set("timing", c.pane().timing.toString()) - .set("event_time", c.timestamp().toString()) + .set("isFirst", pane.isFirst) + .set("isLast", pane.isLast) + .set("timing", pane.timing.toString()) + .set("event_time", timestamp.toString()) .set("processing_time", Instant.now().toString()) - c.output(row) + receiver.output(row) } } @@ -412,8 +422,8 @@ object TriggerExample { @ProcessElement @Throws(Exception::class) - fun processElement(c: ProcessContext) { - val laneInfo = c.element().split(",".toRegex()).toTypedArray() + fun processElement(@Element element: String, receiver: OutputReceiver>) { + val laneInfo = element.split(",".toRegex()).toTypedArray() if ("timestamp" == laneInfo[0]) { // Header row return @@ -429,7 +439,7 @@ object TriggerExample { if (totalFlow == null || totalFlow <= 0) { return } - c.output(KV.of(freeway, totalFlow)) + receiver.output(KV.of(freeway, totalFlow)) } companion object { @@ -486,7 +496,7 @@ object TriggerExample { @ProcessElement @Throws(Exception::class) - fun processElement(c: ProcessContext) { + fun processElement(@Element element: String, receiver: OutputReceiver) { var timestamp = Instant.now() if (random.nextDouble() < THRESHOLD) { val range = MAX_DELAY - MIN_DELAY @@ -494,7 +504,7 @@ object TriggerExample { val delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes.toLong()) timestamp = Instant(timestamp.millis - delayInMillis) } - c.outputWithTimestamp(c.element(), timestamp) + receiver.outputWithTimestamp(element, timestamp) } companion object { diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt index d2f58c215a56..eb2099372482 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/snippets/Snippets.kt @@ -33,6 +33,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations import org.apache.beam.sdk.io.gcp.bigquery.TableDestination import org.apache.beam.sdk.io.gcp.bigquery.WriteResult import org.apache.beam.sdk.transforms.* +import org.apache.beam.sdk.transforms.DoFn.Element +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver import org.apache.beam.sdk.transforms.join.CoGbkResult import org.apache.beam.sdk.transforms.join.CoGroupByKey import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple @@ -360,13 +362,12 @@ object Snippets { ParDo.of( object : DoFn, String>() { @ProcessElement - fun processElement(c: ProcessContext) { - val e = c.element() - val name = e.key - val emailsIter = e.value.getAll(emailsTag) - val phonesIter = e.value.getAll(phonesTag) + fun processElement(@Element element: KV, receiver: OutputReceiver) { + val name = element.key + val emailsIter = element.value.getAll(emailsTag) + val phonesIter = element.value.getAll(phonesTag) val formattedResult = formatCoGbkResults(name, emailsIter, phonesIter) - c.output(formattedResult) + receiver.output(formattedResult) } })) }