Skip to content

Commit 797324b

Browse files
xiaochaoren1SongZhen0704
authored andcommitted
fix: show metrics error when table is event
1 parent 8029a37 commit 797324b

File tree

7 files changed

+56
-51
lines changed

7 files changed

+56
-51
lines changed

server/controller/recorder/pubsub/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ type Manager struct {
101101
func (m *Manager) Subscribe(pubSubType string, topic int, subscriber interface{}) error {
102102
ps, ok := m.TypeToPubSub[pubSubType]
103103
if !ok {
104-
log.Errorf("pubsub type not found: %d", pubSubType)
104+
log.Errorf("pubsub type not found: %s", pubSubType)
105105
return errors.New("pubsub type not found")
106106
}
107107
ps.Subscribe(topic, subscriber)

server/querier/engine/clickhouse/clickhouse.go

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/xwb1989/sqlparser"
3333
"golang.org/x/exp/slices"
3434

35+
ctlcommon "github.com/deepflowio/deepflow/server/controller/common"
3536
"github.com/deepflowio/deepflow/server/querier/common"
3637
"github.com/deepflowio/deepflow/server/querier/config"
3738
"github.com/deepflowio/deepflow/server/querier/engine/clickhouse/client"
@@ -104,6 +105,7 @@ type CHEngine struct {
104105
DerivativeGroupBy []string
105106
ORGID string
106107
Language string
108+
NativeField map[string]*metrics.Metrics
107109
}
108110

109111
func init() {
@@ -126,7 +128,6 @@ func (e *CHEngine) ExecuteQuery(args *common.QuerierParams) (*common.Result, map
126128
e.ORGID = args.ORGID
127129
}
128130
query_uuid := args.QueryUUID // FIXME: should be queryUUID
129-
log.Debugf("query_uuid: %s | raw sql: %s", query_uuid, sql)
130131
debug_info := &client.DebugInfo{}
131132
// Parse withSql
132133
withResult, withDebug, err := e.QueryWithSql(sql, args)
@@ -215,15 +216,13 @@ func (e *CHEngine) ExecuteQuery(args *common.QuerierParams) (*common.Result, map
215216
usedEngine.View.NoPreWhere = usedEngine.NoPreWhere
216217
}
217218
chSql := usedEngine.ToSQLString()
218-
log.Debug(chSql)
219219
callbacks := usedEngine.View.GetCallbacks()
220220
debug.Sql = chSql
221221
if !isShow {
222222
for _, ColumnSchema := range usedEngine.ColumnSchemas {
223223
ColumnSchemaMap[ColumnSchema.Name] = ColumnSchema
224224
}
225225
}
226-
log.Debug(ColumnSchemaMap)
227226
params := &client.QueryParams{
228227
Sql: chSql,
229228
UseQueryCache: args.UseQueryCache,
@@ -1232,6 +1231,38 @@ func (e *CHEngine) TransFrom(froms sqlparser.TableExprs) error {
12321231
table = strings.ReplaceAll(table, "vtap_acl", "traffic_policy")
12331232
}
12341233
e.Table = table
1234+
// native field
1235+
if config.ControllerCfg.DFWebService.Enabled && slices.Contains([]string{chCommon.DB_NAME_DEEPFLOW_ADMIN, chCommon.DB_NAME_DEEPFLOW_TENANT, chCommon.DB_NAME_APPLICATION_LOG, chCommon.DB_NAME_EXT_METRICS}, e.DB) || slices.Contains([]string{chCommon.TABLE_NAME_L7_FLOW_LOG, chCommon.TABLE_NAME_EVENT, chCommon.TABLE_NAME_PERF_EVENT}, e.Table) {
1236+
e.NativeField = map[string]*metrics.Metrics{}
1237+
getNativeUrl := fmt.Sprintf("http://localhost:%d/v1/native-fields/?db=%s&table_name=%s", config.ControllerCfg.ListenPort, e.DB, e.Table)
1238+
resp, err := ctlcommon.CURLPerform("GET", getNativeUrl, nil, ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, e.ORGID))
1239+
if err != nil {
1240+
log.Errorf("request controller failed: %s, URL: %s", resp, getNativeUrl)
1241+
} else {
1242+
resultArray := resp.Get("DATA").MustArray()
1243+
for i := range resultArray {
1244+
nativeMetric := resp.Get("DATA").GetIndex(i).Get("NAME").MustString()
1245+
displayName := resp.Get("DATA").GetIndex(i).Get("DISPLAY_NAME").MustString()
1246+
description := resp.Get("DATA").GetIndex(i).Get("DESCRIPTION").MustString()
1247+
fieldType := resp.Get("DATA").GetIndex(i).Get("FIELD_TYPE").MustInt()
1248+
if fieldType == chCommon.NATIVE_FIELD_TYPE_METRIC {
1249+
metric := metrics.NewMetrics(
1250+
0, nativeMetric,
1251+
displayName, displayName, displayName, "", "", "", metrics.METRICS_TYPE_COUNTER,
1252+
chCommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
1253+
)
1254+
e.NativeField[nativeMetric] = metric
1255+
} else {
1256+
metric := metrics.NewMetrics(
1257+
0, nativeMetric,
1258+
displayName, displayName, displayName, "", "", "", metrics.METRICS_TYPE_NAME_MAP["tag"],
1259+
chCommon.NATIVE_FIELD_CATEGORY_CUSTOM_TAG, []bool{true, true, true}, "", table, "", "", "", "", "",
1260+
)
1261+
e.NativeField[nativeMetric] = metric
1262+
}
1263+
}
1264+
}
1265+
}
12351266
// ext_metrics只有metrics表,使用virtual_table_name做过滤区分
12361267
if e.DB == "ext_metrics" {
12371268
table = "metrics"
@@ -1705,7 +1736,7 @@ func (e *CHEngine) parseSelectBinaryExpr(node sqlparser.Expr) (binary Function,
17051736
if fieldFunc != nil {
17061737
return fieldFunc, nil
17071738
}
1708-
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID)
1739+
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
17091740
if ok {
17101741
return &Field{Value: metricStruct.DBField}, nil
17111742
}
@@ -1818,7 +1849,7 @@ func (e *CHEngine) parseWhere(node sqlparser.Expr, w *Where, isCheck bool) (view
18181849
switch comparExpr.(type) {
18191850
case *sqlparser.ColName, *sqlparser.SQLVal:
18201851
whereTag := chCommon.ParseAlias(node.Left)
1821-
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID)
1852+
metricStruct, ok := metrics.GetMetrics(whereTag, e.DB, e.Table, e.ORGID, e.NativeField)
18221853
if ok && metricStruct.Type != metrics.METRICS_TYPE_TAG {
18231854
whereTag = metricStruct.DBField
18241855
}

server/querier/engine/clickhouse/client/client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,6 @@ func (c *Client) DoQuery(params *QueryParams) (result *common.Result, err error)
221221
log.Error("Execute Callback %v Error: %v", callback, err)
222222
}
223223
}
224-
log.Debugf("sql: %s, query_uuid: %s", sqlstr, c.Debug.QueryUUID)
225224
log.Infof("query_uuid: %s. query api statistics: %d rows, %d columns, %d bytes, cost %f ms", c.Debug.QueryUUID, resRows, resColumns, resSize, float64(queryTime.Milliseconds()))
226225
return result, nil
227226
}

server/querier/engine/clickhouse/function.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func GetAggFunc(name string, args []string, alias string, derivativeArgs []strin
108108
if !ok {
109109
return nil, 0, "", nil
110110
}
111-
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID)
111+
metricStruct, ok := metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
112112
if !ok {
113113
return nil, 0, "", nil
114114
}
@@ -231,7 +231,7 @@ func GetTopKTrans(name string, args []string, alias string, e *CHEngine) (Statem
231231
var metricStruct *metrics.Metrics
232232
for i, field := range fields {
233233
field = strings.Trim(field, "`")
234-
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID)
234+
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
235235
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
236236
return nil, 0, "", nil
237237
}
@@ -334,7 +334,7 @@ func GetUniqTrans(name string, args []string, alias string, e *CHEngine) (Statem
334334
var metricStruct *metrics.Metrics
335335
for _, field := range fields {
336336
field = strings.Trim(field, "`")
337-
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID)
337+
metricStruct, ok = metrics.GetAggMetrics(field, e.DB, e.Table, e.ORGID, e.NativeField)
338338
if !ok || metricStruct.Type == metrics.METRICS_TYPE_ARRAY {
339339
return nil, 0, "", nil
340340
}

server/querier/engine/clickhouse/metrics/ext_common.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,14 @@ func GetExtMetrics(db, table, where, queryCacheTTL, orgID string, useQueryCache
7373
)
7474
loadMetrics[fmt.Sprintf("%s-%s", metricName, tableName)] = lm
7575
}
76-
lm := NewMetrics(
77-
len(loadMetrics), "metrics",
78-
"metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY,
79-
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
80-
)
81-
loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm
76+
if !slices.Contains([]string{common.TABLE_NAME_EVENT, common.TABLE_NAME_PERF_EVENT}, table) {
77+
lm := NewMetrics(
78+
len(loadMetrics), "metrics",
79+
"metrics", "metrics", "metrics", "", "", "", METRICS_TYPE_ARRAY,
80+
common.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, "", "", "", "", "",
81+
)
82+
loadMetrics[fmt.Sprintf("%s-%s", "metrics", table)] = lm
83+
}
8284

8385
// native metrics
8486
if config.ControllerCfg.DFWebService.Enabled {

server/querier/engine/clickhouse/metrics/metrics.go

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525

2626
"golang.org/x/exp/slices"
2727

28-
ctlcommon "github.com/deepflowio/deepflow/server/controller/common"
2928
"github.com/deepflowio/deepflow/server/querier/common"
3029
"github.com/deepflowio/deepflow/server/querier/config"
3130
ckcommon "github.com/deepflowio/deepflow/server/querier/engine/clickhouse/common"
@@ -115,7 +114,7 @@ func NewReplaceMetrics(dbField string, condition string) *Metrics {
115114
}
116115
}
117116

118-
func GetAggMetrics(field, db, table, orgID string) (*Metrics, bool) {
117+
func GetAggMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) {
119118
field = strings.Trim(field, "`")
120119
if field == COUNT_METRICS_NAME {
121120
return &Metrics{
@@ -128,7 +127,7 @@ func GetAggMetrics(field, db, table, orgID string) (*Metrics, bool) {
128127
Table: table,
129128
}, true
130129
}
131-
return GetMetrics(field, db, table, orgID)
130+
return GetMetrics(field, db, table, orgID, nativeField)
132131
}
133132

134133
func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string]*Metrics, db, table, orgID string) error {
@@ -224,7 +223,7 @@ func GetTagTypeMetrics(tagDescriptions *common.Result, newAllMetrics map[string]
224223
return nil
225224
}
226225

227-
func GetMetrics(field, db, table, orgID string) (*Metrics, bool) {
226+
func GetMetrics(field, db, table, orgID string, nativeField map[string]*Metrics) (*Metrics, bool) {
228227
newAllMetrics := map[string]*Metrics{}
229228
field = strings.Trim(field, "`")
230229
// flow_tag database has no metrics
@@ -264,36 +263,10 @@ func GetMetrics(field, db, table, orgID string) (*Metrics, bool) {
264263
}
265264
} else {
266265
// native metrics
267-
if config.ControllerCfg.DFWebService.Enabled {
268-
getNativeUrl := fmt.Sprintf("http://localhost:%d/v1/native-fields/?db=%s&table_name=%s", config.ControllerCfg.ListenPort, db, table)
269-
resp, err := ctlcommon.CURLPerform("GET", getNativeUrl, nil, ctlcommon.WithHeader(ctlcommon.HEADER_KEY_X_ORG_ID, orgID))
270-
if err != nil {
271-
log.Errorf("request controller failed: %s, URL: %s", resp, getNativeUrl)
272-
} else {
273-
resultArray := resp.Get("DATA").MustArray()
274-
for i := range resultArray {
275-
nativeMetric := resp.Get("DATA").GetIndex(i).Get("NAME").MustString()
276-
displayName := resp.Get("DATA").GetIndex(i).Get("DISPLAY_NAME").MustString()
277-
description := resp.Get("DATA").GetIndex(i).Get("DESCRIPTION").MustString()
278-
fieldType := resp.Get("DATA").GetIndex(i).Get("FIELD_TYPE").MustInt()
279-
if nativeMetric == field {
280-
if fieldType == ckcommon.NATIVE_FIELD_TYPE_METRIC {
281-
metric := NewMetrics(
282-
0, field,
283-
displayName, displayName, displayName, "", "", "", METRICS_TYPE_COUNTER,
284-
ckcommon.NATIVE_FIELD_CATEGORY_METRICS, []bool{true, true, true}, "", table, description, description, description, "", "",
285-
)
286-
return metric, true
287-
} else {
288-
metric := NewMetrics(
289-
0, field,
290-
displayName, displayName, displayName, "", "", "", METRICS_TYPE_NAME_MAP["tag"],
291-
ckcommon.NATIVE_FIELD_CATEGORY_CUSTOM_TAG, []bool{true, true, true}, "", table, "", "", "", "", "",
292-
)
293-
return metric, true
294-
}
295-
}
296-
}
266+
if nativeField != nil {
267+
metric, ok := nativeField[field]
268+
if ok {
269+
return metric, true
297270
}
298271
}
299272
}

server/querier/engine/clickhouse/tag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func GetPrometheusAllTagTranslator(e *CHEngine) (string, string, error) {
268268
}
269269

270270
func GetMetricsTag(name string, alias string, e *CHEngine) (Statement, error) {
271-
metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID)
271+
metricStruct, ok := metrics.GetMetrics(strings.Trim(name, "`"), e.DB, e.Table, e.ORGID, e.NativeField)
272272
if !ok {
273273
return nil, nil
274274
}

0 commit comments

Comments
 (0)