From 72ffd897a76776a3350e9461699db69346bcc9e6 Mon Sep 17 00:00:00 2001 From: ou yuanning <45346669+ouyuanning@users.noreply.github.com> Date: Thu, 23 Jan 2025 20:20:42 +0800 Subject: [PATCH] Add reset for some fun (#21327) Add reset for some fun Approved by: @qingxinhome, @badboynt1 --- pkg/sql/colexec/evalExpression.go | 4 ++-- pkg/sql/colexec/evalExpressionReset.go | 10 ++++++++-- pkg/sql/plan/function/func_builtin_serial.go | 5 +++++ pkg/sql/plan/function/function.go | 18 ++++++++++++------ pkg/sql/plan/function/list_builtIn.go | 11 ++++++----- 5 files changed, 33 insertions(+), 15 deletions(-) diff --git a/pkg/sql/colexec/evalExpression.go b/pkg/sql/colexec/evalExpression.go index 4c0ea0de36af0..25a62a7b1bcb6 100644 --- a/pkg/sql/colexec/evalExpression.go +++ b/pkg/sql/colexec/evalExpression.go @@ -191,7 +191,7 @@ func NewExpressionExecutor(proc *process.Process, planExpr *plan.Expr) (Expressi executor.overloadID = overloadID executor.volatile, executor.timeDependent = overload.CannotFold(), overload.IsRealTimeRelated() executor.fid, _ = function.DecodeOverloadID(overloadID) - executor.evalFn, executor.freeFn = overload.GetExecuteMethod() + executor.evalFn, executor.resetFn, executor.freeFn = overload.GetExecuteMethod() } typ := types.New(types.T(planExpr.Typ.Id), planExpr.Typ.Width, planExpr.Typ.Scale) @@ -1392,7 +1392,7 @@ func GetExprZoneMap( ivecs[i] = vecs[arg.AuxId] } } - fn, fnFree := overload.GetExecuteMethod() + fn, _, fnFree := overload.GetExecuteMethod() typ := types.New(types.T(expr.Typ.Id), expr.Typ.Width, expr.Typ.Scale) result := vector.NewFunctionResultWrapper(typ, proc.Mp()) diff --git a/pkg/sql/colexec/evalExpressionReset.go b/pkg/sql/colexec/evalExpressionReset.go index 055c7188b635b..055f4a0e8bbc2 100644 --- a/pkg/sql/colexec/evalExpressionReset.go +++ b/pkg/sql/colexec/evalExpressionReset.go @@ -57,13 +57,19 @@ type functionInformationForEval struct { proc *process.Process, rowCount int, selectList *function.FunctionSelectList) error - freeFn func() error + resetFn func() error + freeFn func() error } func (fI *functionInformationForEval) reset() { // we need to regenerate the evalFn to avoid a wrong result since the function may take an own runtime contest. // todo: in fact, we can jump this step if the function is a pure function. but we don't have this information now. + if fI.resetFn != nil { + _ = fI.resetFn() + return + } + if fI.freeFn != nil { _ = fI.freeFn() fI.freeFn = nil @@ -73,7 +79,7 @@ func (fI *functionInformationForEval) reset() { if fI.evalFn != nil { // we can set the context nil here since this function will never return an error. overload, _ := function.GetFunctionById(context.TODO(), fI.overloadID) - fI.evalFn, fI.freeFn = overload.GetExecuteMethod() + fI.evalFn, fI.resetFn, fI.freeFn = overload.GetExecuteMethod() } } diff --git a/pkg/sql/plan/function/func_builtin_serial.go b/pkg/sql/plan/function/func_builtin_serial.go index 58468680cd6d7..40dad6316d7dd 100644 --- a/pkg/sql/plan/function/func_builtin_serial.go +++ b/pkg/sql/plan/function/func_builtin_serial.go @@ -34,3 +34,8 @@ func (op *opSerial) Close() error { op.packer.Close() return nil } + +func (op *opSerial) Reset() error { + op.packer.Reset() + return nil +} diff --git a/pkg/sql/plan/function/function.go b/pkg/sql/plan/function/function.go index bbdb53983bb76..b8566125d263d 100644 --- a/pkg/sql/plan/function/function.go +++ b/pkg/sql/plan/function/function.go @@ -207,7 +207,7 @@ func RunFunctionDirectly(proc *process.Process, overloadID int64, inputs []*vect result.Free() return nil, err } - exec, execFree := f.GetExecuteMethod() + exec, _, execFree := f.GetExecuteMethod() if err = exec(inputs, result, proc, evaluateLength, nil); err != nil { result.Free() if execFree != nil { @@ -344,6 +344,12 @@ type executeLogicOfOverload func(parameters []*vector.Vector, // in case we need it in the future. type executeFreeOfOverload func() error +// executeResetOfOverload is used to reset the resources allocated by the execution logic. +// It is mainly used in SERIAL and SERIAL_FULL. +// NOTE: right now, we are not throwing an error when the reset logic failed. However, it is still included +// in case we need it in the future. +type executeResetOfOverload func() error + type aggregationLogicOfOverload struct { // agg related string for error message. str string @@ -375,7 +381,7 @@ type overload struct { // the execution logic and free logic. // NOTE: use either newOp or newOpWithFree. - newOpWithFree func() (executeLogicOfOverload, executeFreeOfOverload) + newOpWithFree func() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload) // in fact, the function framework does not directly run aggregate functions and window functions. // we use two flags to mark whether function is one of them. @@ -412,14 +418,14 @@ func (ov *overload) CannotExecuteInParallel() bool { return ov.cannotParallel } -func (ov *overload) GetExecuteMethod() (executeLogicOfOverload, executeFreeOfOverload) { +func (ov *overload) GetExecuteMethod() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload) { if ov.newOpWithFree != nil { - fn, fnFree := ov.newOpWithFree() - return fn, fnFree + fn, fnReset, fnFree := ov.newOpWithFree() + return fn, fnReset, fnFree } fn := ov.newOp() - return fn, nil + return fn, nil, nil } func (ov *overload) GetReturnTypeMethod() func(parameters []types.Type) types.Type { diff --git a/pkg/sql/plan/function/list_builtIn.go b/pkg/sql/plan/function/list_builtIn.go index 5e20ea38247dc..30823fc0ffa2d 100644 --- a/pkg/sql/plan/function/list_builtIn.go +++ b/pkg/sql/plan/function/list_builtIn.go @@ -16,7 +16,8 @@ package function import ( "fmt" - "github.com/matrixorigin/matrixone/pkg/sql/plan/function/fault" + + fj "github.com/matrixorigin/matrixone/pkg/sql/plan/function/fault" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" @@ -1445,9 +1446,9 @@ var supportedStringBuiltIns = []FuncNew{ retType: func(parameters []types.Type) types.Type { return types.T_varchar.ToType() }, - newOpWithFree: func() (executeLogicOfOverload, executeFreeOfOverload) { + newOpWithFree: func() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload) { opSerial := newOpSerial() - return opSerial.BuiltInSerial, opSerial.Close + return opSerial.BuiltInSerial, opSerial.Reset, opSerial.Close }, }, }, @@ -1471,9 +1472,9 @@ var supportedStringBuiltIns = []FuncNew{ retType: func(parameters []types.Type) types.Type { return types.T_varchar.ToType() }, - newOpWithFree: func() (executeLogicOfOverload, executeFreeOfOverload) { + newOpWithFree: func() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload) { opSerial := newOpSerial() - return opSerial.BuiltInSerialFull, opSerial.Close + return opSerial.BuiltInSerialFull, opSerial.Reset, opSerial.Close }, }, },