
Conversation

@reusee (Contributor) commented Sep 19, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #3433

What this PR does / why we need it:

add group aggregation spill


PR Type

Enhancement


Description

  • Add memory spill functionality to group aggregation operator

  • Implement spillable data structures and memory management

  • Add spill threshold configuration and memory usage tracking

  • Create comprehensive test coverage for spill scenarios
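The spill flow described above can be sketched as a threshold check in the batch-consumption path. This is a simplified model: `maybeSpill` and the field names here are illustrative, not the PR's actual identifiers.

```go
package main

import "fmt"

// spiller is a minimal stand-in for the group operator's spill hook.
type spiller struct {
	currentMemUsage int64 // bytes currently attributed to in-memory agg state
	spillThreshold  int64 // configured limit; <= 0 disables spilling
	spillCount      int
}

// maybeSpill mirrors the pattern described above: after each consumed
// batch, compare tracked usage against the threshold and spill if exceeded.
func (s *spiller) maybeSpill() bool {
	if s.spillThreshold <= 0 || s.currentMemUsage < s.spillThreshold {
		return false
	}
	s.spillCount++
	s.currentMemUsage = 0 // state was serialized out; accounting resets
	return true
}

func main() {
	s := &spiller{spillThreshold: 1 << 20} // 1 MiB threshold
	s.currentMemUsage = 512 << 10          // 512 KiB: below threshold
	fmt.Println(s.maybeSpill())            // false
	s.currentMemUsage = 2 << 20            // 2 MiB: above threshold
	fmt.Println(s.maybeSpill())            // true
}
```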


Diagram Walkthrough

flowchart LR
  A["Group Operator"] --> B["Memory Usage Monitor"]
  B --> C["Spill Threshold Check"]
  C --> D["SpillManager"]
  D --> E["SpillableAggState"]
  E --> F["Serialized Data"]
  F --> G["Memory Recovery"]
  G --> H["Merge Spilled Results"]

File Walkthrough

Relevant files
Enhancement
exec.go
Add spill initialization and integration logic                     

pkg/sql/colexec/group/exec.go

  • Initialize SpillManager and SpillThreshold in Prepare method
  • Add memory usage tracking and spill logic in batch consumption
  • Integrate spilled results merging in final result retrieval
+19/-0   
group_spill.go
Core spill implementation with state management                   

pkg/sql/colexec/group/group_spill.go

  • Implement core spill functionality with memory usage monitoring
  • Add spillPartialResults method to serialize and store aggregation
    state
  • Create mergeSpilledResults and restoreAndMergeSpilledAggregators
    methods
  • Handle batch reconstruction and aggregator state restoration
+364/-0 
spill.go
Define spill interfaces and types                                               

pkg/sql/colexec/group/spill.go

  • Define SpillableData interface for serializable data structures
  • Create SpillManager interface for spill operations
  • Establish SpillID type for spill identification
+33/-0   
spill_memory.go
Memory-based spill manager implementation                               

pkg/sql/colexec/group/spill_memory.go

  • Implement MemorySpillManager for in-memory spill storage
  • Add atomic memory tracking and thread-safe operations
  • Provide spill, retrieve, and delete functionality
+75/-0   
spillable_agg_state.go
Spillable aggregation state implementation                             

pkg/sql/colexec/group/spillable_agg_state.go

  • Implement SpillableAggState struct for aggregation state serialization
  • Add binary serialization and deserialization methods
  • Handle vector marshaling and type preservation
  • Provide memory estimation and cleanup functionality
+204/-0 
types.go
Add spill configuration to group types                                     

pkg/sql/colexec/group/types.go

  • Add SpillManager and SpillThreshold fields to Group struct
  • Extend container with spill-related state tracking fields
  • Update Free and cleanup methods to handle spill resources
+21/-0   
Tests
exec_test.go
Update test mock with size method                                               

pkg/sql/colexec/group/exec_test.go

  • Add Size() method to test aggregation executor mock
+4/-0     
spill_test.go
Comprehensive spill functionality tests                                   

pkg/sql/colexec/group/spill_test.go

  • Create comprehensive test cases for spill functionality
  • Test single spill cycle and multiple spill cycles
  • Verify memory cleanup and aggregation correctness
  • Include memory leak detection and validation
+160/-0 


PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis 🔶

#3433 - Partially compliant

Compliant requirements:

  • Hash agg tracks memory usage.
  • Hash agg spills when exceeding threshold.

Non-compliant requirements:

  • Hash join must track memory usage and spill.

Requires further human verification:

  • Validate spill behavior under various workloads and configurations in an integrated environment.
  • Measure performance impact of spilling and merging on large datasets.
  • Confirm configuration wiring for SpillThreshold from system/session settings.
⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Memory Accounting

Memory usage tracking only sums selected structures and may miss allocations (e.g., result1 internal buffers, hash table overhead, temporary vectors). Verify accounting is sufficient to trigger spill at the right time and avoid OOM.

func (group *Group) updateMemoryUsage(proc *process.Process) {
	usage := int64(0)
	if !group.ctr.hr.IsEmpty() && group.ctr.hr.Hash != nil {
		usage += int64(group.ctr.hr.Hash.Size())
	}
	for _, bat := range group.ctr.result1.ToPopped {
		if bat != nil {
			usage += int64(bat.Size())
		}
	}
	for _, agg := range group.ctr.result1.AggList {
		if agg != nil {
			usage += agg.Size()
		}
	}
	group.ctr.currentMemUsage = usage
}
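To illustrate the reviewer's point: an accounting pass like the one above only ever sees the components it explicitly enumerates. A minimal sketch of the summing pattern, where the `sized` interface is hypothetical and stands in for the hash table, result batches, and agg states:

```go
package main

import "fmt"

// sized abstracts anything whose memory footprint can be reported.
// An accounting like updateMemoryUsage only sees the structures it is
// told about, so hash-table overhead, scratch vectors, and internal
// result buffers must each be enumerated explicitly or they go untracked.
type sized interface{ Size() int64 }

// fixedSize is a toy component with a known footprint.
type fixedSize int64

func (f fixedSize) Size() int64 { return int64(f) }

// totalUsage sums all tracked components, skipping nils,
// mirroring the nil checks in the snippet above.
func totalUsage(parts ...sized) int64 {
	var usage int64
	for _, p := range parts {
		if p != nil {
			usage += p.Size()
		}
	}
	return usage
}

func main() {
	hashTable := fixedSize(4096)
	batches := fixedSize(8192)
	aggStates := fixedSize(1024)
	fmt.Println(totalUsage(hashTable, batches, aggStates)) // 13312
}
```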
Merge Correctness

Merging spilled aggregators iterates per-row and uses currentGroupCount offset; ensure alignment between batches and agg states, and that GroupGrow and Merge indices are correct for all agg types and group-by shapes.

currentGroupCount := 0
for _, bat := range group.ctr.result1.ToPopped {
	if bat != nil {
		currentGroupCount += bat.RowCount()
	}
}

for i, tempAgg := range tempAggs {
	if tempAgg == nil {
		continue
	}

	currentAgg := group.ctr.result1.AggList[i]
	if currentAgg == nil {
		continue
	}

	for spilledGroupIdx := 0; spilledGroupIdx < spillState.GroupCount; spilledGroupIdx++ {
		currentGroupIdx := currentGroupCount + spilledGroupIdx
		if err := currentAgg.Merge(tempAgg, currentGroupIdx, spilledGroupIdx); err != nil {
			return err
		}
	}
}
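The offset arithmetic can be checked in isolation with plain slices. This is a simplified model of the GroupGrow-then-Merge sequence, not the aggregator API: each spilled group j must land at slot currentGroupCount + j.

```go
package main

import "fmt"

// mergeSpilled models the pattern in the snippet above: grow the current
// state by len(spilled) slots, then fold each spilled group j into slot
// currentGroupCount + j.
func mergeSpilled(current, spilled []int64) []int64 {
	currentGroupCount := len(current)
	current = append(current, make([]int64, len(spilled))...)
	for j, v := range spilled {
		current[currentGroupCount+j] += v
	}
	return current
}

func main() {
	// Three in-memory groups, two groups restored from a spill.
	fmt.Println(mergeSpilled([]int64{10, 20, 30}, []int64{5, 7})) // [10 20 30 5 7]
}
```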
Serialization Robustness

Custom binary format for vectors and types needs compatibility and error handling; confirm all vector encodings and null bitmaps are preserved and future-proof across versions.

func (s *SpillableAggState) Serialize() ([]byte, error) {
	buf := bytes.NewBuffer(nil)

	if err := binary.Write(buf, binary.LittleEndian, int32(s.GroupCount)); err != nil {
		return nil, err
	}

	if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectors))); err != nil {
		return nil, err
	}

	if err := binary.Write(buf, binary.LittleEndian, int32(len(s.GroupVectorTypes))); err != nil {
		return nil, err
	}
	for _, typ := range s.GroupVectorTypes {
		typBytes, err := typ.MarshalBinary()
		if err != nil {
			return nil, err
		}
		if err := binary.Write(buf, binary.LittleEndian, int32(len(typBytes))); err != nil {
			return nil, err
		}
		if _, err := buf.Write(typBytes); err != nil {
			return nil, err
		}
	}

	for i, vec := range s.GroupVectors {
		if vec == nil {
			if err := binary.Write(buf, binary.LittleEndian, int32(0)); err != nil {
				return nil, err
			}
			continue
		}

		vecBytes, err := vec.MarshalBinary()
		if err != nil {
			return nil, err
		}
		if err := binary.Write(buf, binary.LittleEndian, int32(len(vecBytes))); err != nil {
			return nil, err
		}
		if _, err := buf.Write(vecBytes); err != nil {
			return nil, err
		}

		if i >= len(s.GroupVectorTypes) {
			s.GroupVectorTypes = append(s.GroupVectorTypes, *vec.GetType())
		}
	}

	if err := binary.Write(buf, binary.LittleEndian, int32(len(s.MarshaledAggStates))); err != nil {
		return nil, err
	}
	for _, aggState := range s.MarshaledAggStates {
		if err := binary.Write(buf, binary.LittleEndian, int32(len(aggState))); err != nil {
			return nil, err
		}
		if _, err := buf.Write(aggState); err != nil {
			return nil, err
		}
	}

	return buf.Bytes(), nil
}
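One common way to address the versioning concern is to prefix the payload with a magic number and a format version, and reject unknown versions on read. A standalone sketch, not part of the PR:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const (
	spillMagic   uint32 = 0x5350494C // ASCII "SPIL"
	spillVersion uint32 = 1
)

// wrapVersioned prepends a magic/version header to a serialized payload.
// Writes to a bytes.Buffer cannot fail, so errors are ignored here.
func wrapVersioned(payload []byte) []byte {
	buf := bytes.NewBuffer(nil)
	binary.Write(buf, binary.LittleEndian, spillMagic)
	binary.Write(buf, binary.LittleEndian, spillVersion)
	buf.Write(payload)
	return buf.Bytes()
}

// unwrapVersioned validates the header and returns the payload.
func unwrapVersioned(data []byte) ([]byte, error) {
	if len(data) < 8 {
		return nil, fmt.Errorf("spill data too short: %d bytes", len(data))
	}
	magic := binary.LittleEndian.Uint32(data[0:4])
	version := binary.LittleEndian.Uint32(data[4:8])
	if magic != spillMagic {
		return nil, fmt.Errorf("bad spill magic: %#x", magic)
	}
	if version != spillVersion {
		return nil, fmt.Errorf("unsupported spill version: %d", version)
	}
	return data[8:], nil
}

func main() {
	wrapped := wrapVersioned([]byte("agg-state"))
	payload, err := unwrapVersioned(wrapped)
	fmt.Println(string(payload), err) // agg-state <nil>
}
```

A header like this lets a future reader fail fast on incompatible data instead of misinterpreting it.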


qodo-merge-pro bot commented Sep 19, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category / Suggestion / Impact
High-level
Implement disk-based spilling instead of in-memory

The current MemorySpillManager stores spilled data in memory, which fails to
reduce the overall memory footprint. A disk-based spill manager should be
implemented to properly offload data to disk.

Examples:

pkg/sql/colexec/group/spill_memory.go [24-46]
type MemorySpillManager struct {
	data     map[SpillID][]byte
	nextID   int64
	totalMem int64
}

func NewMemorySpillManager() *MemorySpillManager {
	return &MemorySpillManager{
		data: make(map[SpillID][]byte),
	}

 ... (clipped 13 lines)
pkg/sql/colexec/group/exec.go [67-69]
	if group.SpillManager == nil {
		group.SpillManager = NewMemorySpillManager()
	}

Solution Walkthrough:

Before:

// pkg/sql/colexec/group/spill_memory.go

type MemorySpillManager struct {
    data     map[SpillID][]byte
    // ...
}

func (m *MemorySpillManager) Spill(data SpillableData) (SpillID, error) {
    serialized, err := data.Serialize()
    if err != nil {
        return "", err
    }

    id := ... // generate new ID
    m.data[id] = serialized // Data is stored in an in-memory map
    return id, nil
}

After:

// A conceptual disk-based spill manager

type DiskSpillManager struct {
    dirPath string
    // ...
}

func (m *DiskSpillManager) Spill(data SpillableData) (SpillID, error) {
    serialized, err := data.Serialize()
    if err != nil {
        return "", err
    }

    id := ... // generate new ID
    filePath := path.Join(m.dirPath, string(id))
    err = os.WriteFile(filePath, serialized, 0600) // Write data to disk
    if err != nil {
        return "", err
    }
    return id, nil
}
Suggestion importance[1-10]: 9


Why: This suggestion correctly identifies a critical design flaw where the in-memory MemorySpillManager defeats the primary purpose of spilling, which is to reduce process memory pressure by offloading data to persistent storage.

Impact: High
Possible issue
Ensure thread-safe map access
Suggestion Impact: A sync.Mutex field was added to MemorySpillManager, and Lock/Unlock (with defer) were introduced around all accesses to the m.data map in Spill, Retrieve, Delete, and Free.

code diff:

 import (
 	"fmt"
+	"sync"
 	"sync/atomic"
 
 	"github.com/matrixorigin/matrixone/pkg/common/mpool"
@@ -25,6 +26,7 @@
 	data     map[SpillID][]byte
 	nextID   int64
 	totalMem int64
+	mu       sync.Mutex
 }
 
 func NewMemorySpillManager() *MemorySpillManager {
@@ -40,12 +42,17 @@
 	}
 
 	id := SpillID(fmt.Sprintf("spill_%d", atomic.AddInt64(&m.nextID, 1)))
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.data[id] = serialized
 	atomic.AddInt64(&m.totalMem, int64(len(serialized)))
 	return id, nil
 }
 
 func (m *MemorySpillManager) Retrieve(id SpillID, mp *mpool.MPool) (SpillableData, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	serialized, exists := m.data[id]
 	if !exists {
 		return nil, fmt.Errorf("spill data not found: %s", id)
@@ -60,6 +67,9 @@
 }
 
 func (m *MemorySpillManager) Delete(id SpillID) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	if serialized, exists := m.data[id]; exists {
 		atomic.AddInt64(&m.totalMem, -int64(len(serialized)))
 		delete(m.data, id)
@@ -68,6 +78,9 @@
 }
 
 func (m *MemorySpillManager) Free() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	for id := range m.data {
 		m.Delete(id)
 	}

Add a sync.RWMutex to MemorySpillManager to protect the m.data map from
concurrent access and prevent race conditions.

pkg/sql/colexec/group/spill_memory.go [36-46]

 func (m *MemorySpillManager) Spill(data SpillableData) (SpillID, error) {
 	serialized, err := data.Serialize()
 	if err != nil {
 		return "", err
 	}
 
 	id := SpillID(fmt.Sprintf("spill_%d", atomic.AddInt64(&m.nextID, 1)))
+	m.mu.Lock()
 	m.data[id] = serialized
+	m.mu.Unlock()
 	atomic.AddInt64(&m.totalMem, int64(len(serialized)))
 	return id, nil
 }

[Suggestion processed]

Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a race condition due to unprotected concurrent access to a map, which could lead to crashes. Adding a mutex is a critical fix for thread safety.
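The race the suggestion targets comes from unsynchronized map writes. A cut-down, runnable model of the mutex-guarded store, with simplified types, mirroring the shape of the accepted fix:

```go
package main

import (
	"fmt"
	"sync"
)

// store is a cut-down MemorySpillManager: a map guarded by a mutex.
type store struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (s *store) put(id string, b []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[id] = b
}

func (s *store) get(id string) ([]byte, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	b, ok := s.data[id]
	return b, ok
}

func main() {
	s := &store{data: make(map[string][]byte)}
	var wg sync.WaitGroup
	// Concurrent writers: without the mutex this is a data race on the
	// map, and in practice a "concurrent map writes" runtime crash.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.put(fmt.Sprintf("spill_%d", i), []byte{byte(i)})
		}(i)
	}
	wg.Wait()
	_, ok := s.get("spill_3")
	fmt.Println(ok) // true
}
```

Running such code under `go test -race` (or `go run -race`) is the standard way to confirm the guard is sufficient.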

Impact: Medium
Avoid using a fallback type

In the Deserialize method of SpillableAggState, instead of falling back to
types.T_any.ToType() when type information is missing for a vector, return an
error to prevent potential data corruption.

pkg/sql/colexec/group/spillable_agg_state.go [148-153]

 		var vecType types.Type
 		if i < len(s.GroupVectorTypes) {
 			vecType = s.GroupVectorTypes[i]
 		} else {
-			vecType = types.T_any.ToType()
+			return fmt.Errorf("missing type information for group vector at index %d", i)
 		}
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that using a fallback type T_any during deserialization can hide data corruption issues; failing fast by returning an error is a much safer and more robust approach.

Impact: Medium
Remove side effect from serialization

In the Serialize method of SpillableAggState, remove the code that modifies
s.GroupVectorTypes to eliminate side effects during serialization.

pkg/sql/colexec/group/spillable_agg_state.go [79-81]

-		if i >= len(s.GroupVectorTypes) {
-			s.GroupVectorTypes = append(s.GroupVectorTypes, *vec.GetType())
-		}
 
+
Suggestion importance[1-10]: 5


Why: The suggestion correctly identifies that the Serialize method has a side effect, which is poor practice and can lead to bugs. Removing it improves code quality and predictability.

Impact: Low

Labels
kind/enhancement, Review effort 4/5, size/XL (denotes a PR that changes [1000, 1999] lines)