Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,18 @@
import org.apache.flink.table.types.logical.RowType;

import com.google.protobuf.ByteString;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Test for huge proto definition, which may trigger some special optimizations such as code
* splitting.
*/
public class BigPbProtoToRowTest {
class BigPbProtoToRowTest {

@Test
public void testSimple() throws Exception {
void testSimple() throws Exception {
BigPbClass.BigPbMessage bigPbMessage =
BigPbClass.BigPbMessage.newBuilder()
.setIntField1(5)
Expand Down Expand Up @@ -86,47 +83,47 @@ public void testSimple() throws Exception {
ProtobufTestHelper.pbBytesToRow(
BigPbClass.BigPbMessage.class, bigPbMessage.toByteArray());

assertEquals(5, row.getInt(0));
assertFalse(row.getBoolean(1));
assertEquals("test1", row.getString(2).toString());
assertArrayEquals(new byte[] {1, 2, 3}, row.getBinary(3));
assertEquals(2.5, row.getDouble(4), 0.0);
assertEquals(1.5F, row.getFloat(5), 0.0);
assertEquals(3, row.getInt(6));
assertEquals(7L, row.getLong(7));
assertEquals(9L, row.getLong(8));
assertArrayEquals(new byte[] {4, 5, 6}, row.getBinary(9));
assertEquals(6.5, row.getDouble(10), 0.0);
assertArrayEquals(new byte[] {7, 8, 9}, row.getBinary(11));
assertTrue(row.getBoolean(12));
assertEquals("test2", row.getString(13).toString());
assertEquals(3.5F, row.getFloat(14), 0.0);
assertEquals(8, row.getInt(15));
assertArrayEquals(new byte[] {10, 11, 12}, row.getBinary(16));
assertTrue(row.getBoolean(17));
assertEquals("test3", row.getString(18).toString());
assertEquals(4.5F, row.getFloat(19), 0.0);
assertEquals(1, row.getInt(20));
assertEquals(2L, row.getLong(21));
assertEquals(3, row.getInt(22));
assertEquals(4L, row.getLong(23));
assertEquals(5.5, row.getDouble(24), 0.0);
assertEquals(6, row.getInt(25));
assertEquals(7L, row.getLong(26));
assertTrue(row.getBoolean(27));
assertEquals("value1", row.getArray(28).getString(0).toString());
assertEquals("value2", row.getArray(28).getString(1).toString());
assertEquals("value3", row.getArray(28).getString(2).toString());
assertEquals(8.5F, row.getFloat(29), 0.0);
assertEquals("test4", row.getString(30).toString());
assertArrayEquals(new byte[] {13, 14, 15}, row.getMap(31).valueArray().getBinary(0));
assertArrayEquals(new byte[] {16, 17, 18}, row.getMap(31).valueArray().getBinary(1));
assertEquals("value1", row.getMap(32).valueArray().getString(0).toString());
assertEquals("value2", row.getMap(32).valueArray().getString(1).toString());
assertThat(row.getInt(0)).isEqualTo(5);
assertThat(row.getBoolean(1)).isFalse();
assertThat(row.getString(2).toString()).isEqualTo("test1");
assertThat(row.getBinary(3)).isEqualTo(new byte[] {1, 2, 3});
assertThat(row.getDouble(4)).isEqualTo(2.5);
assertThat(row.getFloat(5)).isEqualTo(1.5F);
assertThat(row.getInt(6)).isEqualTo(3);
assertThat(row.getLong(7)).isEqualTo(7L);
assertThat(row.getLong(8)).isEqualTo(9L);
assertThat(row.getBinary(9)).isEqualTo(new byte[] {4, 5, 6});
assertThat(row.getDouble(10)).isEqualTo(6.5);
assertThat(row.getBinary(11)).isEqualTo(new byte[] {7, 8, 9});
assertThat(row.getBoolean(12)).isTrue();
assertThat(row.getString(13).toString()).isEqualTo("test2");
assertThat(row.getFloat(14)).isEqualTo(3.5F);
assertThat(row.getInt(15)).isEqualTo(8);
assertThat(row.getBinary(16)).isEqualTo(new byte[] {10, 11, 12});
assertThat(row.getBoolean(17)).isTrue();
assertThat(row.getString(18).toString()).isEqualTo("test3");
assertThat(row.getFloat(19)).isEqualTo(4.5F);
assertThat(row.getInt(20)).isEqualTo(1);
assertThat(row.getLong(21)).isEqualTo(2L);
assertThat(row.getInt(22)).isEqualTo(3);
assertThat(row.getLong(23)).isEqualTo(4L);
assertThat(row.getDouble(24)).isEqualTo(5.5);
assertThat(row.getInt(25)).isEqualTo(6);
assertThat(row.getLong(26)).isEqualTo(7L);
assertThat(row.getBoolean(27)).isTrue();
assertThat(row.getArray(28).getString(0).toString()).isEqualTo("value1");
assertThat(row.getArray(28).getString(1).toString()).isEqualTo("value2");
assertThat(row.getArray(28).getString(2).toString()).isEqualTo("value3");
assertThat(row.getFloat(29)).isEqualTo(8.5F);
assertThat(row.getString(30).toString()).isEqualTo("test4");
assertThat(row.getMap(31).valueArray().getBinary(0)).isEqualTo(new byte[] {13, 14, 15});
assertThat(row.getMap(31).valueArray().getBinary(1)).isEqualTo(new byte[] {16, 17, 18});
assertThat(row.getMap(32).valueArray().getString(0).toString()).isEqualTo("value1");
assertThat(row.getMap(32).valueArray().getString(1).toString()).isEqualTo("value2");
}

@Test
public void testSplitInDeserialization() throws Exception {
void testSplitInDeserialization() throws Exception {
RowType rowType = PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor());
PbFormatConfig formatConfig =
new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), false, false, "");
Expand All @@ -135,6 +132,6 @@ public void testSplitInDeserialization() throws Exception {
rowType, InternalTypeInfo.of(rowType), formatConfig);
pbRowDataDeserializationSchema.open(null);
// make sure code is split
assertTrue(pbRowDataDeserializationSchema.isCodeSplit());
assertThat(pbRowDataDeserializationSchema.isCodeSplit()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,21 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Test for huge proto definition, which may trigger some special optimizations such as code
* splitting.
*/
public class BigPbRowToProtoTest {
class BigPbRowToProtoTest {

@Test
public void testSimple() throws Exception {
void testSimple() throws Exception {
GenericRowData rowData = new GenericRowData(33);
rowData.setField(0, 20);
rowData.setField(1, false);
Expand Down Expand Up @@ -98,69 +96,66 @@ public void testSimple() throws Exception {
byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, BigPbClass.BigPbMessage.class);

BigPbClass.BigPbMessage bigPbMessage = BigPbClass.BigPbMessage.parseFrom(bytes);
assertEquals(rowData.getField(0), bigPbMessage.getIntField1());
assertEquals(rowData.getField(1), bigPbMessage.getBoolField2());
assertEquals(rowData.getField(2).toString(), bigPbMessage.getStringField3());
assertArrayEquals(
((byte[]) rowData.getField(3)), bigPbMessage.getBytesField4().toByteArray());
assertEquals(rowData.getField(4), bigPbMessage.getDoubleField5());
assertEquals(rowData.getField(5), bigPbMessage.getFloatField6());
assertEquals(rowData.getField(6), bigPbMessage.getUint32Field7());
assertEquals(rowData.getField(7), bigPbMessage.getInt64Field8());
assertEquals(rowData.getField(8), bigPbMessage.getUint64Field9());
assertArrayEquals(
((byte[]) rowData.getField(9)), bigPbMessage.getBytesField10().toByteArray());
assertEquals(rowData.getField(10), bigPbMessage.getDoubleField11());
assertArrayEquals(
((byte[]) rowData.getField(11)), bigPbMessage.getBytesField12().toByteArray());
assertEquals(rowData.getField(12), bigPbMessage.getBoolField13());
assertEquals(rowData.getField(13).toString(), bigPbMessage.getStringField14());
assertEquals(rowData.getField(14), bigPbMessage.getFloatField15());
assertEquals(rowData.getField(15), bigPbMessage.getInt32Field16());
assertArrayEquals(
((byte[]) rowData.getField(16)), bigPbMessage.getBytesField17().toByteArray());
assertEquals(rowData.getField(17), bigPbMessage.getBoolField18());
assertEquals(rowData.getField(18).toString(), bigPbMessage.getStringField19());
assertEquals(rowData.getField(19), bigPbMessage.getFloatField20());
assertEquals(rowData.getField(20), bigPbMessage.getFixed32Field21());
assertEquals(rowData.getField(21), bigPbMessage.getFixed64Field22());
assertEquals(rowData.getField(22), bigPbMessage.getSfixed32Field23());
assertEquals(rowData.getField(23), bigPbMessage.getSfixed64Field24());
assertEquals(rowData.getField(24), bigPbMessage.getDoubleField25());
assertEquals(rowData.getField(25), bigPbMessage.getUint32Field26());
assertEquals(rowData.getField(26), bigPbMessage.getUint64Field27());
assertEquals(rowData.getField(27), bigPbMessage.getBoolField28());
assertEquals(
((GenericArrayData) rowData.getField(28)).getString(0).toString(),
bigPbMessage.getField29List().get(0));
assertEquals(
((GenericArrayData) rowData.getField(28)).getString(1).toString(),
bigPbMessage.getField29List().get(1));
assertEquals(
((GenericArrayData) rowData.getField(28)).getString(2).toString(),
bigPbMessage.getField29List().get(2));
assertEquals(rowData.getField(29), bigPbMessage.getFloatField30());
assertEquals(rowData.getField(30).toString(), bigPbMessage.getStringField31());
assertThat(bigPbMessage.getIntField1()).isEqualTo(rowData.getField(0));
assertThat(bigPbMessage.getBoolField2()).isEqualTo(rowData.getField(1));
assertThat(bigPbMessage.getStringField3()).isEqualTo(rowData.getField(2).toString());
assertThat(bigPbMessage.getBytesField4().toByteArray())
.isEqualTo(((byte[]) rowData.getField(3)));
assertThat(bigPbMessage.getDoubleField5()).isEqualTo(rowData.getField(4));
assertThat(bigPbMessage.getFloatField6()).isEqualTo(rowData.getField(5));
assertThat(bigPbMessage.getUint32Field7()).isEqualTo(rowData.getField(6));
assertThat(bigPbMessage.getInt64Field8()).isEqualTo(rowData.getField(7));
assertThat(bigPbMessage.getUint64Field9()).isEqualTo(rowData.getField(8));
assertThat(bigPbMessage.getBytesField10().toByteArray())
.isEqualTo(((byte[]) rowData.getField(9)));
assertThat(bigPbMessage.getDoubleField11()).isEqualTo(rowData.getField(10));
assertThat(bigPbMessage.getBytesField12().toByteArray())
.isEqualTo(((byte[]) rowData.getField(11)));
assertThat(bigPbMessage.getBoolField13()).isEqualTo(rowData.getField(12));
assertThat(bigPbMessage.getStringField14()).isEqualTo(rowData.getField(13).toString());
assertThat(bigPbMessage.getFloatField15()).isEqualTo(rowData.getField(14));
assertThat(bigPbMessage.getInt32Field16()).isEqualTo(rowData.getField(15));
assertThat(bigPbMessage.getBytesField17().toByteArray())
.isEqualTo(((byte[]) rowData.getField(16)));
assertThat(bigPbMessage.getBoolField18()).isEqualTo(rowData.getField(17));
assertThat(bigPbMessage.getStringField19()).isEqualTo(rowData.getField(18).toString());
assertThat(bigPbMessage.getFloatField20()).isEqualTo(rowData.getField(19));
assertThat(bigPbMessage.getFixed32Field21()).isEqualTo(rowData.getField(20));
assertThat(bigPbMessage.getFixed64Field22()).isEqualTo(rowData.getField(21));
assertThat(bigPbMessage.getSfixed32Field23()).isEqualTo(rowData.getField(22));
assertThat(bigPbMessage.getSfixed64Field24()).isEqualTo(rowData.getField(23));
assertThat(bigPbMessage.getDoubleField25()).isEqualTo(rowData.getField(24));
assertThat(bigPbMessage.getUint32Field26()).isEqualTo(rowData.getField(25));
assertThat(bigPbMessage.getUint64Field27()).isEqualTo(rowData.getField(26));
assertThat(bigPbMessage.getBoolField28()).isEqualTo(rowData.getField(27));
assertThat(bigPbMessage.getField29List().get(0))
.isEqualTo(((GenericArrayData) rowData.getField(28)).getString(0).toString());
assertThat(bigPbMessage.getField29List().get(1))
.isEqualTo(((GenericArrayData) rowData.getField(28)).getString(1).toString());
assertThat(bigPbMessage.getField29List().get(2))
.isEqualTo(((GenericArrayData) rowData.getField(28)).getString(2).toString());
assertThat(bigPbMessage.getFloatField30()).isEqualTo(rowData.getField(29));
assertThat(bigPbMessage.getStringField31()).isEqualTo(rowData.getField(30).toString());

ArrayData keySet = rowData.getMap(32).keyArray();
ArrayData valueSet = rowData.getMap(32).valueArray();
assertEquals(keySet.getString(0).toString(), "key2");
assertEquals(keySet.getString(1).toString(), "key3");
assertEquals(keySet.getString(2).toString(), "key1");
assertEquals(valueSet.getString(0).toString(), "value2");
assertEquals(valueSet.getString(1).toString(), "value3");
assertEquals(valueSet.getString(2).toString(), "value1");
assertThat(keySet.getString(0).toString()).isEqualTo("key2");
assertThat(keySet.getString(1).toString()).isEqualTo("key3");
assertThat(keySet.getString(2).toString()).isEqualTo("key1");
assertThat(valueSet.getString(0).toString()).isEqualTo("value2");
assertThat(valueSet.getString(1).toString()).isEqualTo("value3");
assertThat(valueSet.getString(2).toString()).isEqualTo("value1");
}

@Test
public void testSplitInSerialization() throws Exception {
void testSplitInSerialization() throws Exception {
RowType rowType = PbToRowTypeUtil.generateRowType(BigPbClass.BigPbMessage.getDescriptor());
PbFormatConfig formatConfig =
new PbFormatConfig(BigPbClass.BigPbMessage.class.getName(), false, false, "");
PbRowDataSerializationSchema pbRowDataSerializationSchema =
new PbRowDataSerializationSchema(rowType, formatConfig);
pbRowDataSerializationSchema.open(null);
// make sure code is split
assertTrue(pbRowDataSerializationSchema.isCodeSplit());
assertThat(pbRowDataSerializationSchema.isCodeSplit()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
import org.apache.flink.table.data.RowData;

import com.google.protobuf.ByteString;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Test conversion of proto map data to flink internal data. */
public class MapProtoToRowTest {
class MapProtoToRowTest {
@Test
public void testMessage() throws Exception {
void testMessage() throws Exception {
MapTest.InnerMessageTest innerMessageTest =
MapTest.InnerMessageTest.newBuilder().setA(1).setB(2).build();
MapTest mapTest =
Expand All @@ -45,20 +44,20 @@ public void testMessage() throws Exception {
RowData row = ProtobufTestHelper.pbBytesToRow(MapTest.class, mapTest.toByteArray());

MapData map1 = row.getMap(1);
assertEquals("a", map1.keyArray().getString(0).toString());
assertEquals("b", map1.valueArray().getString(0).toString());
assertEquals("c", map1.keyArray().getString(1).toString());
assertEquals("d", map1.valueArray().getString(1).toString());
assertThat(map1.keyArray().getString(0).toString()).isEqualTo("a");
assertThat(map1.valueArray().getString(0).toString()).isEqualTo("b");
assertThat(map1.keyArray().getString(1).toString()).isEqualTo("c");
assertThat(map1.valueArray().getString(1).toString()).isEqualTo("d");

MapData map2 = row.getMap(2);
assertEquals("f", map2.keyArray().getString(0).toString());
assertThat(map2.keyArray().getString(0).toString()).isEqualTo("f");
RowData rowData2 = map2.valueArray().getRow(0, 2);

assertEquals(1, rowData2.getInt(0));
assertEquals(2L, rowData2.getLong(1));
assertThat(rowData2.getInt(0)).isEqualTo(1);
assertThat(rowData2.getLong(1)).isEqualTo(2L);

MapData map3 = row.getMap(3);
assertEquals("e", map3.keyArray().getString(0).toString());
assertArrayEquals(new byte[] {1, 2, 3}, map3.valueArray().getBinary(0));
assertThat(map3.keyArray().getString(0).toString()).isEqualTo("e");
assertThat(map3.valueArray().getBinary(0)).isEqualTo(new byte[] {1, 2, 3});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@
import org.apache.flink.table.data.StringData;

import com.google.protobuf.ByteString;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Test conversion of flink internal map data to proto data. */
public class MapRowToProtoTest {
class MapRowToProtoTest {
@Test
public void testSimple() throws Exception {
void testSimple() throws Exception {
Map<StringData, StringData> map1 = new HashMap<>();
map1.put(StringData.fromString("a"), StringData.fromString("b"));
Map<StringData, RowData> map2 = new HashMap<>();
Expand All @@ -52,20 +52,21 @@ public void testSimple() throws Exception {
byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, MapTest.class);

MapTest mapTest = MapTest.parseFrom(bytes);
assertEquals(1, mapTest.getA());
assertEquals("b", mapTest.getMap1Map().get("a"));
assertThat(mapTest.getA()).isEqualTo(1);
assertThat(mapTest.getMap1Map().get("a")).isEqualTo("b");
MapTest.InnerMessageTest innerMessageTest = mapTest.getMap2Map().get("c");
assertEquals(1, innerMessageTest.getA());
assertEquals(2L, innerMessageTest.getB());
assertEquals(ByteString.copyFrom(new byte[] {1, 2, 3}), mapTest.getMap3Map().get("e"));
assertThat(innerMessageTest.getA()).isEqualTo(1);
assertThat(innerMessageTest.getB()).isEqualTo(2L);
assertThat(mapTest.getMap3Map().get("e"))
.isEqualTo(ByteString.copyFrom(new byte[] {1, 2, 3}));
}

@Test
public void testNull() throws Exception {
void testNull() throws Exception {
RowData row = GenericRowData.of(1, null, null, null);
byte[] bytes = ProtobufTestHelper.rowToPbBytes(row, MapTest.class);
MapTest mapTest = MapTest.parseFrom(bytes);
Map<String, String> map = mapTest.getMap1Map();
assertEquals(0, map.size());
assertThat(map.size()).isEqualTo(0);
}
}
Loading