Commit 1fce6de: support load duplicate opt

1 parent 9297a07

8 files changed: +143 -85 lines

pkg/pb/plan/plan.pb.go

Lines changed: 1 addition & 0 deletions
(Generated file; diff not rendered. The single added line presumably corresponds to the new Node_NOCHECKING value of the plan.Node_OnDuplicateAction enum that bind_load.go uses below.)

pkg/sql/parsers/dialect/mysql/mysql_sql.go

Lines changed: 19 additions & 19 deletions
(Generated parser; diff not rendered. Regenerated from the mysql_sql.y grammar change below.)

pkg/sql/parsers/dialect/mysql/mysql_sql.y

Lines changed: 8 additions & 8 deletions
@@ -186,7 +186,7 @@ import (
     int64Val int64
     strs     []string

-    duplicateKey tree.DuplicateKey
+    loadDuplicateMode tree.LoadDupMode
     fields     *tree.Fields
     fieldsList []*tree.Fields
     lines      *tree.Lines
@@ -754,7 +754,7 @@ import (
 %type <exprs> data_values data_opt row_value

 %type <boolVal> local_opt
-%type <duplicateKey> duplicate_opt
+%type <loadDuplicateMode> duplicate_opt
 %type <fields> load_fields field_item export_fields
 %type <fieldsList> field_item_list
 %type <str> field_terminator starting_opt lines_terminated_opt starting lines_terminated
@@ -1510,7 +1510,7 @@ load_data_stmt:
         $$ = &tree.Load{
             Local: $3,
             Param: $4,
-            DuplicateHandling: $5,
+            DuplicateOpt: $5,
             Table: $8,
         }
         $$.(*tree.Load).Param.Tail = $9
@@ -1890,23 +1890,23 @@ field_terminator:

 duplicate_opt:
     {
-        $$ = &tree.DuplicateKeyError{}
+        $$ = tree.LOAD_DUPLICATE_CHECKING
     }
 |   CHECKING
     {
-        $$ = &tree.DuplicateKeyError{}
+        $$ = tree.LOAD_DUPLICATE_CHECKING
     }
 |   NOCHECKING
     {
-        $$ = &tree.DuplicateKeyNoChecking{}
+        $$ = tree.LOAD_DUPLICATE_NOCHECKING
     }
 |   IGNORE
     {
-        $$ = &tree.DuplicateKeyIgnore{}
+        $$ = tree.LOAD_DUPLICATE_IGNORE
     }
 |   REPLACE
     {
-        $$ = &tree.DuplicateKeyReplace{}
+        $$ = tree.LOAD_DUPLICATE_REPLACE
     }

 local_opt:
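The optional keyword sits between the input-file parameters and INTO TABLE, so statements such as load data infile 'a.csv' nochecking into table t now parse, and the empty production and an explicit CHECKING both yield the default mode. A minimal standalone Go sketch of the keyword-to-mode mapping the rule encodes (the enum is redeclared locally and dupModeForKeyword is a hypothetical helper, not repository code):

```go
package main

import "fmt"

// LoadDupMode is redeclared locally so the sketch compiles on its own;
// in the repository it lives in pkg/sql/parsers/tree (see update.go below).
type LoadDupMode int

const (
	LOAD_DUPLICATE_CHECKING LoadDupMode = iota
	LOAD_DUPLICATE_NOCHECKING
	LOAD_DUPLICATE_IGNORE
	LOAD_DUPLICATE_REPLACE
)

// dupModeForKeyword (hypothetical helper) mirrors the duplicate_opt rule:
// an absent keyword and an explicit CHECKING both select the default mode.
func dupModeForKeyword(kw string) LoadDupMode {
	switch kw {
	case "", "checking":
		return LOAD_DUPLICATE_CHECKING
	case "nochecking":
		return LOAD_DUPLICATE_NOCHECKING
	case "ignore":
		return LOAD_DUPLICATE_IGNORE
	case "replace":
		return LOAD_DUPLICATE_REPLACE
	}
	return LOAD_DUPLICATE_CHECKING
}

func main() {
	for _, kw := range []string{"", "checking", "nochecking", "ignore", "replace"} {
		fmt.Printf("%q -> mode %d\n", kw, dupModeForKeyword(kw))
	}
}
```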

pkg/sql/parsers/tree/update.go

Lines changed: 20 additions & 45 deletions
@@ -200,13 +200,22 @@ type TailParameter struct {
     Assignments UpdateExprs
 }

+type LoadDupMode int
+
+const (
+    LOAD_DUPLICATE_CHECKING LoadDupMode = iota
+    LOAD_DUPLICATE_NOCHECKING
+    LOAD_DUPLICATE_IGNORE
+    LOAD_DUPLICATE_REPLACE
+)
+
 // Load data statement
 type Load struct {
     statementImpl
-    Local             bool
-    DuplicateHandling DuplicateKey
-    Table             *TableName
-    Accounts          IdentifierList
+    Local        bool
+    DuplicateOpt LoadDupMode
+    Table        *TableName
+    Accounts     IdentifierList
     //Partition
     Param *ExternParam
 }
@@ -247,15 +256,15 @@ func (node *Load) Format(ctx *FmtCtx) {
         }
     }

-    switch node.DuplicateHandling.(type) {
-    case *DuplicateKeyError:
-        break
-    case *DuplicateKeyIgnore:
+    switch node.DuplicateOpt {
+    case LOAD_DUPLICATE_CHECKING:
+        ctx.WriteString(" checking")
+    case LOAD_DUPLICATE_NOCHECKING:
+        ctx.WriteString(" noChecking")
+    case LOAD_DUPLICATE_IGNORE:
         ctx.WriteString(" ignore")
-    case *DuplicateKeyReplace:
+    case LOAD_DUPLICATE_REPLACE:
         ctx.WriteString(" replace")
-    case *DuplicateKeyNoChecking:
-        ctx.WriteString(" noChecking")
     }
     ctx.WriteString(" into table ")
     node.Table.Format(ctx)
@@ -345,40 +354,6 @@ func formatS3option(ctx *FmtCtx, option []string) {
 func (node *Load) GetStatementType() string { return "Load" }
 func (node *Load) GetQueryType() string     { return QueryTypeDML }

-type DuplicateKey interface{}
-
-type duplicateKeyImpl struct {
-    DuplicateKey
-}
-
-type DuplicateKeyError struct {
-    duplicateKeyImpl
-}
-
-func NewDuplicateKeyError() *DuplicateKeyError {
-    return &DuplicateKeyError{}
-}
-
-type DuplicateKeyReplace struct {
-    duplicateKeyImpl
-}
-
-func NewDuplicateKeyReplace() *DuplicateKeyReplace {
-    return &DuplicateKeyReplace{}
-}
-
-type DuplicateKeyIgnore struct {
-    duplicateKeyImpl
-}
-
-type DuplicateKeyNoChecking struct {
-    duplicateKeyImpl
-}
-
-func NewDuplicateKeyIgnore() *DuplicateKeyIgnore {
-    return &DuplicateKeyIgnore{}
-}
-
 type EscapedBy struct {
     Value byte
 }
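With DuplicateHandling (an interface implemented by four empty structs) collapsed into the flat LoadDupMode enum, formatting becomes a plain value switch, and the default mode is printed explicitly instead of being skipped via the old case *DuplicateKeyError: break. A hypothetical, self-contained sketch of the new formatting path, with strings.Builder standing in for tree.FmtCtx and the enum redeclared locally:

```go
package main

import (
	"fmt"
	"strings"
)

type LoadDupMode int

const (
	LOAD_DUPLICATE_CHECKING LoadDupMode = iota
	LOAD_DUPLICATE_NOCHECKING
	LOAD_DUPLICATE_IGNORE
	LOAD_DUPLICATE_REPLACE
)

// formatDupOpt (hypothetical) mirrors the new switch in (*Load).Format;
// every mode, including the default checking mode, is written back out.
func formatDupOpt(sb *strings.Builder, mode LoadDupMode) {
	switch mode {
	case LOAD_DUPLICATE_CHECKING:
		sb.WriteString(" checking")
	case LOAD_DUPLICATE_NOCHECKING:
		sb.WriteString(" noChecking")
	case LOAD_DUPLICATE_IGNORE:
		sb.WriteString(" ignore")
	case LOAD_DUPLICATE_REPLACE:
		sb.WriteString(" replace")
	}
}

func main() {
	var sb strings.Builder
	sb.WriteString("load data infile 'a.csv'")
	formatDupOpt(&sb, LOAD_DUPLICATE_CHECKING)
	sb.WriteString(" into table t")
	fmt.Println(sb.String()) // load data infile 'a.csv' checking into table t
}
```

One visible consequence: a LOAD DATA statement written without any duplicate keyword formats back with an explicit checking, since the empty production and the CHECKING keyword are indistinguishable in the AST.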

pkg/sql/plan/bind_insert.go

Lines changed: 19 additions & 12 deletions
@@ -60,15 +60,28 @@ func (builder *QueryBuilder) bindInsert(stmt *tree.Insert, bindCtx *BindContext)
         return 0, err
     }

-    return builder.appendDedupAndMultiUpdateNodesForBindInsert(bindCtx, dmlCtx, lastNodeID, colName2Idx, skipUniqueIdx, stmt.OnDuplicateUpdate)
+    var onDupAction plan.Node_OnDuplicateAction
+    if len(stmt.OnDuplicateUpdate) == 0 {
+        onDupAction = plan.Node_FAIL
+    } else if len(stmt.OnDuplicateUpdate) == 1 && stmt.OnDuplicateUpdate[0] == nil {
+        onDupAction = plan.Node_IGNORE
+    } else {
+        onDupAction = plan.Node_UPDATE
+    }
+
+    return builder.appendDedupAndMultiUpdateNodesForBindInsert(bindCtx, dmlCtx, lastNodeID, colName2Idx, skipUniqueIdx, onDupAction, stmt.OnDuplicateUpdate)
 }

-func (builder *QueryBuilder) canSkipDedup(tableDef *plan.TableDef) bool {
+func (builder *QueryBuilder) canSkipDedup(tableDef *plan.TableDef, onDupAction plan.Node_OnDuplicateAction) bool {
+    if onDupAction == plan.Node_NOCHECKING {
+        return true
+    }
+
     if builder.optimizerHints != nil && builder.optimizerHints.skipDedup == 1 {
         return true
     }

-    if builder.qry.LoadTag || builder.isRestore {
+    if builder.isRestore {
         return true
     }

@@ -85,6 +98,7 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
     lastNodeID int32,
     colName2Idx map[string]int32,
     skipUniqueIdx []bool,
+    onDupAction plan.Node_OnDuplicateAction,
     astUpdateExprs tree.UpdateExprs,
 ) (int32, error) {
     var err error
@@ -106,21 +120,14 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
         }
     }

-    var onDupAction plan.Node_OnDuplicateAction
     scanTag := builder.genNewTag()
     updateExprs := make(map[string]*plan.Expr)

-    if len(astUpdateExprs) == 0 {
-        onDupAction = plan.Node_FAIL
-    } else if len(astUpdateExprs) == 1 && astUpdateExprs[0] == nil {
-        onDupAction = plan.Node_IGNORE
-    } else {
+    if onDupAction == plan.Node_UPDATE {
         if pkName == catalog.FakePrimaryKeyColName {
             return 0, moerr.NewUnsupportedDML(builder.compCtx.GetContext(), "update on duplicate without primary key")
         }

-        onDupAction = plan.Node_UPDATE
-
         binder := NewOndupUpdateBinder(builder.GetContext(), builder, bindCtx, scanTag, selectTag, tableDef)
         var updateExpr *plan.Expr
         for _, astUpdateExpr := range astUpdateExprs {
@@ -236,7 +243,7 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
     }

     // handle primary/unique key confliction
-    if builder.canSkipDedup(dmlCtx.tableDefs[0]) {
+    if builder.canSkipDedup(dmlCtx.tableDefs[0], onDupAction) {
         // load do not handle primary/unique key confliction
         for i, idxDef := range tableDef.Indexes {
             if !idxDef.TableExist || skipUniqueIdx[i] {
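The duplicate action is now derived by each caller and passed into appendDedupAndMultiUpdateNodesForBindInsert, and canSkipDedup consults it directly: skipping dedup is requested explicitly via plan.Node_NOCHECKING rather than inferred from builder.qry.LoadTag. A self-contained sketch of the derivation added to bindInsert, using local stand-ins for plan.Node_OnDuplicateAction and tree.UpdateExprs (actionForInsert is a hypothetical helper; the enum values are illustrative, since the real type is protobuf-generated):

```go
package main

import "fmt"

// OnDuplicateAction stands in for plan.Node_OnDuplicateAction.
type OnDuplicateAction int

const (
	Node_FAIL OnDuplicateAction = iota
	Node_IGNORE
	Node_UPDATE
	Node_NOCHECKING
)

// UpdateExpr stands in for tree.UpdateExpr.
type UpdateExpr struct{ Col string }

// actionForInsert (hypothetical helper) mirrors the branch added to bindInsert:
// an empty ON DUPLICATE clause fails on conflict, and a single nil entry
// (apparently how the parser encodes an ignore request) maps to IGNORE.
func actionForInsert(onDupUpdate []*UpdateExpr) OnDuplicateAction {
	if len(onDupUpdate) == 0 {
		return Node_FAIL
	}
	if len(onDupUpdate) == 1 && onDupUpdate[0] == nil {
		return Node_IGNORE
	}
	return Node_UPDATE
}

func main() {
	fmt.Println(actionForInsert(nil))                       // 0 -> FAIL
	fmt.Println(actionForInsert([]*UpdateExpr{nil}))        // 1 -> IGNORE
	fmt.Println(actionForInsert([]*UpdateExpr{{Col: "a"}})) // 2 -> UPDATE
}
```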

pkg/sql/plan/bind_load.go

Lines changed: 13 additions & 1 deletion
@@ -35,7 +35,19 @@ func (builder *QueryBuilder) bindLoad(stmt *tree.Load, bindCtx *BindContext) (in
         return -1, err
     }

-    return builder.appendDedupAndMultiUpdateNodesForBindInsert(bindCtx, dmlCtx, lastNodeID, colName2Idx, skipUniqueIdx, nil)
+    var onDupAction plan.Node_OnDuplicateAction
+    switch stmt.DuplicateOpt {
+    case tree.LOAD_DUPLICATE_CHECKING:
+        onDupAction = plan.Node_FAIL
+    case tree.LOAD_DUPLICATE_NOCHECKING:
+        onDupAction = plan.Node_NOCHECKING
+    case tree.LOAD_DUPLICATE_IGNORE:
+        onDupAction = plan.Node_IGNORE
+    case tree.LOAD_DUPLICATE_REPLACE:
+        onDupAction = plan.Node_FAIL // todo: map to a replace action once REPLACE is supported
+    }
+
+    return builder.appendDedupAndMultiUpdateNodesForBindInsert(bindCtx, dmlCtx, lastNodeID, colName2Idx, skipUniqueIdx, onDupAction, nil)
 }

 func (builder *QueryBuilder) bindExternalScan(
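On the load side, the action comes straight from the new DuplicateOpt field on the AST. A self-contained sketch of that mode-to-action mapping (both enums redeclared locally; actionForLoad is a hypothetical helper); note that the default checking mode and the not-yet-implemented replace mode both plan as FAIL:

```go
package main

import "fmt"

type LoadDupMode int

const (
	LOAD_DUPLICATE_CHECKING LoadDupMode = iota
	LOAD_DUPLICATE_NOCHECKING
	LOAD_DUPLICATE_IGNORE
	LOAD_DUPLICATE_REPLACE
)

// OnDuplicateAction stands in for plan.Node_OnDuplicateAction.
type OnDuplicateAction int

const (
	Node_FAIL OnDuplicateAction = iota
	Node_IGNORE
	Node_NOCHECKING
)

// actionForLoad (hypothetical helper) mirrors the switch added to bindLoad.
func actionForLoad(mode LoadDupMode) OnDuplicateAction {
	switch mode {
	case LOAD_DUPLICATE_NOCHECKING:
		return Node_NOCHECKING // skip duplicate detection entirely
	case LOAD_DUPLICATE_IGNORE:
		return Node_IGNORE // drop conflicting rows, keep loading
	default:
		// LOAD_DUPLICATE_CHECKING (the default) fails on conflict; replace is
		// not implemented yet and also falls back to FAIL, per the todo above.
		return Node_FAIL
	}
}

func main() {
	fmt.Println(actionForLoad(LOAD_DUPLICATE_REPLACE)) // 0 -> FAIL (for now)
}
```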

test/distributed/cases/load_data/load_data.result

Lines changed: 33 additions & 0 deletions
@@ -916,3 +916,36 @@ select count(*) from load_data_t9;
 count(*)
 400000
 drop table load_data_t9;
+drop table if exists load_data_duplicate;
+create table load_data_duplicate(col1 int primary key, col2 int);
+load data infile '$resources/load_data/test_duplicate_1.csv' replace into table load_data_duplicate FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';
+Duplicate entry '2' for key 'col1'
+select * from load_data_duplicate order by col1, col2;
+col1    col2
+delete from load_data_duplicate;
+load data infile '$resources/load_data/test_duplicate_1.csv' ignore into table load_data_duplicate FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';
+select * from load_data_duplicate order by col1, col2;
+col1    col2
+1    2
+2    3
+3    3
+delete from load_data_duplicate;
+load data infile '$resources/load_data/test_duplicate_1.csv' into table load_data_duplicate FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';
+Duplicate entry '2' for key 'col1'
+select * from load_data_duplicate order by col1, col2;
+col1    col2
+delete from load_data_duplicate;
+load data infile '$resources/load_data/test_duplicate_1.csv' checking into table load_data_duplicate FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';
+Duplicate entry '2' for key 'col1'
+select * from load_data_duplicate order by col1, col2;
+col1    col2
+delete from load_data_duplicate;
+load data infile '$resources/load_data/test_duplicate_1.csv' nochecking into table load_data_duplicate FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';
+select * from load_data_duplicate order by col1, col2;
+col1    col2
+1    2
+2    3
+2    4
+3    3
+delete from load_data_duplicate;
+drop table load_data_duplicate;
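Reading these expected results against bind_load.go: the default, checking, and replace loads all abort with Duplicate entry '2' for key 'col1' (replace is still planned as Node_FAIL per the todo), ignore keeps a single row per conflicting key, and nochecking skips dedup entirely, so both conflicting rows (2,3) and (2,4) land in the table.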
