diff --git a/pkg/bootstrap/versions/v2_0_1/pubsub.go b/pkg/bootstrap/versions/v2_0_1/pubsub.go index 899d95fc63f13..f8def79d7dfb7 100644 --- a/pkg/bootstrap/versions/v2_0_1/pubsub.go +++ b/pkg/bootstrap/versions/v2_0_1/pubsub.go @@ -107,6 +107,11 @@ func migrateMoPubs(txn executor.TxnExecutor) (err error) { if _, err = txn.Exec(insertSql, executor.StatementOption{}.WithAccountID(0)); err != nil { return } + + deleteSql := fmt.Sprintf("delete from mo_catalog.mo_subs where pub_account_name = '%s' and pub_name = '%s' and sub_name is null", info.PubAccountName, info.PubName) + if _, err = txn.Exec(deleteSql, executor.StatementOption{}.WithAccountID(0)); err != nil { + return + } } return } diff --git a/pkg/bootstrap/versions/v2_0_1/pubsub_test.go b/pkg/bootstrap/versions/v2_0_1/pubsub_test.go index c9784c034fc4d..b568d99001a8f 100644 --- a/pkg/bootstrap/versions/v2_0_1/pubsub_test.go +++ b/pkg/bootstrap/versions/v2_0_1/pubsub_test.go @@ -15,6 +15,7 @@ package v2_0_1 import ( + "strings" "testing" "github.com/matrixorigin/matrixone/pkg/bootstrap/versions" @@ -27,7 +28,9 @@ import ( "github.com/stretchr/testify/assert" ) -type MockTxnExecutor struct{} +type MockTxnExecutor struct { + flag bool +} func (MockTxnExecutor) Use(db string) { //TODO implement me @@ -39,7 +42,11 @@ func (MockTxnExecutor) LockTable(table string) error { panic("implement me") } -func (MockTxnExecutor) Exec(sql string, options executor.StatementOption) (executor.Result, error) { +func (e MockTxnExecutor) Exec(sql string, options executor.StatementOption) (executor.Result, error) { + if strings.HasPrefix(sql, "delete from mo_catalog.mo_subs") && e.flag { + return executor.Result{}, assert.AnError + } + bat := batch.New([]string{"a"}) bat.Vecs[0] = testutil.MakeInt32Vector([]int32{1}, nil) bat.SetRowCount(1) @@ -106,3 +113,46 @@ func Test_migrateMoPubs(t *testing.T) { err := migrateMoPubs(txn) assert.NoError(t, err) } + +func Test_migrateMoPubs_deleteFailed(t *testing.T) { + getAccountsStub := gostub.Stub( + &pubsub.GetAccounts, + func(_ executor.TxnExecutor) (map[string]*pubsub.AccountInfo, map[int32]*pubsub.AccountInfo, error) { + return map[string]*pubsub.AccountInfo{ + "acc1": {Id: 1, Name: "acc1"}, + }, nil, nil + }, + ) + defer getAccountsStub.Reset() + + getAllPubInfosStub := gostub.Stub( + &versions.GetAllPubInfos, + func(_ executor.TxnExecutor, _ map[string]*pubsub.AccountInfo) (map[string]*pubsub.PubInfo, error) { + return map[string]*pubsub.PubInfo{ + "sys#pubName": { + PubAccountName: "sys", + PubName: "pubName", + SubAccountsStr: pubsub.AccountAll, + }, + "acc1#pubName": { + PubAccountName: "acc1", + PubName: "pubName", + SubAccountsStr: pubsub.AccountAll, + }, + }, nil + }, + ) + defer getAllPubInfosStub.Reset() + + getSubbedAccNamesStub := gostub.Stub( + &getSubbedAccNames, + func(_ executor.TxnExecutor, _, _ string, _ map[int32]*pubsub.AccountInfo) ([]string, error) { + return []string{"acc2"}, nil + }, + ) + defer getSubbedAccNamesStub.Reset() + + txn := &MockTxnExecutor{flag: true} + err := migrateMoPubs(txn) + assert.Error(t, err) +}