Skip to content

Commit

Permalink
refactor: improve lock meta (#21300)
Browse files Browse the repository at this point in the history
improve lock meta

Approved by: @ouyuanning
  • Loading branch information
huby2358 authored Jan 24, 2025
1 parent 72ffd89 commit 3166f3b
Showing 1 changed file with 58 additions and 43 deletions.
101 changes: 58 additions & 43 deletions pkg/sql/compile/lock_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,16 @@ type LockMeta struct {
metaTables map[string]struct{} //key: (db_name table_name)
lockDbExe colexec.ExpressionExecutor //executor to serial function to lock mo_database
lockTableExe colexec.ExpressionExecutor //executor to serial function to lock mo_tables
lockMetaVecs []*vector.Vector //paramters for serial function
lockDbVec *vector.Vector
lockTableVec *vector.Vector
lockMetaVecs []*vector.Vector //paramters for serial function
}

func NewLockMeta() *LockMeta {
return &LockMeta{}
}

func (l *LockMeta) reset(_ *process.Process) {
if l.lockDbExe != nil {
l.lockDbExe.ResetForNextQuery()
}
if l.lockTableExe != nil {
l.lockTableExe.ResetForNextQuery()
}
}

func (l *LockMeta) clear(proc *process.Process) {
Expand All @@ -74,6 +70,8 @@ func (l *LockMeta) clear(proc *process.Process) {
vec.Free(proc.Mp())
}
l.lockMetaVecs = nil
l.lockDbVec = nil
l.lockTableVec = nil
}

func (l *LockMeta) appendMetaTables(objRes *plan.ObjectRef) {
Expand All @@ -94,7 +92,7 @@ func (l *LockMeta) doLock(e engine.Engine, proc *process.Process) error {
if lockLen == 0 {
return nil
}

var err error
accountId, err := defines.GetAccountId(proc.Ctx)
if err != nil {
return err
Expand Down Expand Up @@ -126,57 +124,74 @@ func (l *LockMeta) doLock(e engine.Engine, proc *process.Process) error {
}

lockDbs := make(map[string]struct{})
bat := batch.NewWithSize(3)
for _, table := range tables {
names := strings.SplitN(table, " ", 2)
err := vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[1], []byte(names[0]), false, proc.GetMPool()) //db_name
if err != nil {
return err
lockVec := l.lockTableVec

var bat *batch.Batch
if l.lockTableVec == nil {
bat = batch.NewWithSize(3)
for _, table := range tables {
names := strings.SplitN(table, " ", 2)
err = vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[1], []byte(names[0]), false, proc.GetMPool()) //db_name
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[2], []byte(names[1]), false, proc.GetMPool()) //table_name
if err != nil {
return err
}
lockDbs[names[0]] = struct{}{}
}
err = vector.AppendBytes(l.lockMetaVecs[2], []byte(names[1]), false, proc.GetMPool()) //table_name

// call serial function to lock mo_tables
bat.Vecs = l.lockMetaVecs

bat.SetRowCount(l.lockMetaVecs[0].Length())
lockVec, err = l.lockTableExe.Eval(proc, []*batch.Batch{bat}, nil)
if err != nil {
return err
}
lockDbs[names[0]] = struct{}{}
l.lockTableVec = lockVec
}

// call serial function to lock mo_tables
bat.Vecs = l.lockMetaVecs
err = l.lockMetaRows(e, proc, l.lockTableExe, bat, accountId, l.table_table_id)
err = l.lockMetaRows(e, proc, lockVec, accountId, l.table_table_id)
if err != nil {
return err
}

// recall serial function to lock mo_databases
l.lockMetaVecs[0].CleanOnlyData()
l.lockMetaVecs[1].CleanOnlyData()
if len(lockDbs) > 1 {
for dbName := range lockDbs {
err := vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[1], []byte(dbName), false, proc.GetMPool()) //db_name
if err != nil {
return err
lockVec = l.lockDbVec
if l.lockDbVec == nil {
l.lockMetaVecs[0].CleanOnlyData()
l.lockMetaVecs[1].CleanOnlyData()
if len(lockDbs) > 1 {
for dbName := range lockDbs {
err = vector.AppendFixed(l.lockMetaVecs[0], accountId, false, proc.GetMPool()) //account_id
if err != nil {
return err
}
err = vector.AppendBytes(l.lockMetaVecs[1], []byte(dbName), false, proc.GetMPool()) //db_name
if err != nil {
return err
}
}
}
bat.Vecs = l.lockMetaVecs[:2]
bat.SetRowCount(l.lockMetaVecs[0].Length())
lockVec, err = l.lockDbExe.Eval(proc, []*batch.Batch{bat}, nil)
if err != nil {
return err
}
l.lockDbVec = lockVec
}
bat.Vecs = l.lockMetaVecs[:2]
return l.lockMetaRows(e, proc, l.lockDbExe, bat, accountId, l.database_table_id)

return l.lockMetaRows(e, proc, lockVec, accountId, l.database_table_id)
}

func (l *LockMeta) lockMetaRows(e engine.Engine, proc *process.Process, executor colexec.ExpressionExecutor, bat *batch.Batch, accountId uint32, tableId uint64) error {
executor.ResetForNextQuery()
bat.SetRowCount(l.lockMetaVecs[0].Length())
lockVec, err := executor.Eval(proc, []*batch.Batch{bat}, nil)
if err != nil {
return err
}
func (l *LockMeta) lockMetaRows(e engine.Engine, proc *process.Process, lockVec *vector.Vector, accountId uint32, tableId uint64) error {
b := batch.NewWithSize(1)
b.SetVector(0, lockVec)
if err := lockop.LockRows(e, proc, nil, tableId, b, 0, *lockVec.GetType(), lock.LockMode_Shared, lock.Sharding_None, accountId); err != nil {
Expand Down

0 comments on commit 3166f3b

Please sign in to comment.