// Fragment of a method body (the enclosing func header is not part of this span).
// It converts `entities` into a single Arrow record batch, column by column,
// writes that batch through p.arrowWriter, and then updates the write counters.
//
// Ensure the Arrow schema has been initialized.
if p.arrowSchema == nil {
return fmt.Errorf("arrow schema not initialized")
}
// Ensure the Arrow writer has been initialized.
if p.arrowWriter == nil {
return fmt.Errorf("arrow writer not initialized")
}
// Current time in Unix milliseconds; used as the value of the "timestamp" column.
now := time.Now().UnixMilli()
// Record builder handling: any builder left over from a previous call is
// released here and a fresh one is created for this batch.
// NOTE(review): the new builder is not released at the end of this fragment;
// it stays referenced by p.recordBuilder until the next call reaches this point.
if p.recordBuilder != nil {
p.recordBuilder.Release()
p.recordBuilder = nil
}
p.recordBuilder = array.NewRecordBuilder(p.mem, p.arrowSchema)
// Fill the builders one schema field (column) at a time.
for i, field := range p.arrowSchema.Fields() {
fieldName := field.Name
// The "timestamp" field is special-cased: every row gets `now`, and
// entity.Values is not consulted for it.
// NOTE(review): the type assertion is unchecked, so this panics if the
// schema declares "timestamp" with a builder other than Int64 — confirm.
if fieldName == "timestamp" {
builder := p.recordBuilder.Field(i).(*array.Int64Builder)
for j := 0; j < len(entities); j++ {
builder.Append(now)
}
continue
}
// All other fields are filled according to the Arrow type of the column.
switch field.Type.ID() {
case arrow.STRING:
builder := p.recordBuilder.Field(i).(*array.StringBuilder)
for _, entity := range entities {
val, ok := entity.Values[fieldName]
// A missing key or a nil value becomes a null cell.
if !ok || val == nil {
builder.AppendNull()
} else {
var strVal string
switch v := val.(type) {
case string:
strVal = v
default:
// Non-string values are rendered with their default %v formatting.
strVal = fmt.Sprintf("%v", v)
}
builder.Append(strVal)
}
}
case arrow.INT32:
builder := p.recordBuilder.Field(i).(*array.Int32Builder)
for _, entity := range entities {
val, ok := entity.Values[fieldName]
if !ok || val == nil {
builder.AppendNull()
} else {
// intVal keeps its zero value (0) when the dynamic type is not handled
// below or the string fails to parse, so 0 (not null) is appended then.
// NOTE(review): the int32(...) conversions are unchecked narrowing
// conversions; out-of-range inputs are not detected here.
var intVal int32
switch v := val.(type) {
case int:
intVal = int32(v)
case int64:
intVal = int32(v)
case float64:
intVal = int32(v)
case string:
// Strings are scanned as a float first and then converted to int32.
var f float64
if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
intVal = int32(f)
}
}
builder.Append(intVal)
}
}
case arrow.INT64:
builder := p.recordBuilder.Field(i).(*array.Int64Builder)
for _, entity := range entities {
val, ok := entity.Values[fieldName]
if !ok || val == nil {
builder.AppendNull()
} else {
// int64Val keeps its zero value (0) when no case below produces a value.
var int64Val int64
switch v := val.(type) {
case int:
int64Val = int64(v)
case int64:
int64Val = v
case float64:
int64Val = int64(v)
case time.Time:
// Times are stored as Unix milliseconds.
int64Val = v.UnixMilli()
case string:
// Try a numeric parse first; only if that fails are the time layouts tried.
// NOTE(review): Sscanf with %f appears to succeed on a leading numeric
// prefix (e.g. "2024" of "2024-01-02 15:04:05"), which would make the
// time-layout fallback unreachable for such strings — confirm.
var f float64
if _, err := fmt.Sscanf(v, "%f", &f); err == nil {
int64Val = int64(f)
} else {
// Try to parse the string as a time, using each layout in order.
formats := []string{
time.RFC3339,
"2006-01-02 15:04:05",
"2006/01/02 15:04:05",
"01/02/2006 15:04:05",
"02/01/2006 15:04:05",
}
// The first layout that parses wins; if none parses, 0 is appended.
for _, format := range formats {
if t, err := time.Parse(format, v); err == nil {
int64Val = t.UnixMilli()
break
}
}
}
}
builder.Append(int64Val)
}
}
case arrow.FLOAT64:
builder := p.recordBuilder.Field(i).(*array.Float64Builder)
for _, entity := range entities {
val, ok := entity.Values[fieldName]
if !ok || val == nil {
builder.AppendNull()
} else {
// floatVal stays 0 for unhandled dynamic types.
var floatVal float64
switch v := val.(type) {
case float64:
floatVal = v
case float32:
floatVal = float64(v)
case int:
floatVal = float64(v)
case int64:
floatVal = float64(v)
case string:
// The Sscanf result and error are discarded here, so a parse
// failure is silent.
fmt.Sscanf(v, "%f", &floatVal)
}
builder.Append(floatVal)
}
}
case arrow.BOOL:
builder := p.recordBuilder.Field(i).(*array.BooleanBuilder)
for _, entity := range entities {
val, ok := entity.Values[fieldName]
if !ok || val == nil {
builder.AppendNull()
} else {
// boolVal stays false for unrecognized strings and unhandled dynamic types.
var boolVal bool
switch v := val.(type) {
case bool:
boolVal = v
case string:
switch v {
case "true", "True", "TRUE", "1":
boolVal = true
case "false", "False", "FALSE", "0":
boolVal = false
}
case int:
boolVal = v != 0
case float64:
boolVal = v != 0
}
builder.Append(boolVal)
}
}
default:
// Unsupported column type: only a warning is logged and nothing is appended.
// NOTE(review): this leaves the column with fewer values than the other
// columns — confirm how NewRecordBatch behaves in that case.
log.Printf("警告: 字段 %s 的类型 %s 不支持", fieldName, field.Type.ID())
}
}
// Build the Arrow record batch from the filled builders; it is released when
// the enclosing function returns.
recordBatch := p.recordBuilder.NewRecordBatch()
defer recordBatch.Release()
// Write the batch to the Parquet file through the Arrow writer (the original
// comment describes this as the Arrow Go v18 way of writing).
if err := p.arrowWriter.Write(recordBatch); err != nil {
return fmt.Errorf("failed to write record batch to parquet: %w", err)
}
// Update the write counter, the last-flush time and the row counters.
p.writeCount++
p.lastFlushTime = time.Now()
p.currentRowGroupSize += int64(len(entities))
p.totalRecords += int64(len(entities))
// Log write statistics once every 1000 writes.
if p.writeCount%1000 == 0 {
log.Printf("ParquetProcessor[%d] 已写入 %d 次,行组大小: %d,总记录: %d",
p.instanceID, p.writeCount, p.currentRowGroupSize, p.totalRecords)
}
// Warn when a single batch holds more than 1000 entities; this only logs and
// does not change how the batch is processed.
if len(entities) > 1000 {
log.Printf("ParquetProcessor[%d] 警告:批处理大小较大 (%d),可能增加内存压力", p.instanceID, len(entities))
}
// Batch-level resource release: forceCppMemoryRelease is called on every 10th
// write, or whenever the batch holds at least 1000 entities.
// NOTE(review): what forceCppMemoryRelease actually frees is not visible here;
// the original comment describes it as releasing C++ object resources.
if p.writeCount%10 == 0 || len(entities) >= 1000 {
if err := p.forceCppMemoryRelease(); err != nil {
log.Printf("ParquetProcessor[%d] C++资源释放失败: %v", p.instanceID, err)
// The error is only logged, not returned, so later batches keep being processed.
}
}
// Tier 2 (currently disabled, kept commented out below): flush the row group /
// start a new file segment each time totalRecords reaches a multiple of 100000.
/*if p.totalRecords > 0 && p.totalRecords%100000 == 0 {
log.Printf("ParquetProcessor[%d] 总记录数达到 %d,开始深度内存释放", p.instanceID, p.totalRecords)
if err := p.forceRowGroupFlush(); err != nil {
log.Printf("ParquetProcessor[%d] 深度内存释放失败: %v", p.instanceID, err)
} else {
p.currentRowGroupSize = 0 // 重置行组计数器
log.Printf("ParquetProcessor[%d] 深度内存释放成功,新文件已创建", p.instanceID)
}
}*/
// Report the number of entities in this batch to TotalCounter via CountN; this
// is reached only when the write above did not return an error.
(*p.TotalCounter).CountN(int64(len(entities)))
return nil
Describe the bug, including details regarding any error messages, version, and platform.
Memory usage keeps increasing until there is no free memory left to use.
The write code is as follows:
`func (p *ParquetProcessor) flushBatch(entities []model.Event) error {
if len(entities) == 0 {
return nil
}
}`
Component(s)
Parquet