diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 701ee007223bf..5cf00c1b30c7a 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -37,7 +37,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXISTS test;" # grant access to `test` for ci test user mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';" -# create a table named t_remote +# creates two tables named t_remote_0, t_remote_1 mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql echo "--- preparing postgresql" @@ -92,10 +92,19 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' sleep 1 # check sink destination mysql using shell -diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \ -<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id") +diff -u ./e2e_test/sink/remote/mysql_expected_result_0.tsv \ +<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote_0 ORDER BY id") if [ $? -eq 0 ]; then - echo "mysql sink check passed" + echo "mysql sink check 0 passed" +else + echo "The output is not as expected." + exit 1 +fi + +diff -u ./e2e_test/sink/remote/mysql_expected_result_1.tsv \ +<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT id, v_varchar, v_text, v_integer, v_smallint, v_bigint, v_decimal, v_real, v_double, v_boolean, v_date, v_time, v_timestamp, v_jsonb, TO_BASE64(v_bytea) FROM test.t_remote_1 ORDER BY id") +if [ $? -eq 0 ]; then + echo "mysql sink check 1 passed" else echo "The output is not as expected." 
exit 1 diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt index 7c8de88f41de3..f68761f8a7227 100644 --- a/e2e_test/sink/remote/jdbc.check.pg.slt +++ b/e2e_test/sink/remote/jdbc.check.pg.slt @@ -1,10 +1,19 @@ # the check is run on sink destination postgres database query I -select * from t_remote order by id; +select * from t_remote_0 order by id; ---- 1 Alex 28208 281620391 4986480304337356659 28162.0391 2.03 28162.0391 2023-03-20 10:18:30 3 Carl 18300 1702307129 7878292368468104216 17023.07129 23.07 17023.07129 2023-03-20 10:18:32 4 Doris 17250 151951802 3946135584462581863 1519518.02 18.02 1519518.02 2023-03-21 10:18:30 5 Eve 9725 698160808 524334216698825611 69.8160808 69.81 69.8160808 2023-03-21 10:18:31 6 Frank 28131 1233587627 8492820454814063326 123358.7627 58.76 123358.7627 2023-03-21 10:18:32 + +query II +select * from t_remote_1 order by id; +---- +1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} \xdeadbeef +3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 t 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} \xcafebabe +4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 f 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} \xbabec0de +5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} \xdeadbabe +6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} \xdeadbabe diff --git a/e2e_test/sink/remote/jdbc.load.slt b/e2e_test/sink/remote/jdbc.load.slt index d75a6922a0c98..93448382b4096 100644 --- a/e2e_test/sink/remote/jdbc.load.slt +++ b/e2e_test/sink/remote/jdbc.load.slt @@ -1,5 +1,5 @@ statement ok -create table t_remote ( +create table t_remote_0 ( id integer primary key, v_varchar varchar, v_smallint smallint, @@ -12,59 +12,127 @@ create table t_remote ( ); statement ok -create materialized 
view mv_remote as select * from t_remote; +CREATE TABLE t_remote_1 ( + id BIGINT PRIMARY KEY, + v_varchar VARCHAR, + v_text TEXT, + v_integer INTEGER, + v_smallint SMALLINT, + v_bigint BIGINT, + v_decimal DECIMAL, + v_real REAL, + v_double DOUBLE PRECISION, + v_boolean BOOLEAN, + v_date DATE, + v_time TIME, + v_timestamp TIMESTAMP, + v_jsonb JSONB, + v_bytea BYTEA +); + +statement ok +create materialized view mv_remote_0 as select * from t_remote_0; + +statement ok +create materialized view mv_remote_1 as select * from t_remote_1; statement ok -CREATE SINK s_postgres FROM mv_remote WITH ( +CREATE SINK s_postgres_0 FROM mv_remote_0 WITH ( connector='jdbc', jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', - table.name='t_remote', + table.name='t_remote_0', + type='upsert' +); + +statement ok +CREATE SINK s_postgres_1 FROM mv_remote_1 WITH ( + connector='jdbc', + jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', + table.name='t_remote_1', + type='upsert' +); + +statement ok +CREATE SINK s_mysql_0 FROM mv_remote_0 WITH ( + connector='jdbc', + jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw', + table.name='t_remote_0', type='upsert' ); statement ok -CREATE SINK s_mysql FROM mv_remote WITH ( +CREATE SINK s_mysql_1 FROM mv_remote_1 WITH ( connector='jdbc', jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw', - table.name='t_remote', + table.name='t_remote_1', type='upsert' ); statement ok -INSERT INTO t_remote VALUES +INSERT INTO t_remote_0 VALUES (1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'), (2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'), (3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32'); statement ok -INSERT INTO t_remote VALUES +INSERT INTO t_remote_0 VALUES (4, 'Doris', 17250, 151951802, 3946135584462581863, 
1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'), (5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'), (6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32'); +statement ok +INSERT INTO t_remote_1 VALUES + (1, 'Varchar value 1', 'Text value 1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '{"key": "value"}', E'\\xDEADBEEF'), + (2, 'Varchar value 2', 'Text value 2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '{"key": "value2"}', E'\\xFEEDBEEF'), + (3, 'Varchar value 3', 'Text value 3', 345, 678, 901, 34.56, 78.90, 12.34, TRUE, '2023-05-24', '12:34:56', '2023-05-24 12:34:56', '{"key": "value3"}', E'\\xCAFEBABE'); + +statement ok +INSERT INTO t_remote_1 VALUES + (4, 'Varchar value 4', 'Text value 4', 456, 789, 012, 45.67, 89.01, 23.45, FALSE, '2023-05-25', '23:45:01', '2023-05-25 23:45:01', '{"key": "value4"}', E'\\xBABEC0DE'), + (5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '{"key": "value5"}', E'\\xDEADBABE'), + (6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '{"key": "value6"}', E'\\xDEADBABE'); + statement ok FLUSH; statement ok -UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1; +UPDATE t_remote_0 SET v_varchar = 'Alex' WHERE id = 1; + +statement ok +UPDATE t_remote_1 SET v_varchar = 'Alex' WHERE id = 1; + +statement ok +DELETE FROM t_remote_0 WHERE id = 2; statement ok -DELETE FROM t_remote WHERE id = 2; +DELETE FROM t_remote_1 WHERE id = 2; statement ok FLUSH; statement ok -DROP SINK s_postgres; +DROP SINK s_postgres_0; + +statement ok +DROP SINK s_postgres_1; + +statement ok +DROP SINK s_mysql_0; + +statement ok +DROP SINK s_mysql_1; + +statement ok +DROP MATERIALIZED VIEW mv_remote_0; 
statement ok -DROP SINK s_mysql +DROP MATERIALIZED VIEW mv_remote_1; statement ok -DROP MATERIALIZED VIEW mv_remote; +DROP TABLE t_remote_0; statement ok -DROP TABLE t_remote; +DROP TABLE t_remote_1; statement ok FLUSH; diff --git a/e2e_test/sink/remote/mysql_create_table.sql b/e2e_test/sink/remote/mysql_create_table.sql index 491072eb59d63..1ed5aa3d3a333 100644 --- a/e2e_test/sink/remote/mysql_create_table.sql +++ b/e2e_test/sink/remote/mysql_create_table.sql @@ -1,4 +1,4 @@ -CREATE TABLE t_remote ( +CREATE TABLE t_remote_0 ( id integer PRIMARY KEY, v_varchar varchar(100), v_smallint smallint, @@ -8,4 +8,22 @@ CREATE TABLE t_remote ( v_float float, v_double double, v_timestamp timestamp -); \ No newline at end of file +); + +CREATE TABLE t_remote_1 ( + id BIGINT PRIMARY KEY, + v_varchar VARCHAR(255), + v_text TEXT, + v_integer INT, + v_smallint SMALLINT, + v_bigint BIGINT, + v_decimal DECIMAL(10,2), + v_real FLOAT, + v_double DOUBLE, + v_boolean BOOLEAN, + v_date DATE, + v_time TIME, + v_timestamp TIMESTAMP, + v_jsonb JSON, + v_bytea BLOB +); diff --git a/e2e_test/sink/remote/mysql_expected_result.tsv b/e2e_test/sink/remote/mysql_expected_result_0.tsv similarity index 100% rename from e2e_test/sink/remote/mysql_expected_result.tsv rename to e2e_test/sink/remote/mysql_expected_result_0.tsv diff --git a/e2e_test/sink/remote/mysql_expected_result_1.tsv b/e2e_test/sink/remote/mysql_expected_result_1.tsv new file mode 100644 index 0000000000000..65d1ea8b28de6 --- /dev/null +++ b/e2e_test/sink/remote/mysql_expected_result_1.tsv @@ -0,0 +1,5 @@ +1 Alex Text value 1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} 3q2+7w== +3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 1 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} yv66vg== +4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 0 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} ur7A3g== +5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 
34.56 1 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} 3q26vg== +6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 0 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} 3q26vg== diff --git a/e2e_test/sink/remote/pg_create_table.sql b/e2e_test/sink/remote/pg_create_table.sql index c20e3386e8d06..0b1b91c5a362b 100644 --- a/e2e_test/sink/remote/pg_create_table.sql +++ b/e2e_test/sink/remote/pg_create_table.sql @@ -1,4 +1,4 @@ -CREATE TABLE t_remote ( +CREATE TABLE t_remote_0 ( id integer PRIMARY KEY, v_varchar varchar(100), v_smallint smallint, @@ -8,4 +8,22 @@ CREATE TABLE t_remote ( v_float real, v_double double precision, v_timestamp timestamp -); \ No newline at end of file +); + +CREATE TABLE t_remote_1 ( + id BIGINT PRIMARY KEY, + v_varchar VARCHAR(255), + v_text TEXT, + v_integer INTEGER, + v_smallint SMALLINT, + v_bigint BIGINT, + v_decimal DECIMAL(10,2), + v_real REAL, + v_double DOUBLE PRECISION, + v_boolean BOOLEAN, + v_date DATE, + v_time TIME, + v_timestamp TIMESTAMP, + v_jsonb JSONB, + v_bytea BYTEA +); diff --git a/integration_tests/postgres-cdc/docker-compose.yml b/integration_tests/postgres-cdc/docker-compose.yml index 142ec8ce7584b..c68035b33feb8 100644 --- a/integration_tests/postgres-cdc/docker-compose.yml +++ b/integration_tests/postgres-cdc/docker-compose.yml @@ -42,7 +42,7 @@ services: - POSTGRES_PASSWORD=123456 - POSTGRES_DB=mydb ports: - - 5432:5432 + - 8432:5432 healthcheck: test: [ "CMD-SHELL", "pg_isready --username=myuser --dbname=mydb" ] interval: 5s diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java index 425250ea19cd6..641a5b2ea309d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ 
b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java @@ -22,8 +22,13 @@ import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload; import com.risingwave.proto.Data; +import java.io.ByteArrayInputStream; import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDate; +import java.util.Base64; import java.util.Map; public class JsonDeserializer implements Deserializer { @@ -130,6 +135,28 @@ private static BigDecimal castDecimal(Object value) { } } + private static Time castTime(Object value) { + try { + Long milli = castLong(value); + return new Time(milli); + } catch (RuntimeException e) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("unable to cast into time from " + value.getClass()) + .asRuntimeException(); + } + } + + private static Date castDate(Object value) { + try { + Long days = castLong(value) - 1; + return Date.valueOf(LocalDate.of(1, 1, 1).plusDays(days)); + } catch (RuntimeException e) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("unable to cast into date from " + value.getClass()) + .asRuntimeException(); + } + } + private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) { // value might be null if (value == null) { @@ -171,6 +198,32 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj .asRuntimeException(); } return Timestamp.valueOf((String) value); + case TIME: + return castTime(value); + case DATE: + return castDate(value); + case INTERVAL: + if (!(value instanceof String)) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("Expected interval, got " + value.getClass()) + .asRuntimeException(); + } + return value; + case JSONB: + if (!(value instanceof String)) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("Expected jsonb, 
got " + value.getClass()) + .asRuntimeException(); + } + return value; + case BYTEA: + if (!(value instanceof String)) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("Expected bytea, got " + value.getClass()) + .asRuntimeException(); + } + byte[] bytes = Base64.getDecoder().decode((String) value); + return new ByteArrayInputStream(bytes); default: throw io.grpc.Status.INVALID_ARGUMENT .withDescription("unsupported type " + typeName) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java index c2d755a9ccc0f..148f938660f4f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java @@ -116,6 +116,15 @@ static ValueGetter[] buildValueGetter(TableSchema tableSchema) { return row.getTimestamp(index); }; break; + case TIME: + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + return row.getTime(index); + }; + break; case DECIMAL: ret[i] = row -> { @@ -125,6 +134,45 @@ static ValueGetter[] buildValueGetter(TableSchema tableSchema) { return row.getDecimal(index); }; break; + case DATE: + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + return row.getDate(index); + }; + break; + + case INTERVAL: + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + return row.getInterval(index); + }; + break; + + case JSONB: + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + return row.getJsonb(index); + }; + break; + + case BYTEA: + ret[i] = + row -> { + if (row.isNull(index)) { + return null; + } + return row.getBytea(index); + }; + break; default: throw 
io.grpc.Status.INVALID_ARGUMENT .withDescription("unsupported type " + typeName) diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java index ca28bb1cd4185..872be3ee53aa1 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/jdbc/JDBCSinkTest.java @@ -17,11 +17,14 @@ import static org.junit.Assert.*; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.risingwave.connector.JDBCSink; import com.risingwave.connector.JDBCSinkConfig; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkRow; +import com.risingwave.proto.Data.DataType.TypeName; import com.risingwave.proto.Data.Op; +import java.io.ByteArrayInputStream; import java.sql.*; import org.junit.Test; import org.testcontainers.containers.JdbcDatabaseContainer; @@ -29,28 +32,68 @@ import org.testcontainers.containers.PostgreSQLContainer; public class JDBCSinkTest { - static void createMockTable(String jdbcUrl, String tableName) throws SQLException { + private enum TestType { + TestPg, + TestMySQL, + } + + private static final String pgCreateStmt = + "CREATE TABLE %s (id INT PRIMARY KEY, v_varchar VARCHAR(255), v_date DATE, v_time TIME, v_timestamp TIMESTAMP, v_jsonb JSONB, v_bytea BYTEA)"; + private static final String mysqlCreateStmt = + "CREATE TABLE %s (id INT PRIMARY KEY, v_varchar VARCHAR(255), v_date DATE, v_time TIME, v_timestamp TIMESTAMP, v_jsonb JSON, v_bytea BLOB)"; + + static void createMockTable(String jdbcUrl, String tableName, TestType testType) + throws SQLException { Connection conn = DriverManager.getConnection(jdbcUrl); conn.setAutoCommit(false); Statement stmt 
= conn.createStatement(); stmt.execute("DROP TABLE IF EXISTS " + tableName); - stmt.execute("create table " + tableName + " (id int primary key, name varchar(255))"); + if (testType == TestType.TestPg) { + stmt.execute(String.format(pgCreateStmt, tableName)); + } else { + stmt.execute(String.format(mysqlCreateStmt, tableName)); + } conn.commit(); conn.close(); } - static void testJDBCSync(JdbcDatabaseContainer container) throws SQLException { - String tableName = "test"; - createMockTable(container.getJdbcUrl(), tableName); + static TableSchema getTestTableSchema() { + return new TableSchema( + Lists.newArrayList( + "id", "v_varchar", "v_date", "v_time", "v_timestamp", "v_jsonb", "v_bytea"), + Lists.newArrayList( + TypeName.INT32, + TypeName.VARCHAR, + TypeName.DATE, + TypeName.TIME, + TypeName.TIMESTAMP, + TypeName.JSONB, + TypeName.BYTEA), + Lists.newArrayList("id")); + } + static void testJDBCSync(JdbcDatabaseContainer container, TestType testType) + throws SQLException { + String tableName = "test"; + createMockTable(container.getJdbcUrl(), tableName, testType); JDBCSink sink = new JDBCSink( new JDBCSinkConfig(container.getJdbcUrl(), tableName, "upsert"), - TableSchema.getMockTableSchema()); + getTestTableSchema()); assertEquals(tableName, sink.getTableName()); Connection conn = sink.getConn(); - sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice"))); + sink.write( + Iterators.forArray( + new ArraySinkRow( + Op.INSERT, + 1, + "Alice", + new Date(1000000000), + new Time(1000000000), + new Timestamp(1000000000), + "{\"key\": \"password\", \"value\": \"Singularity123\"}", + new ByteArrayInputStream("I want to sleep".getBytes())))); sink.sync(); Statement stmt = conn.createStatement(); @@ -61,7 +104,17 @@ static void testJDBCSync(JdbcDatabaseContainer container) throws SQLException } assertEquals(1, count); - sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob"))); + sink.write( + Iterators.forArray( + new ArraySinkRow( + Op.INSERT, + 
2, + "Bob", + new Date(1000000000), + new Time(1000000000), + new Timestamp(1000000000), + "{\"key\": \"password\", \"value\": \"Singularity123\"}", + new ByteArrayInputStream("I want to sleep".getBytes())))); sink.sync(); stmt = conn.createStatement(); rs = stmt.executeQuery("SELECT * FROM test"); @@ -74,24 +127,65 @@ static void testJDBCSync(JdbcDatabaseContainer container) throws SQLException sink.drop(); } - static void testJDBCWrite(JdbcDatabaseContainer container) throws SQLException { + static void testJDBCWrite(JdbcDatabaseContainer container, TestType testType) + throws SQLException { String tableName = "test"; - createMockTable(container.getJdbcUrl(), tableName); + createMockTable(container.getJdbcUrl(), tableName, testType); JDBCSink sink = new JDBCSink( new JDBCSinkConfig(container.getJdbcUrl(), tableName, "upsert"), - TableSchema.getMockTableSchema()); + getTestTableSchema()); assertEquals(tableName, sink.getTableName()); Connection conn = sink.getConn(); sink.write( Iterators.forArray( - new ArraySinkRow(Op.INSERT, 1, "Alice"), - new ArraySinkRow(Op.INSERT, 2, "Bob"), - new ArraySinkRow(Op.UPDATE_DELETE, 1, "Alice"), - new ArraySinkRow(Op.UPDATE_INSERT, 1, "Clare"), - new ArraySinkRow(Op.DELETE, 2, "Bob"))); + new ArraySinkRow( + Op.INSERT, + 1, + "Alice", + new Date(1000000000), + new Time(1000000000), + new Timestamp(1000000000), + "{\"key\": \"password\", \"value\": \"Singularity123\"}", + new ByteArrayInputStream("I want to sleep".getBytes())), + new ArraySinkRow( + Op.INSERT, + 2, + "Bob", + new Date(1000000000), + new Time(1000000000), + new Timestamp(1000000000), + "{\"key\": \"password\", \"value\": \"Singularity123\"}", + new ByteArrayInputStream("I want to sleep".getBytes())), + new ArraySinkRow( + Op.UPDATE_DELETE, + 1, + "Alice", + new Date(1000000000), + new Time(1000000000), + new Timestamp(1000000000), + "{\"key\": \"password\", \"value\": \"Singularity123\"}", + new ByteArrayInputStream("I want to sleep".getBytes())), + new 
ArraySinkRow( + Op.UPDATE_INSERT, + 1, + "Clare", + new Date(2000000000), + new Time(2000000000), + new Timestamp(2000000000), + "{\"key\": \"password\", \"value\": \"Singularity123123123123\"}", + new ByteArrayInputStream("I want to eat".getBytes())), + new ArraySinkRow( + Op.DELETE, + 2, + "Bob", + new Date(1000000000), + new Time(1000000000), + new Timestamp(1000000000), + "{\"key\": \"password\", \"value\": \"Singularity123\"}", + new ByteArrayInputStream("I want to sleep".getBytes())))); sink.sync(); Statement stmt = conn.createStatement(); @@ -101,20 +195,27 @@ static void testJDBCWrite(JdbcDatabaseContainer container) throws SQLExceptio // check if rows are inserted assertEquals(1, rs.getInt(1)); assertEquals("Clare", rs.getString(2)); + assertEquals(new Date(2000000000).toString(), rs.getDate(3).toString()); + assertEquals(new Time(2000000000).toString(), rs.getTime(4).toString()); + assertEquals(new Timestamp(2000000000), rs.getTimestamp(5)); + assertEquals( + "{\"key\": \"password\", \"value\": \"Singularity123123123123\"}", rs.getString(6)); + assertEquals("I want to eat", new String(rs.getBytes(7))); assertFalse(rs.next()); sink.sync(); stmt.close(); } - static void testJDBCDrop(JdbcDatabaseContainer container) throws SQLException { + static void testJDBCDrop(JdbcDatabaseContainer container, TestType testType) + throws SQLException { String tableName = "test"; - createMockTable(container.getJdbcUrl(), tableName); + createMockTable(container.getJdbcUrl(), tableName, testType); JDBCSink sink = new JDBCSink( new JDBCSinkConfig(container.getJdbcUrl(), tableName, "upsert"), - TableSchema.getMockTableSchema()); + getTestTableSchema()); assertEquals(tableName, sink.getTableName()); Connection conn = sink.getConn(); sink.drop(); @@ -136,9 +237,9 @@ public void testPostgres() throws SQLException { .withUrlParam("user", "postgres") .withUrlParam("password", "password"); pg.start(); - testJDBCSync(pg); - testJDBCWrite(pg); - testJDBCDrop(pg); + testJDBCSync(pg, 
TestType.TestPg); + testJDBCWrite(pg, TestType.TestPg); + testJDBCDrop(pg, TestType.TestPg); pg.stop(); } @@ -153,9 +254,9 @@ public void testMySQL() throws SQLException { .withUrlParam("user", "postgres") .withUrlParam("password", "password"); mysql.start(); - testJDBCSync(mysql); - testJDBCWrite(mysql); - testJDBCDrop(mysql); + testJDBCSync(mysql, TestType.TestMySQL); + testJDBCWrite(mysql, TestType.TestMySQL); + testJDBCDrop(mysql, TestType.TestMySQL); mysql.stop(); } } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 621178b3dc380..9a74935564872 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -19,15 +19,23 @@ import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.proto.Data; import io.grpc.Status; +import java.io.InputStream; import java.sql.*; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.postgresql.util.PGInterval; +import org.postgresql.util.PGobject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +enum DatabaseType { + MYSQL, + POSTGRES, +} + public class JDBCSink extends SinkBase { public static final String INSERT_TEMPLATE = "INSERT INTO %s (%s) VALUES (%s)"; private static final String DELETE_TEMPLATE = "DELETE FROM %s WHERE %s"; @@ -37,6 +45,7 @@ public class JDBCSink extends SinkBase { private final JDBCSinkConfig config; private final Connection conn; private final List pkColumnNames; + private final DatabaseType targetDbType; public static final String JDBC_COLUMN_NAME_KEY = "COLUMN_NAME"; private String updateDeleteConditionBuffer; @@ -47,6 +56,17 @@ public class JDBCSink extends SinkBase { 
public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { super(tableSchema); + var jdbcUrl = config.getJdbcUrl().toLowerCase(); + if (jdbcUrl.startsWith("jdbc:mysql")) { + this.targetDbType = DatabaseType.MYSQL; + } else if (jdbcUrl.startsWith("jdbc:postgresql")) { + this.targetDbType = DatabaseType.POSTGRES; + } else { + throw Status.INVALID_ARGUMENT + .withDescription("Unsupported jdbc url: " + jdbcUrl) + .asRuntimeException(); + } + this.config = config; try { this.conn = DriverManager.getConnection(config.getJdbcUrl()); @@ -90,12 +110,7 @@ private PreparedStatement prepareStatement(SinkRow row) { String.format( INSERT_TEMPLATE, config.getTableName(), columnsRepr, valuesRepr); try { - PreparedStatement stmt = - conn.prepareStatement(insertStmt, Statement.RETURN_GENERATED_KEYS); - for (int i = 0; i < row.size(); i++) { - stmt.setObject(i + 1, row.get(i)); - } - return stmt; + return generatePreparedStatement(insertStmt, row, null); } catch (SQLException e) { throw io.grpc.Status.INTERNAL .withDescription( @@ -174,14 +189,7 @@ private PreparedStatement prepareStatement(SinkRow row) { updateDeleteConditionBuffer); try { PreparedStatement stmt = - conn.prepareStatement(updateStmt, Statement.RETURN_GENERATED_KEYS); - int placeholderIdx = 1; - for (int i = 0; i < row.size(); i++) { - stmt.setObject(placeholderIdx++, row.get(i)); - } - for (Object value : updateDeleteValueBuffer) { - stmt.setObject(placeholderIdx++, value); - } + generatePreparedStatement(updateStmt, row, updateDeleteValueBuffer); updateDeleteConditionBuffer = null; updateDeleteValueBuffer = null; return stmt; @@ -199,6 +207,56 @@ private PreparedStatement prepareStatement(SinkRow row) { } } + + /** + * Generates sql statement for insert/update + * + * @param inputStmt insert/update template string + * @param row column values to fill into statement + * @param updateDeleteValueBuffer pk values for update condition, pass null for insert + * @return prepared sql statement for insert/update + * 
@throws SQLException + */ + private PreparedStatement generatePreparedStatement( + String inputStmt, SinkRow row, Object[] updateDeleteValueBuffer) throws SQLException { + PreparedStatement stmt = conn.prepareStatement(inputStmt, Statement.RETURN_GENERATED_KEYS); + var columnNames = getTableSchema().getColumnNames(); + int placeholderIdx = 1; + for (int i = 0; i < row.size(); i++) { + switch (getTableSchema().getColumnType(columnNames[i])) { + case INTERVAL: + if (targetDbType == DatabaseType.POSTGRES) { + stmt.setObject(placeholderIdx++, new PGInterval((String) row.get(i))); + } else { + stmt.setObject(placeholderIdx++, row.get(i)); + } + break; + case JSONB: + if (targetDbType == DatabaseType.POSTGRES) { + // reference: https://github.com/pgjdbc/pgjdbc/issues/265 + var pgObj = new PGobject(); + pgObj.setType("jsonb"); + pgObj.setValue((String) row.get(i)); + stmt.setObject(placeholderIdx++, pgObj); + } else { + stmt.setObject(placeholderIdx++, row.get(i)); + } + break; + case BYTEA: + stmt.setBinaryStream(placeholderIdx++, (InputStream) row.get(i)); + break; + default: + stmt.setObject(placeholderIdx++, row.get(i)); + break; + } + } + if (updateDeleteValueBuffer != null) { + for (Object value : updateDeleteValueBuffer) { + stmt.setObject(placeholderIdx++, value); + } + } + return stmt; + } + @Override public void write(Iterator rows) { while (rows.hasNext()) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java index 2e493691ef801..154c038dd9fed 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java @@ -14,6 +14,8 @@ package com.risingwave.java.binding; +import java.io.ByteArrayInputStream; + public class BaseRow implements AutoCloseable { protected final long pointer; private boolean isClosed; @@ -59,10 +61,32 @@ public java.sql.Timestamp 
getTimestamp(int index) { return Binding.rowGetTimestampValue(pointer, index); } + public java.sql.Time getTime(int index) { + return Binding.rowGetTimeValue(pointer, index); + } + public java.math.BigDecimal getDecimal(int index) { return Binding.rowGetDecimalValue(pointer, index); } + public java.sql.Date getDate(int index) { + return Binding.rowGetDateValue(pointer, index); + } + + // string representation of interval: "2 mons 3 days 00:00:00.000004" or "P1Y2M3DT4H5M6.789123S" + public String getInterval(int index) { + return Binding.rowGetIntervalValue(pointer, index); + } + + // string representation of jsonb: '{"key": "value"}' + public String getJsonb(int index) { + return Binding.rowGetJsonbValue(pointer, index); + } + + public ByteArrayInputStream getBytea(int index) { + return Binding.rowGetByteaValue(pointer, index); + } + @Override public void close() { if (!isClosed) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index 1fcaa659db577..d3bfe8359a432 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -14,6 +14,8 @@ package com.risingwave.java.binding; +import java.io.ByteArrayInputStream; + public class Binding { static { System.loadLibrary("risingwave_java_binding"); @@ -57,6 +59,16 @@ public class Binding { static native java.math.BigDecimal rowGetDecimalValue(long pointer, int index); + static native java.sql.Time rowGetTimeValue(long pointer, int index); + + static native java.sql.Date rowGetDateValue(long pointer, int index); + + static native String rowGetIntervalValue(long pointer, int index); + + static native String rowGetJsonbValue(long pointer, int index); + + static native ByteArrayInputStream rowGetByteaValue(long pointer, int index); + // Since the underlying rust does not have garbage collection, we will have to 
manually call // close on the row to release the row instance pointed by the pointer. static native void rowClose(long pointer); diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 99229bfac7603..cd12818c02e1c 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -31,7 +31,8 @@ use serde_json::{json, Map, Value}; use tracing::warn; use super::{ - Sink, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + Sink, SinkError, TimestampHandlingMode, SINK_TYPE_APPEND_ONLY, SINK_TYPE_DEBEZIUM, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::common::KafkaCommon; use crate::sink::{datum_to_json_object, record_to_json, Result}; @@ -256,7 +257,7 @@ impl KafkaSink { "schema": schema_to_json(schema), "payload": { "before": null, - "after": record_to_json(row, &schema.fields)?, + "after": record_to_json(row, &schema.fields, TimestampHandlingMode::Milli)?, "op": "c", "ts_ms": ts_ms, "source": source_field, @@ -265,7 +266,7 @@ impl KafkaSink { Op::Delete => Some(json!({ "schema": schema_to_json(schema), "payload": { - "before": record_to_json(row, &schema.fields)?, + "before": record_to_json(row, &schema.fields, TimestampHandlingMode::Milli)?, "after": null, "op": "d", "ts_ms": ts_ms, @@ -273,7 +274,11 @@ impl KafkaSink { } })), Op::UpdateDelete => { - update_cache = Some(record_to_json(row, &schema.fields)?); + update_cache = Some(record_to_json( + row, + &schema.fields, + TimestampHandlingMode::Milli, + )?); continue; } Op::UpdateInsert => { @@ -282,7 +287,7 @@ impl KafkaSink { "schema": schema_to_json(schema), "payload": { "before": before, - "after": record_to_json(row, &schema.fields)?, + "after": record_to_json(row, &schema.fields, TimestampHandlingMode::Milli)?, "op": "u", "ts_ms": ts_ms, "source": source_field, @@ -324,15 +329,27 @@ impl KafkaSink { let schema = &self.schema; for (op, row) in chunk.rows() { let event_object = match op { - Op::Insert => 
Some(Value::Object(record_to_json(row, &schema.fields)?)), + Op::Insert => Some(Value::Object(record_to_json( + row, + &schema.fields, + TimestampHandlingMode::Milli, + )?)), Op::Delete => Some(Value::Null), Op::UpdateDelete => { - update_cache = Some(record_to_json(row, &schema.fields)?); + update_cache = Some(record_to_json( + row, + &schema.fields, + TimestampHandlingMode::Milli, + )?); continue; } Op::UpdateInsert => { if update_cache.take().is_some() { - Some(Value::Object(record_to_json(row, &schema.fields)?)) + Some(Value::Object(record_to_json( + row, + &schema.fields, + TimestampHandlingMode::Milli, + )?)) } else { warn!( "not found UpdateDelete in prev row, skipping, row index {:?}", @@ -360,7 +377,12 @@ impl KafkaSink { async fn append_only(&self, chunk: StreamChunk) -> Result<()> { for (op, row) in chunk.rows() { if op == Op::Insert { - let record = Value::Object(record_to_json(row, &self.schema.fields)?).to_string(); + let record = Value::Object(record_to_json( + row, + &self.schema.fields, + TimestampHandlingMode::Milli, + )?) 
+ .to_string(); self.send( BaseRecord::to(self.config.common.topic.as_str()) .key(self.gen_message_key().as_bytes()) @@ -450,7 +472,7 @@ fn pk_to_json( for idx in pk_indices { let field = &schema[*idx]; let key = field.name.clone(); - let value = datum_to_json_object(field, row.datum_at(*idx)) + let value = datum_to_json_object(field, row.datum_at(*idx), TimestampHandlingMode::Milli) .map_err(|e| SinkError::JsonParse(e.to_string()))?; mappings.insert(key, value); } @@ -460,7 +482,11 @@ fn pk_to_json( pub fn chunk_to_json(chunk: StreamChunk, schema: &Schema) -> Result> { let mut records: Vec = Vec::with_capacity(chunk.capacity()); for (_, row) in chunk.rows() { - let record = Value::Object(record_to_json(row, &schema.fields)?); + let record = Value::Object(record_to_json( + row, + &schema.fields, + TimestampHandlingMode::Milli, + )?); records.push(record.to_string()); } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7a140e7ccbf30..d72af118e71c2 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -21,6 +21,8 @@ use std::collections::HashMap; use anyhow::anyhow; use async_trait::async_trait; +use base64::engine::general_purpose; +use base64::Engine as _; use chrono::{Datelike, NaiveDateTime, Timelike}; use enum_as_inner::EnumAsInner; use risingwave_common::array::{ArrayError, ArrayResult, RowRef, StreamChunk}; @@ -259,18 +261,32 @@ impl From for RwError { } } -pub fn record_to_json(row: RowRef<'_>, schema: &[Field]) -> Result> { +#[derive(Clone, Copy)] +pub enum TimestampHandlingMode { + Milli, + String, +} + +pub fn record_to_json( + row: RowRef<'_>, + schema: &[Field], + timestamp_handling_mode: TimestampHandlingMode, +) -> Result> { let mut mappings = Map::with_capacity(schema.len()); for (field, datum_ref) in schema.iter().zip_eq_fast(row.iter()) { let key = field.name.clone(); - let value = datum_to_json_object(field, datum_ref) + let value = datum_to_json_object(field, datum_ref, 
timestamp_handling_mode) .map_err(|e| SinkError::JsonParse(e.to_string()))?; mappings.insert(key, value); } Ok(mappings) } -fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { +fn datum_to_json_object( + field: &Field, + datum: DatumRef<'_>, + timestamp_handling_mode: TimestampHandlingMode, +) -> ArrayResult { let scalar_ref = match datum { None => return Ok(Value::Null), Some(datum) => datum, @@ -321,22 +337,27 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult { json!(v.0.num_days_from_ce()) } - (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => { - json!(v.0.timestamp_millis()) - } + (DataType::Timestamp, ScalarRefImpl::Timestamp(v)) => match timestamp_handling_mode { + TimestampHandlingMode::Milli => json!(v.0.timestamp_millis()), + TimestampHandlingMode::String => json!(v.0.format("%Y-%m-%d %H:%M:%S%.6f").to_string()), + }, (DataType::Bytea, ScalarRefImpl::Bytea(v)) => { - json!(hex::encode(v)) + json!(general_purpose::STANDARD_NO_PAD.encode(v)) } // PYMDTHMS (DataType::Interval, ScalarRefImpl::Interval(v)) => { json!(v.as_iso_8601()) } + (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => { + json!(jsonb_ref.to_string()) + } (DataType::List(datatype), ScalarRefImpl::List(list_ref)) => { let elems = list_ref.iter(); let mut vec = Vec::with_capacity(elems.len()); let inner_field = Field::unnamed(Box::::into_inner(datatype)); for sub_datum_ref in elems { - let value = datum_to_json_object(&inner_field, sub_datum_ref)?; + let value = + datum_to_json_object(&inner_field, sub_datum_ref, timestamp_handling_mode)?; vec.push(value); } json!(vec) @@ -349,7 +370,8 @@ fn datum_to_json_object(field: &Field, datum: DatumRef<'_>) -> ArrayResult RemoteSink { | DataType::Decimal | DataType::Timestamp | DataType::Varchar + | DataType::Date + | DataType::Time + | DataType::Interval + | DataType::Jsonb + | DataType::Bytea ) { Ok( Column { name: column.column_desc.name.clone(), @@ -211,7 +216,7 @@ impl RemoteSink { }) } 
else { Err(SinkError::Remote(format!( - "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Timestamp and Varchar, got {:?}: {:?}", + "remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Bytea and Varchar, got {:?}: {:?}", column.column_desc.name, column.column_desc.data_type ))) @@ -299,7 +304,11 @@ impl Sink for RemoteSink { SinkPayloadFormat::Json => { let mut row_ops = vec![]; for (op, row_ref) in chunk.rows() { - let map = record_to_json(row_ref, &self.schema.fields)?; + let map = record_to_json( + row_ref, + &self.schema.fields, + TimestampHandlingMode::String, + )?; let row_op = RowOp { op_type: op.to_protobuf() as i32, line: serde_json::to_string(&map) diff --git a/src/java_binding/src/lib.rs b/src/java_binding/src/lib.rs index 49211b8005395..6e230348258bf 100644 --- a/src/java_binding/src/lib.rs +++ b/src/java_binding/src/lib.rs @@ -28,8 +28,11 @@ use std::slice::from_raw_parts; use std::sync::{Arc, LazyLock}; use hummock_iterator::{HummockJavaBindingIterator, KeyedRow}; -use jni::objects::{AutoArray, GlobalRef, JClass, JMethodID, JObject, JString, ReleaseMode}; -use jni::sys::{jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jshort}; +use jni::objects::{ + AutoArray, GlobalRef, JClass, JMethodID, JObject, JStaticMethodID, JString, JValue, ReleaseMode, +}; +use jni::signature::ReturnType; +use jni::sys::{jboolean, jbyte, jbyteArray, jdouble, jfloat, jint, jlong, jshort, jvalue}; use jni::JNIEnv; use once_cell::sync::OnceCell; use prost::{DecodeError, Message}; @@ -229,7 +232,11 @@ pub enum JavaBindingRowInner { #[derive(Default)] pub struct JavaClassMethodCache { big_decimal_ctor: OnceCell<(GlobalRef, JMethodID)>, + byte_array_input_stream_ctor: OnceCell<(GlobalRef, JMethodID)>, timestamp_ctor: OnceCell<(GlobalRef, JMethodID)>, + + date_ctor: OnceCell<(GlobalRef, JStaticMethodID)>, + time_ctor: OnceCell<(GlobalRef, JStaticMethodID)>, } pub struct 
JavaBindingRow { @@ -491,6 +498,40 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetStringValu }) } +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetIntervalValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, +) -> JString<'a> { + execute_and_catch(env, move || { + let interval = pointer + .as_ref() + .datum_at(idx as usize) + .unwrap() + .into_interval() + .to_string(); + Ok(env.new_string(interval)?) + }) +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetJsonbValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, +) -> JString<'a> { + execute_and_catch(env, move || { + let jsonb = pointer + .as_ref() + .datum_at(idx as usize) + .unwrap() + .into_jsonb() + .to_string(); + Ok(env.new_string(jsonb)?) + }) +} + #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampValue<'a>( env: EnvParam<'a>, @@ -552,6 +593,125 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalVal }) } +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDateValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, +) -> JObject<'a> { + execute_and_catch(env, move || { + let value = pointer + .as_ref() + .datum_at(idx as usize) + .unwrap() + .into_date() + .0 + .to_string(); + + let string_value = env.new_string(value)?; + let (class_ref, constructor) = + pointer.as_ref().class_cache.date_ctor.get_or_try_init(|| { + let cls = env.find_class("java/sql/Date")?; + let init_method = env.get_static_method_id( + cls, + "valueOf", + "(Ljava/lang/String;)Ljava/sql/Date;", + )?; + Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method)) + })?; + let class = JClass::from(class_ref.as_obj()); + let JValue::Object(date_obj) = env.call_static_method_unchecked( + class, + *constructor, + ReturnType::Object, + 
&[jvalue::from(JValue::from(string_value))], + )? else { + return Err(BindingError::from(jni::errors::Error::MethodNotFound { + name: "valueOf".to_string(), + sig: "(Ljava/lang/String;)Ljava/sql/Date;".into(), + })); + }; + Ok(date_obj) + }) +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimeValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, +) -> JObject<'a> { + execute_and_catch(env, move || { + let value = pointer + .as_ref() + .datum_at(idx as usize) + .unwrap() + .into_time() + .0 + .to_string(); + + let string_value = env.new_string(value)?; + let (class_ref, constructor) = + pointer.as_ref().class_cache.time_ctor.get_or_try_init(|| { + let cls = env.find_class("java/sql/Time")?; + let init_method = env.get_static_method_id( + cls, + "valueOf", + "(Ljava/lang/String;)Ljava/sql/Time;", + )?; + Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method)) + })?; + let class = JClass::from(class_ref.as_obj()); + let JValue::Object(obj) = env.call_static_method_unchecked( + class, + *constructor, + ReturnType::Object, + &[jvalue::from(JValue::from(string_value))], + )? 
else { + return Err(BindingError::from(jni::errors::Error::MethodNotFound { + name: "valueOf".to_string(), + sig: "(Ljava/lang/String;)Ljava/sql/Time;".into(), + })); + }; + Ok(obj) + }) +} + +#[no_mangle] +pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetByteaValue<'a>( + env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingRow>, + idx: jint, +) -> JObject<'a> { + execute_and_catch(env, move || { + let bytes = pointer + .as_ref() + .datum_at(idx as usize) + .unwrap() + .into_bytea(); + let bytes_value = env.byte_array_from_slice(bytes)?; + let (ts_class_ref, constructor) = pointer + .as_ref() + .class_cache + .byte_array_input_stream_ctor + .get_or_try_init(|| { + let cls = env.find_class("java/io/ByteArrayInputStream")?; + let init_method = env.get_method_id(cls, "<init>", "([B)V")?; + Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method)) + })?; + let ts_class = JClass::from(ts_class_ref.as_obj()); + unsafe { + let input_stream_obj = env.new_object_unchecked( + ts_class, + *constructor, + &[JValue::Object(JObject::from_raw(bytes_value))], + )?; + + Ok(input_stream_obj) + } + }) +} + #[no_mangle] pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowClose<'a>( + _env: EnvParam<'a>,