Skip to content

Commit

Permalink
fix: stats countable leak
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Dec 17, 2024
1 parent ae69aa5 commit 490043d
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 10 deletions.
3 changes: 2 additions & 1 deletion server/ingester/event/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func NewDecoder(
}
}
return &Decoder{
index: index,
eventType: eventType,
platformData: platformData,
inQueue: inQueue,
Expand All @@ -109,7 +110,7 @@ func (d *Decoder) GetCounter() interface{} {
func (d *Decoder) Run() {
log.Infof("event (%s) decoder run", d.eventType)
ingestercommon.RegisterCountableForIngester("decoder", d, stats.OptionStatTags{
"event_type": d.eventType.String()})
"index": strconv.Itoa(d.index), "event_type": d.eventType.String()})
buffer := make([]interface{}, BUFFER_SIZE)
decoder := &codec.SimpleDecoder{}
for {
Expand Down
2 changes: 1 addition & 1 deletion server/ingester/ext_metrics/dbwriter/ext_metrics_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func NewExtMetricsWriter(
table := s.GenCKTable(w.ckdbCluster, w.ckdbStoragePolicy, config.Base.CKDB.Type, w.ttl, ckdb.GetColdStorage(w.ckdbColdStorages, s.DatabaseName(), s.TableName()))
ckwriter, err := ckwriter.NewCKWriter(
*w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword,
fmt.Sprintf("%s-%s-%d", w.msgType, s.TableName(), w.decoderIndex), w.ckdbTimeZone,
fmt.Sprintf("%s-%s-%d", w.msgType, flowTagTablePrefix, w.decoderIndex), w.ckdbTimeZone,
table, w.writerConfig.QueueCount, w.writerConfig.QueueSize, w.writerConfig.BatchSize, w.writerConfig.FlushTimeout, w.ckdbWatcher)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions server/ingester/flow_log/flow_log/flow_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func NewLogger(msgType datatype.MessageType, config *config.Config, platformData
if err != nil {
return nil, err
}
appServiceTagWriter, err := flow_tag.NewAppServiceTagWriter(i, common.FLOW_LOG_DB, config.FlowLogTTL.L7FlowLog, ckdb.TimeFuncTwelveHour, config.Base)
appServiceTagWriter, err := flow_tag.NewAppServiceTagWriter(i, common.FLOW_LOG_DB, msgType.String(), config.FlowLogTTL.L7FlowLog, ckdb.TimeFuncTwelveHour, config.Base)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func NewL7FlowLogger(config *config.Config, platformDataManager *grpc.PlatformDa
if err != nil {
return nil, err
}
appServiceTagWriter, err = flow_tag.NewAppServiceTagWriter(i, common.FLOW_LOG_DB, config.FlowLogTTL.L7FlowLog, ckdb.TimeFuncTwelveHour, config.Base)
appServiceTagWriter, err = flow_tag.NewAppServiceTagWriter(i, common.FLOW_LOG_DB, msgType.String(), config.FlowLogTTL.L7FlowLog, ckdb.TimeFuncTwelveHour, config.Base)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion server/ingester/flow_metrics/flow_metrics/flow_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewFlowMetrics(cfg *config.Config, recv *receiver.Receiver, platformDataMan
if err != nil {
return nil, err
}
appServiceTagWriter, err := flow_tag.NewAppServiceTagWriter(i, ckdb.METRICS_DB, cfg.FlowMetricsTTL.VtapApp1M, ckdb.TimeFuncTwelveHour, cfg.Base)
appServiceTagWriter, err := flow_tag.NewAppServiceTagWriter(i, ckdb.METRICS_DB, datatype.MESSAGE_TYPE_METRICS.String(), cfg.FlowMetricsTTL.VtapApp1M, ckdb.TimeFuncTwelveHour, cfg.Base)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions server/ingester/flow_tag/app_service_tag_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type AppServiceTagWriter struct {

func NewAppServiceTagWriter(
decoderIndex int,
db string,
db, msgType string,
ttl int,
partition ckdb.TimeFuncType,
config *config.Config) (*AppServiceTagWriter, error) {
Expand All @@ -76,7 +76,7 @@ func NewAppServiceTagWriter(
tableName := fmt.Sprintf("%s_app_service", db)
w.ckwriter, err = ckwriter.NewCKWriter(
*w.ckdbAddrs, w.ckdbUsername, w.ckdbPassword,
fmt.Sprintf("tag-%s-%d", tableName, decoderIndex),
fmt.Sprintf("tag-%s-%s-%d", tableName, msgType, decoderIndex),
config.CKDB.TimeZone,
GenAppServiceTagCKTable(config.CKDB.ClusterName, config.CKDB.StoragePolicy, tableName, config.CKDB.Type, ttl, partition),
WRITER_QUEUE_COUNT, WRITER_QUEUE_SIZE, WRITER_BATCH_SIZE, WRITER_FLUSH_TIMEOUT, config.CKDB.Watcher)
Expand All @@ -85,7 +85,7 @@ func NewAppServiceTagWriter(
}
w.ckwriter.Run()

common.RegisterCountableForIngester("app_service_tag_writer", w, stats.OptionStatTags{"type": tableName, "decoder_index": strconv.Itoa(decoderIndex)})
common.RegisterCountableForIngester("app_service_tag_writer", w, stats.OptionStatTags{"type": msgType, "decoder_index": strconv.Itoa(decoderIndex)})
return w, nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/ingester/flow_tag/flow_tag_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func NewFlowTagWriter(
w.ckwriters[tagType].Run()
}

common.RegisterCountableForIngester("flow_tag_writer", w, stats.OptionStatTags{"type": name, "decoder_index": strconv.Itoa(decoderIndex)})
common.RegisterCountableForIngester("flow_tag_writer", w, stats.OptionStatTags{"type": srcDB + "_" + name, "decoder_index": strconv.Itoa(decoderIndex)})
return w, nil
}

Expand Down
2 changes: 1 addition & 1 deletion server/ingester/profile/profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewProfiler(msgType datatype.MessageType, config *config.Config, platformDa
}
debug.ServerRegisterSimple(ingesterctl.CMD_PLATFORMDATA_PROFILE, platformDatas[i])
}
appServiceTagWriter, err := flow_tag.NewAppServiceTagWriter(i, dbwriter.PROFILE_DB, config.ProfileTTL, ckdb.TimeFuncTwelveHour, config.Base)
appServiceTagWriter, err := flow_tag.NewAppServiceTagWriter(i, dbwriter.PROFILE_DB, msgType.String(), config.ProfileTTL, ckdb.TimeFuncTwelveHour, config.Base)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 490043d

Please sign in to comment.