Skip to content

Commit

Permalink
feat(sink): add date, time, interval, bytea and jsonb data types for …
Browse files Browse the repository at this point in the history
…PG/MySQL sink of stream_chunk/json payload (#9957)

Co-authored-by: weili <[email protected]>
Co-authored-by: WillyKidd <[email protected]>
  • Loading branch information
3 people authored and lmatz committed Jun 1, 2023
1 parent fd246bd commit f23e99c
Show file tree
Hide file tree
Showing 18 changed files with 749 additions and 88 deletions.
17 changes: 13 additions & 4 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node
mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXISTS test;"
# grant access to `test` for ci test user
mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';"
# create a table named t_remote
# create two tables named t_remote_0 and t_remote_1
mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql

echo "--- preparing postgresql"
Expand Down Expand Up @@ -92,10 +92,19 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
sleep 1

# check sink destination mysql using shell
diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id")
diff -u ./e2e_test/sink/remote/mysql_expected_result_0.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote_0 ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check passed"
echo "mysql sink check 0 passed"
else
echo "The output is not as expected."
exit 1
fi

diff -u ./e2e_test/sink/remote/mysql_expected_result_1.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT id, v_varchar, v_text, v_integer, v_smallint, v_bigint, v_decimal, v_real, v_double, v_boolean, v_date, v_time, v_timestamp, v_jsonb, TO_BASE64(v_bytea) FROM test.t_remote_1 ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check 1 passed"
else
echo "The output is not as expected."
exit 1
Expand Down
11 changes: 10 additions & 1 deletion e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
# the check is run on sink destination postgres database

query I
select * from t_remote order by id;
select * from t_remote_0 order by id;
----
1 Alex 28208 281620391 4986480304337356659 28162.0391 2.03 28162.0391 2023-03-20 10:18:30
3 Carl 18300 1702307129 7878292368468104216 17023.07129 23.07 17023.07129 2023-03-20 10:18:32
4 Doris 17250 151951802 3946135584462581863 1519518.02 18.02 1519518.02 2023-03-21 10:18:30
5 Eve 9725 698160808 524334216698825611 69.8160808 69.81 69.8160808 2023-03-21 10:18:31
6 Frank 28131 1233587627 8492820454814063326 123358.7627 58.76 123358.7627 2023-03-21 10:18:32

query II
select * from t_remote_1 order by id;
----
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 t 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} \xdeadbeef
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 t 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} \xcafebabe
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 f 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} \xbabec0de
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 t 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} \xdeadbabe
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 f 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} \xdeadbabe
96 changes: 82 additions & 14 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
create table t_remote (
create table t_remote_0 (
id integer primary key,
v_varchar varchar,
v_smallint smallint,
Expand All @@ -12,59 +12,127 @@ create table t_remote (
);

statement ok
create materialized view mv_remote as select * from t_remote;
CREATE TABLE t_remote_1 (
id BIGINT PRIMARY KEY,
v_varchar VARCHAR,
v_text TEXT,
v_integer INTEGER,
v_smallint SMALLINT,
v_bigint BIGINT,
v_decimal DECIMAL,
v_real REAL,
v_double DOUBLE PRECISION,
v_boolean BOOLEAN,
v_date DATE,
v_time TIME,
v_timestamp TIMESTAMP,
v_jsonb JSONB,
v_bytea BYTEA
);

statement ok
create materialized view mv_remote_0 as select * from t_remote_0;

statement ok
create materialized view mv_remote_1 as select * from t_remote_1;

statement ok
CREATE SINK s_postgres FROM mv_remote WITH (
CREATE SINK s_postgres_0 FROM mv_remote_0 WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_remote',
table.name='t_remote_0',
type='upsert'
);

statement ok
CREATE SINK s_postgres_1 FROM mv_remote_1 WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_remote_1',
type='upsert'
);

statement ok
CREATE SINK s_mysql_0 FROM mv_remote_0 WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_remote_0',
type='upsert'
);

statement ok
CREATE SINK s_mysql FROM mv_remote WITH (
CREATE SINK s_mysql_1 FROM mv_remote_1 WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_remote',
table.name='t_remote_1',
type='upsert'
);

