-
Notifications
You must be signed in to change notification settings - Fork 548
BIP 2: Support upserting oracle table
Status: MERGED
Author: @chncaesar
Contributor: @AdmondGuo
Date: 2022.03.23
Issue: #1772
Pull Requests: #1722
数据处理时,更新(upsert) RDBMS 表是比较普遍的场景。目前,Byzer-lang 能够更新 MySQL 表。代码save append result_set as jdbc.
db.table_1 where idCol="c1"
。
result_set
有如下数据
c1 | c2 |
---|---|
1 | "a_updated" |
2 | "b" |
table_1
有如下记录
c1 | c2 |
---|---|
1 | "a" |
执行时,比较 c1 字段,若值已经存在,则更新其他字段;否则插入一条新记录。完成后,table_1
变为
c1 | c2 |
---|---|
1 | "a_updated" |
2 | "b" |
我们希望 Byzer-lang 具备更新(upsert) Oracle 表的能力。
各家 RDBMS 提供了不同 sql
实现 upsert
语义。Oracle 提供了 merge into 语句。
因而,byzer-lang 将结果集转化为 n 条 merge into
语句,通过 JDBC 协议提交至Oracle,提交或者回滚事务。
代码实现上,DataFrameWriterExtensions
利用 scala 隐式能力扩展 DataFrameWriter
功能。UpsertUtils
已经实现了 upsert
时 不同数据分片 JDBC 连接和事务控制。
我们需要扩展 UpsertBuilder
,生成 Oracle merge into
, 并注册该类。类图如下:
这里有 2 点要注意:
- 事务控制
- 结果集和表 schema 对齐
Byzer-lang 是一个分布式应用,每个数据分片有独立数据库连接,可能导致脏数据。强事务要求的场景,将数据分片改为1,牺牲一点性能。
对于每个数据分片,batchSize
条数据提交一次事务,若有异常,则回滚。为了避免脏数据,batchSize
需要大于等于结果集条数。
要求结果集字段名称和数据类型与目标表一致,否则更新失败。例如以下语句执行报错,因为 concat(c2, "upd")
没有显式声明字段名。
select c1, concat(c2,"upd") as updated;
save append updated as jdbc.`db1.table1`;
修改为
select c1, concat(c2,"upd") c2 as updated;
save append updated as jdbc.`db1.table1`;