-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness #28326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception { | |||||||||||||||||||
| {{< /tab >}} | ||||||||||||||||||||
| {{< /tabs >}} | ||||||||||||||||||||
|
|
||||||||||||||||||||
| #### Testing with Timers and Context | ||||||||||||||||||||
|
|
||||||||||||||||||||
| The harness supports the `Context` parameter, timer registration via `TimeContext`, and `onTimer` | ||||||||||||||||||||
| callbacks. Use `.withOnTimeColumn()` to configure the event time column and `.setWatermark()` to | ||||||||||||||||||||
| advance watermarks and fire eligible timers. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| {{< tabs "timer-testing" >}} | ||||||||||||||||||||
| {{< tab "Java" >}} | ||||||||||||||||||||
| ```java | ||||||||||||||||||||
| // A PTF that registers a named timer 5 seconds after each event, and emits when it fires. | ||||||||||||||||||||
| @DataTypeHint("ROW<message STRING>") | ||||||||||||||||||||
| public class TimerPTF extends ProcessTableFunction<Row> { | ||||||||||||||||||||
| public void eval( | ||||||||||||||||||||
| Context ctx, | ||||||||||||||||||||
| @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME}) | ||||||||||||||||||||
| Row input) { | ||||||||||||||||||||
| String name = input.getFieldAs("name"); | ||||||||||||||||||||
| TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class); | ||||||||||||||||||||
| timeCtx.registerOnTime("timeout-" + name, timeCtx.time().plus(Duration.ofSeconds(5))); | ||||||||||||||||||||
| collect(Row.of("registered-" + name)); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| public void onTimer(OnTimerContext ctx) { | ||||||||||||||||||||
| collect(Row.of("timer-fired-" + ctx.currentTimer())); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| @Test | ||||||||||||||||||||
| void testTimerRegistrationAndFiring() throws Exception { | ||||||||||||||||||||
| try (ProcessTableFunctionTestHarness<Row> harness = | ||||||||||||||||||||
| ProcessTableFunctionTestHarness.ofClass(TimerPTF.class) | ||||||||||||||||||||
| .withTableArgument("input", | ||||||||||||||||||||
| DataTypes.of("ROW<partition STRING, name STRING, ts TIMESTAMP(3)>")) | ||||||||||||||||||||
| .withPartitionBy("input", "partition") | ||||||||||||||||||||
| .withOnTimeColumn("ts") | ||||||||||||||||||||
| .build()) { | ||||||||||||||||||||
|
|
||||||||||||||||||||
| harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 1))); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Verify the timer was registered | ||||||||||||||||||||
| assertThat(harness.getPendingTimers()).hasSize(1); | ||||||||||||||||||||
| assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice"); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Advance watermark past the timer's timestamp to fire it | ||||||||||||||||||||
| harness.clearOutput(); | ||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the output should still be empty, no? |
||||||||||||||||||||
| harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7)); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| assertThat(harness.getOutput()) | ||||||||||||||||||||
| .containsExactly( | ||||||||||||||||||||
| Row.of("P1", "timer-fired-timeout-Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 6))); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| assertThat(harness.getPendingTimers()).isEmpty(); | ||||||||||||||||||||
| assertThat(harness.getFiredTimers()).hasSize(1); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| ``` | ||||||||||||||||||||
| {{< /tab >}} | ||||||||||||||||||||
| {{< /tabs >}} | ||||||||||||||||||||
|
|
||||||||||||||||||||
| **Timers with State**: State persisted during `eval()` is accessible in `onTimer()`: | ||||||||||||||||||||
|
|
||||||||||||||||||||
| {{< tabs "timer-state-testing" >}} | ||||||||||||||||||||
| {{< tab "Java" >}} | ||||||||||||||||||||
| ```java | ||||||||||||||||||||
| @DataTypeHint("ROW<message STRING>") | ||||||||||||||||||||
| public class TimerWithStatePTF extends ProcessTableFunction<Row> { | ||||||||||||||||||||
| public static class CounterState { | ||||||||||||||||||||
| public long count = 0L; | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| public void eval( | ||||||||||||||||||||
| Context ctx, | ||||||||||||||||||||
| @StateHint CounterState state, | ||||||||||||||||||||
| @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME}) | ||||||||||||||||||||
| Row input) { | ||||||||||||||||||||
| state.count++; | ||||||||||||||||||||
| TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class); | ||||||||||||||||||||
| timeCtx.registerOnTime("check", timeCtx.time().plus(Duration.ofSeconds(5))); | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| public void onTimer(OnTimerContext ctx, @StateHint CounterState state) { | ||||||||||||||||||||
| collect(Row.of("count=" + state.count)); | ||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| @Test | ||||||||||||||||||||
| void testTimerWithState() throws Exception { | ||||||||||||||||||||
| try (ProcessTableFunctionTestHarness<Row> harness = | ||||||||||||||||||||
| ProcessTableFunctionTestHarness.ofClass(TimerWithStatePTF.class) | ||||||||||||||||||||
| .withTableArgument("input", | ||||||||||||||||||||
| DataTypes.of("ROW<partition STRING, ts TIMESTAMP(3)>")) | ||||||||||||||||||||
| .withPartitionBy("input", "partition") | ||||||||||||||||||||
| .withOnTimeColumn("ts") | ||||||||||||||||||||
| .build()) { | ||||||||||||||||||||
|
|
||||||||||||||||||||
| harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1))); | ||||||||||||||||||||
| harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1))); | ||||||||||||||||||||
| harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1))); | ||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
show that two timers can be fired from one wm advancement? |
||||||||||||||||||||
|
|
||||||||||||||||||||
| harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7)); | ||||||||||||||||||||
| assertThat(harness.getOutput()) | ||||||||||||||||||||
| .containsExactly( | ||||||||||||||||||||
| Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6))); | ||||||||||||||||||||
|
Comment on lines
+2567
to
+2570
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
| } | ||||||||||||||||||||
| } | ||||||||||||||||||||
| ``` | ||||||||||||||||||||
| {{< /tab >}} | ||||||||||||||||||||
| {{< /tabs >}} | ||||||||||||||||||||
|
|
||||||||||||||||||||
| #### Optional Partitioning | ||||||||||||||||||||
|
|
||||||||||||||||||||
| For PTFs with `OPTIONAL_PARTITION_BY`, you can omit `withPartitionBy()` during harness setup. The | ||||||||||||||||||||
|
|
@@ -2582,8 +2691,5 @@ void testPOJO() throws Exception { | |||||||||||||||||||
|
|
||||||||||||||||||||
| ### PTF Features Unsupported by the TestHarness | ||||||||||||||||||||
|
|
||||||||||||||||||||
| - `Context` parameter | ||||||||||||||||||||
| - Timers (`onTimer`) | ||||||||||||||||||||
| - `on_time` / `rowtime` | ||||||||||||||||||||
|
Comment on lines
-2585
to
-2587
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🙌 |
||||||||||||||||||||
| - Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`) | ||||||||||||||||||||
| - State TTL (state is supported but TTL expiration is not yet implemented) | ||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.flink.table.runtime.functions; | ||
|
|
||
| import org.apache.flink.types.Row; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Captures the per-invocation state for an eval() or onTimer() call in the test harness. */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. expand the comment a bit and add that |
||
| class InvocationContext { | ||
| final Row partitionKey; | ||
| @Nullable final Row row; | ||
| @Nullable final String tableArgumentName; | ||
| @Nullable final Timer firingTimer; | ||
|
|
||
| private InvocationContext( | ||
| Row partitionKey, | ||
| @Nullable Row row, | ||
| @Nullable String tableArgumentName, | ||
| @Nullable Timer firingTimer) { | ||
| this.partitionKey = partitionKey; | ||
| this.row = row; | ||
| this.tableArgumentName = tableArgumentName; | ||
| this.firingTimer = firingTimer; | ||
| } | ||
|
|
||
| static InvocationContext forEval(Row partitionKey, Row row, String tableArgumentName) { | ||
| return new InvocationContext(partitionKey, row, tableArgumentName, null); | ||
| } | ||
|
|
||
| static InvocationContext forTimer(Timer timer) { | ||
| return new InvocationContext(timer.partitionKey, null, null, timer); | ||
| } | ||
|
|
||
| boolean isTimerInvocation() { | ||
| return firingTimer != null; | ||
| } | ||
|
|
||
| boolean isEvalInvocation() { | ||
| return tableArgumentName != null; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to assert the timestamp of a timer as well?