statement ok
INSERT INTO t_remote VALUES
INSERT INTO t_remote_0 VALUES
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
(2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'),
(3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32');

statement ok
INSERT INTO t_remote VALUES
INSERT INTO t_remote_0 VALUES
(4, 'Doris', 17250, 151951802, 3946135584462581863, 1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'),
(5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'),
(6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32');

statement ok
INSERT INTO t_remote_1 VALUES
(1, 'Varchar value 1', 'Text value 1', 123, 456, 789, 12.34, 56.78, 90.12, TRUE, '2023-05-22', '12:34:56', '2023-05-22 12:34:56', '{"key": "value"}', E'\\xDEADBEEF'),
(2, 'Varchar value 2', 'Text value 2', 234, 567, 890, 23.45, 67.89, 01.23, FALSE, '2023-05-23', '23:45:01', '2023-05-23 23:45:01', '{"key": "value2"}', E'\\xFEEDBEEF'),
(3, 'Varchar value 3', 'Text value 3', 345, 678, 901, 34.56, 78.90, 12.34, TRUE, '2023-05-24', '12:34:56', '2023-05-24 12:34:56', '{"key": "value3"}', E'\\xCAFEBABE');

statement ok
INSERT INTO t_remote_1 VALUES
(4, 'Varchar value 4', 'Text value 4', 456, 789, 012, 45.67, 89.01, 23.45, FALSE, '2023-05-25', '23:45:01', '2023-05-25 23:45:01', '{"key": "value4"}', E'\\xBABEC0DE'),
(5, 'Varchar value 5', 'Text value 5', 567, 890, 123, 56.78, 90.12, 34.56, TRUE, '2023-05-26', '12:34:56', '2023-05-26 12:34:56', '{"key": "value5"}', E'\\xDEADBABE'),
(6, 'Varchar value 6', 'Text value 6', 789, 123, 456, 67.89, 34.56, 78.91, FALSE, '2023-05-27', '23:45:01', '2023-05-27 23:45:01', '{"key": "value6"}', E'\\xDEADBABE');

statement ok
FLUSH;

statement ok
UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1;
UPDATE t_remote_0 SET v_varchar = 'Alex' WHERE id = 1;

statement ok
UPDATE t_remote_1 SET v_varchar = 'Alex' WHERE id = 1;

statement ok
DELETE FROM t_remote_0 WHERE id = 2;

statement ok
DELETE FROM t_remote WHERE id = 2;
DELETE FROM t_remote_1 WHERE id = 2;

statement ok
FLUSH;

statement ok
DROP SINK s_postgres;
DROP SINK s_postgres_0;

statement ok
DROP SINK s_postgres_1;

statement ok
DROP SINK s_mysql_0;

statement ok
DROP SINK s_mysql_1;

statement ok
DROP MATERIALIZED VIEW mv_remote_0;

statement ok
DROP SINK s_mysql
DROP MATERIALIZED VIEW mv_remote_1;

statement ok
DROP MATERIALIZED VIEW mv_remote;
DROP TABLE t_remote_0;

statement ok
DROP TABLE t_remote;
DROP TABLE t_remote_1;

statement ok
FLUSH;
22 changes: 20 additions & 2 deletions e2e_test/sink/remote/mysql_create_table.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE t_remote (
CREATE TABLE t_remote_0 (
id integer PRIMARY KEY,
v_varchar varchar(100),
v_smallint smallint,
Expand All @@ -8,4 +8,22 @@ CREATE TABLE t_remote (
v_float float,
v_double double,
v_timestamp timestamp
);
);

-- MySQL destination table for the `s_mysql_1` JDBC sink (table.name='t_remote_1').
-- Column names match the RisingWave-side t_remote_1 table created in jdbc.load.slt;
-- the types are the MySQL counterparts chosen for this end-to-end test.
CREATE TABLE t_remote_1 (
id BIGINT PRIMARY KEY,
v_varchar VARCHAR(255),
v_text TEXT,
v_integer INT,
v_smallint SMALLINT,
v_bigint BIGINT,
v_decimal DECIMAL(10,2),
v_real FLOAT,
v_double DOUBLE,
-- appears as 1/0 in mysql_expected_result_1.tsv
v_boolean BOOLEAN,
v_date DATE,
v_time TIME,
v_timestamp TIMESTAMP,
-- receives the source JSONB column
v_jsonb JSON,
-- receives the source BYTEA column; the check script reads it back with TO_BASE64(v_bytea)
v_bytea BLOB
);
5 changes: 5 additions & 0 deletions e2e_test/sink/remote/mysql_expected_result_1.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1 Alex Text value 1 123 456 789 12.34 56.78 90.12 1 2023-05-22 12:34:56 2023-05-22 12:34:56 {"key": "value"} 3q2+7w==
3 Varchar value 3 Text value 3 345 678 901 34.56 78.9 12.34 1 2023-05-24 12:34:56 2023-05-24 12:34:56 {"key": "value3"} yv66vg==
4 Varchar value 4 Text value 4 456 789 12 45.67 89.01 23.45 0 2023-05-25 23:45:01 2023-05-25 23:45:01 {"key": "value4"} ur7A3g==
5 Varchar value 5 Text value 5 567 890 123 56.78 90.12 34.56 1 2023-05-26 12:34:56 2023-05-26 12:34:56 {"key": "value5"} 3q26vg==
6 Varchar value 6 Text value 6 789 123 456 67.89 34.56 78.91 0 2023-05-27 23:45:01 2023-05-27 23:45:01 {"key": "value6"} 3q26vg==
22 changes: 20 additions & 2 deletions e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE t_remote (
CREATE TABLE t_remote_0 (
id integer PRIMARY KEY,
v_varchar varchar(100),
v_smallint smallint,
Expand All @@ -8,4 +8,22 @@ CREATE TABLE t_remote (
v_float real,
v_double double precision,
v_timestamp timestamp
);
);

-- PostgreSQL destination table for the `s_postgres_1` JDBC sink (table.name='t_remote_1').
-- Column names match the RisingWave-side t_remote_1 table created in jdbc.load.slt.
-- Its contents are verified by jdbc.check.pg.slt with `select * from t_remote_1 order by id`.
CREATE TABLE t_remote_1 (
id BIGINT PRIMARY KEY,
v_varchar VARCHAR(255),
v_text TEXT,
v_integer INTEGER,
v_smallint SMALLINT,
v_bigint BIGINT,
v_decimal DECIMAL(10,2),
v_real REAL,
v_double DOUBLE PRECISION,
v_boolean BOOLEAN,
v_date DATE,
v_time TIME,
v_timestamp TIMESTAMP,
v_jsonb JSONB,
-- rendered as a \x... hex string in the expected output of jdbc.check.pg.slt
v_bytea BYTEA
);
2 changes: 1 addition & 1 deletion integration_tests/postgres-cdc/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:
- POSTGRES_PASSWORD=123456
- POSTGRES_DB=mydb
ports:
- 5432:5432
- 8432:5432
healthcheck:
test: [ "CMD-SHELL", "pg_isready --username=myuser --dbname=mydb" ]
interval: 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
import com.risingwave.proto.Data;
import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Base64;
import java.util.Map;

public class JsonDeserializer implements Deserializer {
Expand Down Expand Up @@ -130,6 +135,28 @@ private static BigDecimal castDecimal(Object value) {
}
}

/**
 * Converts a JSON payload value into a {@link Time}.
 *
 * <p>The value is first converted with {@code castLong}; the resulting number is handed to
 * {@link Time#Time(long)}, which interprets it as a millisecond count.
 *
 * @param value the raw JSON value; must be non-null because the error path dereferences it
 * @return the {@link Time} built from the converted value
 * @throws io.grpc.StatusRuntimeException with status {@code INVALID_ARGUMENT} when the value
 *     cannot be converted; the original failure is attached as the cause
 */
private static Time castTime(Object value) {
    try {
        long millis = castLong(value);
        return new Time(millis);
    } catch (RuntimeException e) {
        // Keep the underlying exception so the root cause is not lost when the
        // failure is reported as INVALID_ARGUMENT.
        throw io.grpc.Status.INVALID_ARGUMENT
                .withDescription("unable to cast into time from " + value.getClass())
                .withCause(e)
                .asRuntimeException();
    }
}

/**
 * Converts a JSON payload value into a {@link Date}.
 *
 * <p>The value is converted with {@code castLong} and treated as a 1-based day number whose
 * day 1 is 0001-01-01: one is subtracted and the result is added, in days, to
 * {@code LocalDate.of(1, 1, 1)}.
 *
 * @param value the raw JSON value; must be non-null because the error path dereferences it
 * @return the {@link Date} for the computed calendar day
 * @throws io.grpc.StatusRuntimeException with status {@code INVALID_ARGUMENT} when the value
 *     cannot be converted; the original failure is attached as the cause
 */
private static Date castDate(Object value) {
    try {
        // Day 1 corresponds to 0001-01-01, hence the offset of one.
        long daysAfterFirstDay = castLong(value) - 1;
        LocalDate firstDay = LocalDate.of(1, 1, 1);
        return Date.valueOf(firstDay.plusDays(daysAfterFirstDay));
    } catch (RuntimeException e) {
        // Keep the underlying exception so the root cause is not lost when the
        // failure is reported as INVALID_ARGUMENT.
        throw io.grpc.Status.INVALID_ARGUMENT
                .withDescription("unable to cast into date from " + value.getClass())
                .withCause(e)
                .asRuntimeException();
    }
}

private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
// value might be null
if (value == null) {
Expand Down Expand Up @@ -171,6 +198,32 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
.asRuntimeException();
}
return Timestamp.valueOf((String) value);
case TIME:
return castTime(value);
case DATE:
return castDate(value);
case INTERVAL:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected interval, got " + value.getClass())
.asRuntimeException();
}
return value;
case JSONB:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected jsonb, got " + value.getClass())
.asRuntimeException();
}
return value;
case BYTEA:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected bytea, got " + value.getClass())
.asRuntimeException();
}
byte[] bytes = Base64.getDecoder().decode((String) value);
return new ByteArrayInputStream(bytes);
default:
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unsupported type " + typeName)
Expand Down
Loading

0 comments on commit f23e99c

Please sign in to comment.