Skip to content

BIP 2: Support upserting oracle table

PENG Zhengshuai edited this page Apr 28, 2022 · 2 revisions

Status: MERGED

Author: @chncaesar

Contributor: @AdmondGuo

Date: 2022.03.23

Issue: #1772

Pull Requests: #1722


背景

数据处理时,更新(upsert) RDBMS 表是比较普遍的场景。目前,Byzer-lang 能够更新 MySQL 表。代码save append result_set as jdbc.db.table_1 where idCol="c1"result_set 有如下数据

c1 c2
1 "a_updated"
2 "b"

table_1 有如下记录

c1 c2
1 "a"

执行时,比较 c1 字段,若值已经存在,则更新其他字段;否则插入一条新记录。完成后,table_1 变为

c1 c2
1 "a_updated"
2 "b"

我们希望 Byzer-lang 具备更新(upsert) Oracle 表的能力。

设计

各家 RDBMS 提供了不同 sql 实现 upsert 语义。Oracle 提供了 merge into 语句。 因而,byzer-lang 将结果集转化为 n 条 merge into 语句,通过 JDBC 协议提交至Oracle,提交或者回滚事务。

代码实现上,DataFrameWriterExtensions 利用 scala 隐式能力扩展 DataFrameWriter 功能。UpsertUtils 已经实现了 upsert 时 不同数据分片 JDBC 连接和事务控制。

我们需要扩展 UpsertBuilder,生成 Oracle merge into, 并注册该类。类图如下:

image

使用方式

这里有 2 点要注意:

  1. 事务控制
  2. 结果集和表 schema 对齐

事务控制

Byzer-lang 是一个分布式应用,每个数据分片有独立数据库连接,可能导致脏数据。强事务要求的场景,将数据分片改为1,牺牲一点性能。 对于每个数据分片,batchSize 条数据提交一次事务,若有异常,则回滚。为了避免脏数据,batchSize 需要大于等于结果集条数。

schema 对齐

要求结果集字段名称和数据类型与目标表一致,否则更新失败。例如以下语句执行报错,因为 concat(c2, "upd") 没有显式声明字段名。

select c1, concat(c2,"upd") as updated;

save append updated as jdbc.`db1.table1`;

修改为

select c1, concat(c2,"upd") c2 as updated;
save append updated as jdbc.`db1.table1`;