Skip to content

Commit

Permalink
Add reset for some fun (#21327)
Browse files Browse the repository at this point in the history
Add reset for some fun

Approved by: @qingxinhome, @badboynt1
  • Loading branch information
ouyuanning authored Jan 23, 2025
1 parent fa2ec45 commit 72ffd89
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 15 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/colexec/evalExpression.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func NewExpressionExecutor(proc *process.Process, planExpr *plan.Expr) (Expressi
executor.overloadID = overloadID
executor.volatile, executor.timeDependent = overload.CannotFold(), overload.IsRealTimeRelated()
executor.fid, _ = function.DecodeOverloadID(overloadID)
executor.evalFn, executor.freeFn = overload.GetExecuteMethod()
executor.evalFn, executor.resetFn, executor.freeFn = overload.GetExecuteMethod()
}
typ := types.New(types.T(planExpr.Typ.Id), planExpr.Typ.Width, planExpr.Typ.Scale)

Expand Down Expand Up @@ -1392,7 +1392,7 @@ func GetExprZoneMap(
ivecs[i] = vecs[arg.AuxId]
}
}
fn, fnFree := overload.GetExecuteMethod()
fn, _, fnFree := overload.GetExecuteMethod()
typ := types.New(types.T(expr.Typ.Id), expr.Typ.Width, expr.Typ.Scale)

result := vector.NewFunctionResultWrapper(typ, proc.Mp())
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/colexec/evalExpressionReset.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,19 @@ type functionInformationForEval struct {
proc *process.Process,
rowCount int,
selectList *function.FunctionSelectList) error
freeFn func() error
resetFn func() error
freeFn func() error
}

func (fI *functionInformationForEval) reset() {
// we need to regenerate the evalFn to avoid a wrong result since the function may take an own runtime contest.
// todo: in fact, we can jump this step if the function is a pure function. but we don't have this information now.

if fI.resetFn != nil {
_ = fI.resetFn()
return
}

if fI.freeFn != nil {
_ = fI.freeFn()
fI.freeFn = nil
Expand All @@ -73,7 +79,7 @@ func (fI *functionInformationForEval) reset() {
if fI.evalFn != nil {
// we can set the context nil here since this function will never return an error.
overload, _ := function.GetFunctionById(context.TODO(), fI.overloadID)
fI.evalFn, fI.freeFn = overload.GetExecuteMethod()
fI.evalFn, fI.resetFn, fI.freeFn = overload.GetExecuteMethod()
}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/plan/function/func_builtin_serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ func (op *opSerial) Close() error {
op.packer.Close()
return nil
}

func (op *opSerial) Reset() error {
op.packer.Reset()
return nil
}
18 changes: 12 additions & 6 deletions pkg/sql/plan/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func RunFunctionDirectly(proc *process.Process, overloadID int64, inputs []*vect
result.Free()
return nil, err
}
exec, execFree := f.GetExecuteMethod()
exec, _, execFree := f.GetExecuteMethod()
if err = exec(inputs, result, proc, evaluateLength, nil); err != nil {
result.Free()
if execFree != nil {
Expand Down Expand Up @@ -344,6 +344,12 @@ type executeLogicOfOverload func(parameters []*vector.Vector,
// in case we need it in the future.
type executeFreeOfOverload func() error

// executeResetOfOverload is used to reset the resources allocated by the execution logic.
// It is mainly used in SERIAL and SERIAL_FULL.
// NOTE: right now, we are not throwing an error when the reset logic failed. However, it is still included
// in case we need it in the future.
type executeResetOfOverload func() error

type aggregationLogicOfOverload struct {
// agg related string for error message.
str string
Expand Down Expand Up @@ -375,7 +381,7 @@ type overload struct {

// the execution logic and free logic.
// NOTE: use either newOp or newOpWithFree.
newOpWithFree func() (executeLogicOfOverload, executeFreeOfOverload)
newOpWithFree func() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload)

// in fact, the function framework does not directly run aggregate functions and window functions.
// we use two flags to mark whether function is one of them.
Expand Down Expand Up @@ -412,14 +418,14 @@ func (ov *overload) CannotExecuteInParallel() bool {
return ov.cannotParallel
}

func (ov *overload) GetExecuteMethod() (executeLogicOfOverload, executeFreeOfOverload) {
func (ov *overload) GetExecuteMethod() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload) {
if ov.newOpWithFree != nil {
fn, fnFree := ov.newOpWithFree()
return fn, fnFree
fn, fnReset, fnFree := ov.newOpWithFree()
return fn, fnReset, fnFree
}

fn := ov.newOp()
return fn, nil
return fn, nil, nil
}

func (ov *overload) GetReturnTypeMethod() func(parameters []types.Type) types.Type {
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/plan/function/list_builtIn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ package function

import (
"fmt"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/fault"

fj "github.com/matrixorigin/matrixone/pkg/sql/plan/function/fault"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -1445,9 +1446,9 @@ var supportedStringBuiltIns = []FuncNew{
retType: func(parameters []types.Type) types.Type {
return types.T_varchar.ToType()
},
newOpWithFree: func() (executeLogicOfOverload, executeFreeOfOverload) {
newOpWithFree: func() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload) {
opSerial := newOpSerial()
return opSerial.BuiltInSerial, opSerial.Close
return opSerial.BuiltInSerial, opSerial.Reset, opSerial.Close
},
},
},
Expand All @@ -1471,9 +1472,9 @@ var supportedStringBuiltIns = []FuncNew{
retType: func(parameters []types.Type) types.Type {
return types.T_varchar.ToType()
},
newOpWithFree: func() (executeLogicOfOverload, executeFreeOfOverload) {
newOpWithFree: func() (executeLogicOfOverload, executeResetOfOverload, executeFreeOfOverload) {
opSerial := newOpSerial()
return opSerial.BuiltInSerialFull, opSerial.Close
return opSerial.BuiltInSerialFull, opSerial.Reset, opSerial.Close
},
},
},
Expand Down

0 comments on commit 72ffd89

Please sign in to comment.