Skip to content

Commit a4e3084

Browse files
[FLINK-37857][tests] Add a DSV2 sink implementation to tests which can collect data
1 parent 86ee4b5 commit a4e3084

File tree

2 files changed

+177
-0
lines changed

2 files changed

+177
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.streaming.util.testing;
20+
21+
import org.apache.flink.api.connector.sink2.Sink;
22+
import org.apache.flink.api.connector.sink2.SinkWriter;
23+
import org.apache.flink.api.connector.sink2.WriterInitContext;
24+
25+
import java.io.IOException;
26+
import java.time.Duration;
27+
import java.util.ArrayList;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.concurrent.BlockingQueue;
31+
import java.util.concurrent.LinkedBlockingQueue;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
/** Sink for collecting output during testing. */
37+
public class CollectingSink<T> implements Sink<T> {
38+
private static final long serialVersionUID = 1L;
39+
private static final List<BlockingQueue<Object>> queues =
40+
Collections.synchronizedList(new ArrayList<>());
41+
private static final AtomicInteger numSinks = new AtomicInteger(-1);
42+
private final int index;
43+
44+
public CollectingSink() {
45+
this.index = numSinks.incrementAndGet();
46+
queues.add(new LinkedBlockingQueue<>());
47+
}
48+
49+
@Override
50+
public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
51+
return new CollectingElementWriter(index);
52+
}
53+
54+
private class CollectingElementWriter implements SinkWriter<T> {
55+
private final int index;
56+
57+
public CollectingElementWriter(int index) {
58+
this.index = index;
59+
}
60+
61+
@Override
62+
public void write(T element, Context context) {
63+
queues.get(this.index).add(element);
64+
}
65+
66+
@Override
67+
public void flush(boolean endOfInput) {}
68+
69+
@Override
70+
public void close() {}
71+
}
72+
73+
@SuppressWarnings("unchecked")
74+
public List<T> getRemainingOutput() {
75+
return new ArrayList<>((BlockingQueue<T>) queues.get(this.index));
76+
}
77+
78+
public boolean isEmpty() {
79+
return queues.get(this.index).isEmpty();
80+
}
81+
82+
public T poll() throws TimeoutException {
83+
return this.poll(Duration.ofSeconds(15L));
84+
}
85+
86+
@SuppressWarnings("unchecked")
87+
public T poll(Duration duration) throws TimeoutException {
88+
Object element;
89+
90+
try {
91+
element = queues.get(this.index).poll(duration.toMillis(), TimeUnit.MILLISECONDS);
92+
} catch (InterruptedException var4) {
93+
throw new RuntimeException(var4);
94+
}
95+
96+
if (element == null) {
97+
throw new TimeoutException();
98+
} else {
99+
return (T) element;
100+
}
101+
}
102+
103+
public void close() {
104+
queues.get(this.index).clear();
105+
}
106+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.util;
20+
21+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
22+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
23+
import org.apache.flink.streaming.util.testing.CollectingSink;
24+
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
28+
import java.util.Arrays;
29+
import java.util.List;
30+
import java.util.concurrent.TimeoutException;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
34+
/** Tests {@link CollectingSink}. */
35+
class CollectingSinkTest {
36+
37+
private CollectingSink<Integer> sink;
38+
39+
@BeforeEach
40+
public void setup() throws Exception {
41+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
42+
env.setParallelism(1);
43+
44+
DataStreamSource<Integer> stream = env.fromData(0, 1, 2).setParallelism(1);
45+
46+
sink = new CollectingSink<>();
47+
stream.sinkTo(sink);
48+
env.execute();
49+
}
50+
51+
@Test
52+
void testGetRemainingOutputGivesBackData() {
53+
List<Integer> result = sink.getRemainingOutput();
54+
assertThat(result).containsExactlyInAnyOrderElementsOf(Arrays.asList(0, 1, 2));
55+
closeAndAssertEmpty();
56+
}
57+
58+
@Test
59+
void testPollGivesBackData() throws TimeoutException {
60+
for (int i = 0; i < 3; i++) {
61+
assertThat(sink.poll()).isEqualTo(i);
62+
}
63+
closeAndAssertEmpty();
64+
}
65+
66+
private void closeAndAssertEmpty() {
67+
sink.close();
68+
List<Integer> result = sink.getRemainingOutput();
69+
assertThat(result).isEmpty();
70+
}
71+
}

0 commit comments

Comments
 (0)