Skip to content

Commit 849a921

Browse files
authored
chore: Release column vectors and their arrays correctly (#19496)
Signed-off-by: Christian Haudum <[email protected]>
1 parent 1cc3245 commit 849a921

File tree

10 files changed

+31
-6
lines changed

10 files changed

+31
-6
lines changed

pkg/engine/internal/executor/expressions.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ var _ ColumnVector = (*Array)(nil)
259259

260260
// ToArray implements ColumnVector.
261261
func (a *Array) ToArray() arrow.Array {
262+
a.array.Retain()
262263
return a.array
263264
}
264265

pkg/engine/internal/executor/expressions_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ func TestEvaluateBinaryExpression(t *testing.T) {
216216
func collectBooleanColumnVector(vec ColumnVector) []bool {
217217
res := make([]bool, 0, vec.Len())
218218
arr := vec.ToArray().(*array.Boolean)
219+
defer arr.Release()
219220
for i := range int(vec.Len()) {
220221
res = append(res, arr.Value(i))
221222
}
@@ -326,8 +327,10 @@ null,null,null`
326327
colVec, err := e.eval(colExpr, record)
327328
require.NoError(t, err)
328329
require.IsType(t, &CoalesceVector{}, colVec)
330+
defer colVec.Release()
329331

330332
arr := colVec.ToArray()
333+
defer arr.Release()
331334
require.IsType(t, &array.String{}, arr)
332335
stringArr := arr.(*array.String)
333336

pkg/engine/internal/executor/filter.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,21 @@ func NewFilterPipeline(filter *physical.Filter, input Pipeline, evaluator expres
2424
cols := make([]*array.Boolean, 0, len(filter.Predicates))
2525

2626
for i, pred := range filter.Predicates {
27-
res, err := evaluator.eval(pred, batch)
27+
vec, err := evaluator.eval(pred, batch)
2828
if err != nil {
2929
return nil, err
3030
}
31-
data := res.ToArray()
31+
defer vec.Release()
3232

33+
arr := vec.ToArray()
3334
// boolean filters are only used for filtering; they're not returned
3435
// and must be released
35-
defer data.Release()
36+
defer arr.Release()
3637

37-
if data.DataType().ID() != arrow.BOOL {
38-
return nil, fmt.Errorf("predicate %d returned non-boolean type %s", i, data.DataType())
38+
if arr.DataType().ID() != arrow.BOOL {
39+
return nil, fmt.Errorf("predicate %d returned non-boolean type %s", i, arr.DataType())
3940
}
40-
casted := data.(*array.Boolean)
41+
casted := arr.(*array.Boolean)
4142
cols = append(cols, casted)
4243
}
4344

pkg/engine/internal/executor/functions.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ type arrayType[T comparable] interface {
161161
IsNull(int) bool
162162
Value(int) T
163163
Len() int
164+
Release()
164165
}
165166

166167
// genericFunction is a struct that implements the [BinaryFunction] interface methods
@@ -179,11 +180,13 @@ func (f *genericFunction[E, T]) Evaluate(lhs ColumnVector, rhs ColumnVector) (Co
179180
if !ok {
180181
return nil, arrow.ErrType
181182
}
183+
defer lhsArr.Release()
182184

183185
rhsArr, ok := rhs.ToArray().(E)
184186
if !ok {
185187
return nil, arrow.ErrType
186188
}
189+
defer rhsArr.Release()
187190

188191
mem := memory.NewGoAllocator()
189192
builder := array.NewBooleanBuilder(mem)

pkg/engine/internal/executor/functions_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ func createFloat64Array(mem memory.Allocator, values []float64, nulls []bool) *A
120120
// Helper function to extract boolean values from result
121121
func extractBoolValues(result ColumnVector) []bool {
122122
arr := result.ToArray().(*array.Boolean)
123+
defer arr.Release()
124+
123125
values := make([]bool, arr.Len())
124126
for i := 0; i < arr.Len(); i++ {
125127
if arr.IsNull(i) {

pkg/engine/internal/executor/project.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva
4040
if err != nil {
4141
return nil, err
4242
}
43+
defer vec.Release()
4344
ident := semconv.NewIdentifier(columnNames[i], vec.ColumnType(), vec.Type())
4445
fields = append(fields, semconv.FieldFromIdent(ident, true))
4546
arr := vec.ToArray()

pkg/engine/internal/executor/range_aggregation.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
163163
if err != nil {
164164
return nil, err
165165
}
166+
defer vec.Release()
166167

167168
if vec.Type() != types.Loki.String {
168169
return nil, fmt.Errorf("unsupported datatype for partitioning %s", vec.Type())
@@ -179,6 +180,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
179180
if err != nil {
180181
return nil, err
181182
}
183+
defer tsVec.Release()
182184
tsCol := tsVec.ToArray().(*array.Timestamp)
183185
defer tsCol.Release()
184186

pkg/engine/internal/executor/sortmerge.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,14 @@ loop:
154154
if err != nil {
155155
return nil, err
156156
}
157+
defer col.Release()
158+
157159
tsCol, ok := col.ToArray().(*array.Timestamp)
158160
if !ok {
159161
return nil, errors.New("column is not a timestamp column")
160162
}
161163
ts := tsCol.Value(int(p.offsets[i]))
164+
tsCol.Release()
162165

163166
// Populate slices for sorting
164167
inputIndexes = append(inputIndexes, i)
@@ -199,11 +202,13 @@ loop:
199202
if err != nil {
200203
return nil, err
201204
}
205+
defer col.Release()
202206
// We assume the column is a Uint64 array
203207
tsCol, ok := col.ToArray().(*array.Timestamp)
204208
if !ok {
205209
return nil, errors.New("column is not a timestamp column")
206210
}
211+
defer tsCol.Release()
207212

208213
// Calculate start/end of the sub-slice of the record
209214
start := p.offsets[j]

pkg/engine/internal/executor/sortmerge_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ func TestSortMerge(t *testing.T) {
8080

8181
tsCol, err := c.evaluator.eval(merge.Column, batch)
8282
require.NoError(t, err)
83+
defer tsCol.Release()
8384
arr := tsCol.ToArray().(*array.Timestamp)
85+
defer arr.Release()
8486

8587
timestamps = append(timestamps, arr.Values()...)
8688
batches++
@@ -126,8 +128,10 @@ func TestSortMerge(t *testing.T) {
126128
}
127129

128130
tsCol, err := c.evaluator.eval(merge.Column, batch)
131+
defer tsCol.Release()
129132
require.NoError(t, err)
130133
arr := tsCol.ToArray().(*array.Timestamp)
134+
defer arr.Release()
131135

132136
timestamps = append(timestamps, arr.Values()...)
133137
batches++

pkg/engine/internal/executor/vector_aggregate.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err
102102
if err != nil {
103103
return nil, err
104104
}
105+
defer tsVec.Release()
105106
tsCol := tsVec.ToArray().(*array.Timestamp)
106107
defer tsCol.Release()
107108

@@ -110,6 +111,7 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err
110111
if err != nil {
111112
return nil, err
112113
}
114+
defer valueVec.Release()
113115
valueArr := valueVec.ToArray().(*array.Float64)
114116
defer valueArr.Release()
115117

@@ -121,6 +123,7 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.Record, err
121123
if err != nil {
122124
return nil, err
123125
}
126+
defer vec.Release()
124127

125128
if vec.Type() != types.Loki.String {
126129
return nil, fmt.Errorf("unsupported datatype for grouping %s", vec.Type())

0 commit comments

Comments
 (0)