Skip to content

[FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness#28326

Open
autophagy wants to merge 1 commit into
apache:masterfrom
autophagy:flink-39378-timers-2
Open

[FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness#28326
autophagy wants to merge 1 commit into
apache:masterfrom
autophagy:flink-39378-timers-2

Conversation

@autophagy

@autophagy autophagy commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

What is the purpose of the change

This PR adds support for Context, OnTimerContext, TimeContext, timer registration and firing, watermark management, and rowtime support to the PTF Test Harness.

When a PTF registers a timer, the timer is stored in the TimerManager and keyed by the partition row. When the user advances the watermark using
either setWatermark or setWatermarkForTable, the manager then fires all pending timers below or equal to the watermark in a deterministic order.

I moved per-invocation state stuff into an InvocationContext to make separation of concerns a little easier to follow.

Rows emitted from test harness setups that configure an on_time column also contain the rowtime column, and the PTF rejects at the point of registration if the user tries to register a timer with PASS_COLUMNS_THROUGH
enabled (similar to live, per the PROCESS_INVALID_PASS_THROUGH_TIMERS test on live.)

Some open questions remain. One is that there's an edge case where if a user's onTimer registers a new timer at or before the current watermark, it then fires, and then registers,
then fires, etc. Should there be some sort of max depth check here?

The second is that when defining the on time parameter in SQL, you pass in DESCRIPTOR(ts), but in the harness you just pass in the string name of the column. Would it instead be better to support withOnTime("DESCRIPTOR(ts)") to better mirror SQL, rather than withOnTimeColumn("ts")?

Brief change log

  • Added support for Context, OnTimerContext, TimeContext in ProcessTableFunctionTestHarness
  • Added support for timer registration, watermark tracking and timer firing in ProcessTableFunctionTestHarness
  • Added an InvocationContext to capture per-invocation state to simplify the collector logic
  • Reworked derriving the output type from the system inference to account for on_time and uid
  • Fixed an issue in createStateConverter where the incorrect state converters were being used for Map/List state.

Verifying this change

This change added tests and can be verified as follows:

  • Added tests to ProcessFunctionTestHarnessesTest to cover timer firing, watermark advancement and context.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

2.1.156 (Claude Code)

@flinkbot

flinkbot commented Jun 5, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@autophagy autophagy marked this pull request as ready for review June 5, 2026 11:28
@fhueske fhueske changed the title Timers [FLINK-39378][table] Add support for Context, timers and on_time to PTF test harness Jun 5, 2026
@autophagy autophagy force-pushed the flink-39378-timers-2 branch from b05a727 to efe9d3a Compare June 8, 2026 09:59

@fhueske fhueske left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @autophagy!

I've reviewed the changes except for the test (will do that later this week).
Overall, the changes are good. Left a bunch of comments with suggestions and questions.

Cheers, Fabian


// Verify the timer was registered
assertThat(harness.getPendingTimers()).hasSize(1);
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to assert the timestamp of a timer as well?

assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");

// Advance watermark past the timer's timestamp to fire it
harness.clearOutput();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the output should still be empty, no?


harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 1)));
harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 2)));

show that two timers can be fired from one wm advancement?

Comment on lines +2567 to +2570
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));
harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 8));
assertThat(harness.getOutput())
.containsExactly(
Row.of("P1", "count=2", LocalDateTime.of(2025, 1, 1, 0, 0, 6)),
Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 7)),);

Comment on lines -2585 to -2587
- `Context` parameter
- Timers (`onTimer`)
- `on_time` / `rowtime`

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙌

private static Method findOnTimerMethod(
Class<?> functionClass, List<ArgumentInfo> arguments) {
List<Method> candidates = ExtractionUtils.collectMethods(functionClass, "onTimer");
if (candidates.isEmpty()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we also fail if we have more than one candidate?
How would the framework decide which onTimer method to call?

ListView.class.isAssignableFrom(stateDataType.getConversionClass())
|| MapView.class.isAssignableFrom(
stateDataType.getConversionClass())
? stateDataType.getChildren().get(0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woundn't this just return the key type for MapView?

Comment on lines +1966 to +1971
Optional<List<StaticArgument>> staticArgsOpt = systemTypeInference.getStaticArguments();
List<StaticArgument> staticArgs =
staticArgsOpt.orElseThrow(
() ->
new IllegalStateException(
"SystemTypeInference has no static arguments"));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
Optional<List<StaticArgument>> staticArgsOpt = systemTypeInference.getStaticArguments();
List<StaticArgument> staticArgs =
staticArgsOpt.orElseThrow(
() ->
new IllegalStateException(
"SystemTypeInference has no static arguments"));
List<StaticArgument> staticArgs =
systemTypeInference.getStaticArguments().orElseThrow(
() ->
new IllegalStateException(
"SystemTypeInference has no static arguments"));

Can be folded?

* eligible to fire.
*/
@Internal
class TestHarnessTimerManager {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to add a separate test for this class.

Comment on lines +102 to +104
if (name != null) {
timerSet.removeIf(t -> name.equals(t.name));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also remove unnamed timers.
There can only be one timer per timestamp and name (also if name = null).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants