Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 109 additions & 3 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception {
{{< /tab >}}
{{< /tabs >}}

#### Testing with Timers and Context

The harness supports the `Context` parameter, timer registration via `TimeContext`, and `onTimer`
callbacks. Use `.withOnTimeColumn()` to configure the event time column and `.setWatermark()` to
advance watermarks and fire eligible timers.

{{< tabs "timer-testing" >}}
{{< tab "Java" >}}
```java
// A PTF that registers a named timer 5 seconds after each event, and emits when it fires.
@DataTypeHint("ROW<message STRING>")
public class TimerPTF extends ProcessTableFunction<Row> {
public void eval(
Context ctx,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
String name = input.getFieldAs("name");
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("timeout-" + name, timeCtx.time().plus(Duration.ofSeconds(5)));
collect(Row.of("registered-" + name));
}

public void onTimer(OnTimerContext ctx) {
collect(Row.of("timer-fired-" + ctx.currentTimer()));
}
}

@Test
void testTimerRegistrationAndFiring() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, name STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

// Verify the timer was registered
assertThat(harness.getPendingTimers()).hasSize(1);
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to assert the timestamp of a timer as well?


// Advance watermark past the timer's timestamp to fire it
harness.clearOutput();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the output should still be empty, no?

harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));

assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "timer-fired-timeout-Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));

assertThat(harness.getPendingTimers()).isEmpty();
assertThat(harness.getFiredTimers()).hasSize(1);
}
}
```
{{< /tab >}}
{{< /tabs >}}

**Timers with State**: State persisted during `eval()` is accessible in `onTimer()`:

{{< tabs "timer-state-testing" >}}
{{< tab "Java" >}}
```java
@DataTypeHint("ROW<message STRING>")
public class TimerWithStatePTF extends ProcessTableFunction<Row> {
public static class CounterState {
public long count = 0L;
}

public void eval(
Context ctx,
@StateHint CounterState state,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
state.count++;
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("check", timeCtx.time().plus(Duration.ofSeconds(5)));
}

public void onTimer(OnTimerContext ctx, @StateHint CounterState state) {
collect(Row.of("count=" + state.count));
}
}

@Test
void testTimerWithState() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerWithStatePTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 2)));

show that two timers can be fired from one wm advancement?


harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));
Comment on lines +2567 to +2570

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 8));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=2", LocalDateTime.of(2025, 1, 1, 0, 0, 6)),
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 7)),);

}
}
```
{{< /tab >}}
{{< /tabs >}}

#### Optional Partitioning

For PTFs with `OPTIONAL_PARTITION_BY`, you can omit `withPartitionBy()` during harness setup. The
Expand Down Expand Up @@ -2582,8 +2691,5 @@ void testPOJO() throws Exception {

### PTF Features Unsupported by the TestHarness

- `Context` parameter
- Timers (`onTimer`)
- `on_time` / `rowtime`
Comment on lines -2585 to -2587

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙌

- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
- State TTL (state is supported but TTL expiration is not yet implemented)
112 changes: 109 additions & 3 deletions docs/content/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2468,6 +2468,115 @@ void testStateMutation() throws Exception {
{{< /tab >}}
{{< /tabs >}}

#### Testing with Timers and Context

The harness supports the `Context` parameter, timer registration via `TimeContext`, and `onTimer`
callbacks. Use `.withOnTimeColumn()` to configure the event time column and `.setWatermark()` to
advance watermarks and fire eligible timers.

{{< tabs "timer-testing" >}}
{{< tab "Java" >}}
```java
// A PTF that registers a named timer 5 seconds after each event, and emits when it fires.
@DataTypeHint("ROW<message STRING>")
public class TimerPTF extends ProcessTableFunction<Row> {
public void eval(
Context ctx,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
String name = input.getFieldAs("name");
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("timeout-" + name, timeCtx.time().plus(Duration.ofSeconds(5)));
collect(Row.of("registered-" + name));
}

public void onTimer(OnTimerContext ctx) {
collect(Row.of("timer-fired-" + ctx.currentTimer()));
}
}

@Test
void testTimerRegistrationAndFiring() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, name STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

// Verify the timer was registered
assertThat(harness.getPendingTimers()).hasSize(1);
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");

// Advance watermark past the timer's timestamp to fire it
harness.clearOutput();
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));

assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "timer-fired-timeout-Alice", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));

assertThat(harness.getPendingTimers()).isEmpty();
assertThat(harness.getFiredTimers()).hasSize(1);
}
}
```
{{< /tab >}}
{{< /tabs >}}

**Timers with State**: State persisted during `eval()` is accessible in `onTimer()`:

{{< tabs "timer-state-testing" >}}
{{< tab "Java" >}}
```java
@DataTypeHint("ROW<message STRING>")
public class TimerWithStatePTF extends ProcessTableFunction<Row> {
public static class CounterState {
public long count = 0L;
}

public void eval(
Context ctx,
@StateHint CounterState state,
@ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.REQUIRE_ON_TIME})
Row input) {
state.count++;
TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
timeCtx.registerOnTime("check", timeCtx.time().plus(Duration.ofSeconds(5)));
}

public void onTimer(OnTimerContext ctx, @StateHint CounterState state) {
collect(Row.of("count=" + state.count));
}
}

@Test
void testTimerWithState() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(TimerWithStatePTF.class)
.withTableArgument("input",
DataTypes.of("ROW<partition STRING, ts TIMESTAMP(3)>"))
.withPartitionBy("input", "partition")
.withOnTimeColumn("ts")
.build()) {

harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));
}
}
```
{{< /tab >}}
{{< /tabs >}}

#### Optional Partitioning

For PTFs with `OPTIONAL_PARTITION_BY`, you can omit `withPartitionBy()` during harness setup. The
Expand Down Expand Up @@ -2585,8 +2694,5 @@ void testPOJO() throws Exception {

### PTF Features Unsupported by the TestHarness

- `Context` parameter
- Timers (`onTimer`)
- `on_time` / `rowtime`
- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
- State TTL (state is supported but TTL expiration is not yet implemented)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.runtime.functions;

import org.apache.flink.types.Row;

import javax.annotation.Nullable;

/** Captures the per-invocation state for an eval() or onTimer() call in the test harness. */

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expand the comment a bit and add that partitionKey is always set and for eval() row and tableArgumentName are set and for onTimer() firingTimer?

class InvocationContext {
final Row partitionKey;
@Nullable final Row row;
@Nullable final String tableArgumentName;
@Nullable final Timer firingTimer;

private InvocationContext(
Row partitionKey,
@Nullable Row row,
@Nullable String tableArgumentName,
@Nullable Timer firingTimer) {
this.partitionKey = partitionKey;
this.row = row;
this.tableArgumentName = tableArgumentName;
this.firingTimer = firingTimer;
}

static InvocationContext forEval(Row partitionKey, Row row, String tableArgumentName) {
return new InvocationContext(partitionKey, row, tableArgumentName, null);
}

static InvocationContext forTimer(Timer timer) {
return new InvocationContext(timer.partitionKey, null, null, timer);
}

boolean isTimerInvocation() {
return firingTimer != null;
}

boolean isEvalInvocation() {
return tableArgumentName != null;
}
}
Loading