Skip to content

Commit e47cc67

Browse files
committed
refactor: refactor at executor to support different databases
Signed-off-by: LeeHao <[email protected]>
1 parent 115df45 commit e47cc67

21 files changed

+692
-496
lines changed

pkg/datasource/sql/conn_at.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (c *ATConn) ExecContext(ctx context.Context, query string, args []driver.Na
104104
DBName: c.dbName,
105105
IsSupportsSavepoints: true,
106106
IsAutoCommit: c.GetAutoCommit(),
107+
DBType: c.dbType,
107108
}
108109

109110
ret, err := executor.ExecWithNamedValue(ctx, execCtx,

pkg/datasource/sql/exec/at/at_executor.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ package at
1919

2020
import (
2121
"context"
22-
23-
"seata.apache.org/seata-go/pkg/datasource/sql/exec/at/internal"
24-
2522
"seata.apache.org/seata-go/pkg/datasource/sql/exec"
23+
"seata.apache.org/seata-go/pkg/datasource/sql/exec/at/internal"
24+
"seata.apache.org/seata-go/pkg/datasource/sql/exec/at/mysql"
2625
"seata.apache.org/seata-go/pkg/datasource/sql/parser"
2726
"seata.apache.org/seata-go/pkg/datasource/sql/types"
2827
"seata.apache.org/seata-go/pkg/datasource/sql/util"
2928
"seata.apache.org/seata-go/pkg/tm"
29+
"seata.apache.org/seata-go/pkg/util/log"
3030
)
3131

3232
func Init() {
@@ -55,17 +55,17 @@ func (e *ATExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.Exec
5555
} else {
5656
switch queryParser.SQLType {
5757
case types.SQLTypeInsert:
58-
executor = internal.NewInsertExecutor(queryParser, execCtx, e.hooks)
58+
executor = e.NewInsertExecutor(queryParser, execCtx, e.hooks)
5959
case types.SQLTypeUpdate:
60-
executor = internal.NewUpdateExecutor(queryParser, execCtx, e.hooks)
60+
executor = e.NewUpdateExecutor(queryParser, execCtx, e.hooks)
6161
case types.SQLTypeDelete:
62-
executor = internal.NewDeleteExecutor(queryParser, execCtx, e.hooks)
62+
executor = e.NewDeleteExecutor(queryParser, execCtx, e.hooks)
6363
case types.SQLTypeSelectForUpdate:
64-
executor = internal.NewSelectForUpdateExecutor(queryParser, execCtx, e.hooks)
64+
executor = e.NewSelectForUpdateExecutor(queryParser, execCtx, e.hooks)
6565
case types.SQLTypeInsertOnDuplicateUpdate:
66-
executor = internal.NewInsertOnUpdateExecutor(queryParser, execCtx, e.hooks)
66+
executor = e.NewInsertOnUpdateExecutor(queryParser, execCtx, e.hooks)
6767
case types.SQLTypeMulti:
68-
executor = internal.NewMultiExecutor(queryParser, execCtx, e.hooks)
68+
executor = e.NewMultiExecutor(queryParser, execCtx, e.hooks)
6969
default:
7070
executor = internal.NewPlainExecutor(queryParser, execCtx)
7171
}
@@ -79,3 +79,57 @@ func (e *ATExecutor) ExecWithValue(ctx context.Context, execCtx *types.ExecConte
7979
execCtx.NamedValues = util.ValueToNamedValue(execCtx.Values)
8080
return e.ExecWithNamedValue(ctx, execCtx, f)
8181
}
82+
83+
func (e *ATExecutor) NewInsertExecutor(parserCtx *types.ParseContext, execContext *types.ExecContext, hooks []exec.SQLHook) internal.Executor {
84+
switch execContext.DBType {
85+
case types.DBTypeMySQL:
86+
return mysql.NewInsertExecutor(parserCtx, execContext, e.hooks)
87+
}
88+
log.Errorf("unsupported db type: %s for insert executor", execContext.DBType)
89+
return nil
90+
}
91+
92+
func (e *ATExecutor) NewUpdateExecutor(parserCtx *types.ParseContext, execContext *types.ExecContext, hooks []exec.SQLHook) internal.Executor {
93+
switch execContext.DBType {
94+
case types.DBTypeMySQL:
95+
return mysql.NewUpdateExecutor(parserCtx, execContext, e.hooks)
96+
}
97+
log.Errorf("unsupported db type: %s for update executor", execContext.DBType)
98+
return nil
99+
}
100+
101+
func (e *ATExecutor) NewDeleteExecutor(parserCtx *types.ParseContext, execContext *types.ExecContext, hooks []exec.SQLHook) internal.Executor {
102+
switch execContext.DBType {
103+
case types.DBTypeMySQL:
104+
return mysql.NewDeleteExecutor(parserCtx, execContext, e.hooks)
105+
}
106+
log.Errorf("unsupported db type: %s for delete executor", execContext.DBType)
107+
return nil
108+
}
109+
110+
func (e *ATExecutor) NewSelectForUpdateExecutor(parserCtx *types.ParseContext, execContext *types.ExecContext, hooks []exec.SQLHook) internal.Executor {
111+
switch execContext.DBType {
112+
case types.DBTypeMySQL:
113+
return mysql.NewSelectForUpdateExecutor(parserCtx, execContext, e.hooks)
114+
}
115+
log.Errorf("unsupported db type: %s for select_for_update executor", execContext.DBType)
116+
return nil
117+
}
118+
119+
func (e *ATExecutor) NewInsertOnUpdateExecutor(parserCtx *types.ParseContext, execContext *types.ExecContext, hooks []exec.SQLHook) internal.Executor {
120+
switch execContext.DBType {
121+
case types.DBTypeMySQL:
122+
return mysql.NewInsertOnUpdateExecutor(parserCtx, execContext, e.hooks)
123+
}
124+
log.Errorf("unsupported db type: %s for insert_on_update executor", execContext.DBType)
125+
return nil
126+
}
127+
128+
func (e *ATExecutor) NewMultiExecutor(parserCtx *types.ParseContext, execContext *types.ExecContext, hooks []exec.SQLHook) internal.Executor {
129+
switch execContext.DBType {
130+
case types.DBTypeMySQL:
131+
return mysql.NewMultiExecutor(parserCtx, execContext, e.hooks)
132+
}
133+
log.Errorf("unsupported db type: %s for multi executor", execContext.DBType)
134+
return nil
135+
}

pkg/datasource/sql/exec/at/internal/base_executor.go

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"database/sql"
2424
"database/sql/driver"
2525
"fmt"
26+
"seata.apache.org/seata-go/pkg/datasource/sql/datasource"
27+
"seata.apache.org/seata-go/pkg/util/log"
2628
"strings"
2729

2830
"seata.apache.org/seata-go/pkg/datasource/sql/undo"
@@ -42,19 +44,19 @@ type Executor interface {
4244
}
4345

4446
type BaseExecutor struct {
45-
hooks []exec.SQLHook
46-
parserCtx *types.ParseContext
47-
execContext *types.ExecContext
47+
Hooks []exec.SQLHook
48+
ParserCtx *types.ParseContext
49+
ExecCtx *types.ExecContext
4850
}
4951

5052
func (b *BaseExecutor) beforeHooks(ctx context.Context, execCtx *types.ExecContext) {
51-
for _, hook := range b.hooks {
53+
for _, hook := range b.Hooks {
5254
hook.Before(ctx, execCtx)
5355
}
5456
}
5557

5658
func (b *BaseExecutor) afterHooks(ctx context.Context, execCtx *types.ExecContext) {
57-
for _, hook := range b.hooks {
59+
for _, hook := range b.Hooks {
5860
hook.After(ctx, execCtx)
5961
}
6062
}
@@ -100,7 +102,7 @@ func (*BaseExecutor) GetScanSlice(columnNames []string, tableMeta *types.TableMe
100102
}
101103

102104
func (b BaseExecutor) DBType() types.DBType {
103-
return b.execContext.DBType
105+
return b.ExecCtx.DBType
104106
}
105107

106108
func (b *BaseExecutor) buildSelectArgs(stmt *ast.SelectStmt, args []driver.NamedValue) []driver.NamedValue {
@@ -298,7 +300,7 @@ func getSqlNullValue(value interface{}) interface{} {
298300

299301
// buildWhereConditionByPKs build where condition by primary keys
300302
// each pk is a condition.the result will like :" (id,userCode) in ((?,?),(?,?)) or (id,userCode) in ((?,?),(?,?) ) or (id,userCode) in ((?,?))"
301-
func (b *BaseExecutor) buildWhereConditionByPKs(pkNameList []string, rowSize int, dbType string, maxInSize int) string {
303+
func (b *BaseExecutor) buildWhereConditionByPKs(pkNameList []string, rowSize int, maxInSize int) string {
302304
var (
303305
whereStr = &strings.Builder{}
304306
batchSize = rowSize/maxInSize + 1
@@ -404,3 +406,29 @@ func (b *BaseExecutor) buildLockKey(records *types.RecordImage, meta types.Table
404406

405407
return lockKeys.String()
406408
}
409+
410+
func (b *BaseExecutor) GetMetaData(ctx context.Context) (*types.TableMeta, error) {
411+
tableName, _ := b.ParserCtx.GetTableName()
412+
return datasource.GetTableCache(b.DBType()).GetTableMeta(ctx, b.ExecCtx.DBName, tableName)
413+
}
414+
415+
func (b *BaseExecutor) ExecSelectSQL(ctx context.Context, selectSQL string, args []driver.NamedValue) (driver.Rows, error) {
416+
var rowsi driver.Rows
417+
418+
queryerCtx, ok := b.ExecCtx.Conn.(driver.QueryerContext)
419+
var queryer driver.Queryer
420+
if !ok {
421+
queryer, ok = b.ExecCtx.Conn.(driver.Queryer)
422+
}
423+
if !ok {
424+
log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
425+
return nil, fmt.Errorf("invalid conn")
426+
}
427+
428+
rowsi, err := util.CtxDriverQuery(ctx, queryerCtx, queryer, selectSQL, args)
429+
if err != nil {
430+
log.Errorf("ctx driver query: %+v", err)
431+
return nil, err
432+
}
433+
return rowsi, nil
434+
}

pkg/datasource/sql/exec/at/internal/delete_executor.go

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@ import (
2121
"context"
2222
"database/sql/driver"
2323
"fmt"
24-
2524
"github.com/arana-db/parser/ast"
2625
"github.com/arana-db/parser/format"
27-
"seata.apache.org/seata-go/pkg/datasource/sql/datasource"
28-
"seata.apache.org/seata-go/pkg/datasource/sql/exec"
2926
"seata.apache.org/seata-go/pkg/datasource/sql/parser"
30-
"seata.apache.org/seata-go/pkg/datasource/sql/types"
31-
"seata.apache.org/seata-go/pkg/datasource/sql/util"
3227
"seata.apache.org/seata-go/pkg/util/bytes"
28+
29+
"seata.apache.org/seata-go/pkg/datasource/sql/exec"
30+
"seata.apache.org/seata-go/pkg/datasource/sql/types"
3331
"seata.apache.org/seata-go/pkg/util/log"
3432
)
3533

@@ -39,23 +37,25 @@ type DeleteExecutor struct {
3937
}
4038

4139
// NewDeleteExecutor get delete Executor
42-
func NewDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) Executor {
43-
return &DeleteExecutor{BaseExecutor: BaseExecutor{hooks, parserCtx, execContent}}
40+
func NewDeleteExecutor(parserCtx *types.ParseContext, execContext *types.ExecContext, hooks []exec.SQLHook) Executor {
41+
return &DeleteExecutor{
42+
BaseExecutor: BaseExecutor{Hooks: hooks, ParserCtx: parserCtx, ExecCtx: execContext},
43+
}
4444
}
4545

4646
// ExecContext exec SQL, and generate before image and after image
4747
func (d *DeleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
48-
d.beforeHooks(ctx, d.execContext)
48+
d.beforeHooks(ctx, d.ExecCtx)
4949
defer func() {
50-
d.afterHooks(ctx, d.execContext)
50+
d.afterHooks(ctx, d.ExecCtx)
5151
}()
5252

5353
beforeImage, err := d.beforeImage(ctx)
5454
if err != nil {
5555
return nil, err
5656
}
5757

58-
res, err := f(ctx, d.execContext.Query, d.execContext.NamedValues)
58+
res, err := f(ctx, d.ExecCtx.Query, d.ExecCtx.NamedValues)
5959
if err != nil {
6060
return nil, err
6161
}
@@ -65,42 +65,24 @@ func (d *DeleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNam
6565
return nil, err
6666
}
6767

68-
d.execContext.TxCtx.RoundImages.AppendBeofreImage(beforeImage)
69-
d.execContext.TxCtx.RoundImages.AppendAfterImage(afterImage)
68+
d.ExecCtx.TxCtx.RoundImages.AppendBeofreImage(beforeImage)
69+
d.ExecCtx.TxCtx.RoundImages.AppendAfterImage(afterImage)
7070
return res, nil
7171
}
7272

7373
// beforeImage build before image
7474
func (d *DeleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
75-
selectSQL, selectArgs, err := d.buildBeforeImageSQL(d.execContext.Query, d.execContext.NamedValues)
75+
selectSQL, selectArgs, err := d.BuildBeforeImageSQL(d.ExecCtx.Query, d.ExecCtx.NamedValues)
7676
if err != nil {
7777
return nil, err
7878
}
7979

80-
var rowsi driver.Rows
81-
queryerCtx, ok := d.execContext.Conn.(driver.QueryerContext)
82-
var queryer driver.Queryer
83-
if !ok {
84-
queryer, ok = d.execContext.Conn.(driver.Queryer)
85-
}
86-
if ok {
87-
rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, selectSQL, selectArgs)
88-
defer func() {
89-
if rowsi != nil {
90-
rowsi.Close()
91-
}
92-
}()
93-
if err != nil {
94-
log.Errorf("ctx driver query: %+v", err)
95-
return nil, err
96-
}
97-
} else {
98-
log.Errorf("target conn should been driver.QueryerContext or driver.Queryer")
99-
return nil, fmt.Errorf("invalid conn")
80+
rowsi, err := d.ExecSelectSQL(ctx, selectSQL, selectArgs)
81+
if err != nil {
82+
return nil, err
10083
}
10184

102-
tableName, _ := d.parserCtx.GetTableName()
103-
metaData, err := datasource.GetTableCache(d.BaseExecutor.DBType()).GetTableMeta(ctx, d.execContext.DBName, tableName)
85+
metaData, err := d.GetMetaData(ctx)
10486

10587
if err != nil {
10688
return nil, err
@@ -114,13 +96,13 @@ func (d *DeleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, e
11496
image.TableMeta = metaData
11597

11698
lockKey := d.buildLockKey(image, *metaData)
117-
d.execContext.TxCtx.LockKeys[lockKey] = struct{}{}
99+
d.ExecCtx.TxCtx.LockKeys[lockKey] = struct{}{}
118100

119101
return image, nil
120102
}
121103

122-
// buildBeforeImageSQL build delete sql from delete sql
123-
func (d *DeleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedValue) (string, []driver.NamedValue, error) {
104+
// BuildBeforeImageSQL build delete sql from delete sql
105+
func (d *DeleteExecutor) BuildBeforeImageSQL(query string, args []driver.NamedValue) (string, []driver.NamedValue, error) {
124106
p, err := parser.DoParser(query)
125107
if err != nil {
126108
return "", nil, err
@@ -154,8 +136,7 @@ func (d *DeleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedVa
154136

155137
// afterImage build after image
156138
func (d *DeleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) {
157-
tableName, _ := d.parserCtx.GetTableName()
158-
metaData, err := datasource.GetTableCache(d.BaseExecutor.DBType()).GetTableMeta(ctx, d.execContext.DBName, tableName)
139+
metaData, err := d.GetMetaData(ctx)
159140
if err != nil {
160141
return nil, err
161142
}

pkg/datasource/sql/exec/at/internal/delete_executor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func Test_deleteExecutor_buildBeforeImageSQL(t *testing.T) {
7373
c, err := parser.DoParser(tt.sourceQuery)
7474
assert.Nil(t, err)
7575
executor := NewDeleteExecutor(c, &types.ExecContext{Values: tt.sourceQueryArgs, NamedValues: util.ValueToNamedValue(tt.sourceQueryArgs)}, []exec.SQLHook{})
76-
query, args, err := executor.(*DeleteExecutor).buildBeforeImageSQL(tt.sourceQuery, util.ValueToNamedValue(tt.sourceQueryArgs))
76+
query, args, err := executor.(*DeleteExecutor).BuildBeforeImageSQL(tt.sourceQuery, util.ValueToNamedValue(tt.sourceQueryArgs))
7777
assert.Nil(t, err)
7878
assert.Equal(t, tt.expectQuery, query)
7979
assert.Equal(t, tt.expectQueryArgs, util.NamedValueToValue(args))

0 commit comments

Comments
 (0)