Skip to content

Commit 6ac0145

Browse files
committed
[FLINK-8994] [tests] Let general purpose DataStream job include Avro as state
This closes apache#6435.
1 parent 0d6040b commit 6ac0145

File tree

5 files changed

+126
-8
lines changed

5 files changed

+126
-8
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ tmp
1717
*.log
1818
.DS_Store
1919
build-target
20+
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/avro/
2021
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/
2122
flink-runtime-web/web-dashboard/assets/fonts/
2223
flink-runtime-web/web-dashboard/node_modules/

flink-end-to-end-tests/flink-datastream-allround-test/pom.xml

+34
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,40 @@ under the License.
7676
</execution>
7777
</executions>
7878
</plugin>
79+
<plugin>
80+
<groupId>org.apache.avro</groupId>
81+
<artifactId>avro-maven-plugin</artifactId>
82+
<version>${avro.version}</version>
83+
<executions>
84+
<execution>
85+
<phase>generate-sources</phase>
86+
<goals>
87+
<goal>schema</goal>
88+
</goals>
89+
<configuration>
90+
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
91+
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
92+
<fieldVisibility>PRIVATE</fieldVisibility>
93+
<includes>
94+
<include>**/*.avsc</include>
95+
</includes>
96+
<!--
97+
This forces Avro to use Java Strings instead of Avro's Utf8.
98+
This is required since the job relies on equals checks on some String fields
99+
to verify that state restore was successful.
100+
-->
101+
<stringType>String</stringType>
102+
</configuration>
103+
</execution>
104+
</executions>
105+
</plugin>
106+
<plugin>
107+
<groupId>org.apache.maven.plugins</groupId>
108+
<artifactId>maven-checkstyle-plugin</artifactId>
109+
<configuration>
110+
<excludes>**/org/apache/flink/streaming/tests/avro/*</excludes>
111+
</configuration>
112+
</plugin>
79113
</plugins>
80114
</build>
81115

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
{"namespace": "org.apache.flink.streaming.tests.avro",
19+
"type": "record",
20+
"name": "ComplexPayloadAvro",
21+
"fields": [
22+
{
23+
"name": "eventTime",
24+
"type": "long",
25+
"default": ""
26+
},
27+
{
28+
"name": "stringList",
29+
"type": {
30+
"type": "array",
31+
"items": {
32+
"type": "string"
33+
}
34+
}
35+
},
36+
{
37+
"name": "strPayload",
38+
"type": "string",
39+
"default": ""
40+
},
41+
{
42+
"name": "innerPayLoad",
43+
"type": {
44+
"name": "InnerPayLoadAvro",
45+
"type": "record",
46+
"fields": [
47+
{"name": "sequenceNumber", "type": "long"}
48+
]
49+
}
50+
}
51+
]
52+
}

flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java

+37-6
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,18 @@
2222
import org.apache.flink.api.common.state.ValueState;
2323
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
2424
import org.apache.flink.api.java.utils.ParameterTool;
25+
import org.apache.flink.formats.avro.typeutils.AvroSerializer;
2526
import org.apache.flink.streaming.api.datastream.DataStream;
2627
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2728
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
2829
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
2930
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
3031
import org.apache.flink.streaming.tests.artificialstate.ComplexPayload;
32+
import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro;
33+
import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro;
3134
import org.apache.flink.util.Collector;
3235

36+
import java.util.Arrays;
3337
import java.util.Collections;
3438

3539
import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows;
@@ -72,26 +76,53 @@ public static void main(String[] args) throws Exception {
7276

7377
setupEnvironment(env, pt);
7478

79+
// add a keyed stateful map operator, which uses Kryo for state serialization
7580
DataStream<Event> eventStream = env.addSource(createEventSource(pt))
7681
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
7782
.keyBy(Event::getKey)
7883
.map(createArtificialKeyedStateMapper(
7984
// map function simply forwards the inputs
8085
(MapFunction<Event, Event>) in -> in,
8186
// state is verified and updated per event as a wrapped ComplexPayload state object
82-
(Event first, ComplexPayload second) -> {
83-
if (second != null && !second.getStrPayload().equals(KEYED_STATE_OPER_NAME)) {
87+
(Event event, ComplexPayload lastState) -> {
88+
if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME)
89+
&& lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 1)) {
8490
System.out.println("State is set or restored incorrectly");
8591
}
86-
return new ComplexPayload(first, KEYED_STATE_OPER_NAME);
92+
return new ComplexPayload(event, KEYED_STATE_OPER_NAME);
8793
},
8894
Collections.singletonList(
8995
new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
9096
Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
9197
)
92-
)
93-
.name(KEYED_STATE_OPER_NAME)
94-
.returns(Event.class);
98+
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo");
99+
100+
// add a keyed stateful map operator, which uses Avro for state serialization
101+
eventStream = eventStream
102+
.keyBy(Event::getKey)
103+
.map(createArtificialKeyedStateMapper(
104+
// map function simply forwards the inputs
105+
(MapFunction<Event, Event>) in -> in,
106+
// state is verified and updated per event as a wrapped ComplexPayloadAvro state object
107+
(Event event, ComplexPayloadAvro lastState) -> {
108+
if (lastState != null && !lastState.getStrPayload().equals(KEYED_STATE_OPER_NAME)
109+
&& lastState.getInnerPayLoad().getSequenceNumber() == (event.getSequenceNumber() - 1)) {
110+
System.out.println("State is set or restored incorrectly");
111+
}
112+
113+
ComplexPayloadAvro payload = new ComplexPayloadAvro();
114+
payload.setEventTime(event.getEventTime());
115+
payload.setInnerPayLoad(new InnerPayLoadAvro(event.getSequenceNumber()));
116+
payload.setStrPayload(KEYED_STATE_OPER_NAME);
117+
payload.setStringList(Arrays.asList(String.valueOf(event.getKey()), event.getPayload()));
118+
119+
return payload;
120+
},
121+
Collections.singletonList(
122+
new AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer
123+
Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction
124+
)
125+
).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro");
95126

96127
DataStream<Event> eventStream2 = eventStream
97128
.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))

flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR
9595

9696
wait_job_running $DATASTREAM_JOB
9797

98-
wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
98+
wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
9999

100100
# take a savepoint of the state machine job
101101
SAVEPOINT_PATH=$(take_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
@@ -120,7 +120,7 @@ DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TES
120120

121121
wait_job_running $DATASTREAM_JOB
122122

123-
wait_oper_metric_num_in_records ArtificalKeyedStateMapper.0 200
123+
wait_oper_metric_num_in_records SemanticsCheckMapper.0 200
124124

125125
# if state is errorneous and the state machine job produces alerting state transitions,
126126
# output would be non-empty and the test will not pass

0 commit comments

Comments
 (0)