Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: agent group config gets l7 protocols from ck #9188

Merged
merged 5 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/controller/config/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package common

type Swagger struct {
Enabled bool `default:"false" yaml:"enabled"`
Enabled bool `default:"true" yaml:"enabled"`
}

// TODO use this
Expand Down
13 changes: 4 additions & 9 deletions server/controller/db/metadb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func GetDB(orgID int) (*DB, error) {
return GetDBs().NewDBIfNotExists(orgID)
}

func RemoveDB(orgID int) {
GetDBs().Delete(orgID)
}

func GetConfig() config.Config {
return GetDBs().GetConfig()
}
Expand Down Expand Up @@ -198,15 +202,6 @@ func (c *DBs) check(db *DB) error {
return edition.CheckDBVersion(db.DB, db.Config)
}

func (c *DBs) DoOnAllDBs(execFunc func(db *DB) error) error {
for _, db := range dbs.orgIDToDB {
if err := execFunc(db); err != nil {
return fmt.Errorf("org(id:%d, name:%s) %s", db.ORGID, db.Name, err.Error())
}
}
return nil
}

type DBNameLogPrefix struct {
Name string
}
Expand Down
19 changes: 19 additions & 0 deletions server/controller/db/metadb/org.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,22 @@ func getTagNameFromTag(tag, prefix string) string {
func GetColumnNameFromTag(tag string) string {
return getTagNameFromTag(tag, "column:")
}

func DoOnAllDBs(execFunc func(*DB) error) error {
orgIDs, err := GetORGIDs()
if err != nil {
return err
}
for _, orgID := range orgIDs {
db, err := GetDB(orgID)
if err != nil {
log.Errorf("failed to get db info: %v", err, db.LogPrefixORGID, db.LogPrefixName)
continue
}
if err := execFunc(db); err != nil {
log.Errorf("failed to execute function: %v", err, db.LogPrefixORGID, db.LogPrefixName)
return err
}
}
return nil
}
4 changes: 0 additions & 4 deletions server/controller/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ func (s *Server) RegisterRouters() {
i.RegisterTo(s.engine)
}
trouter.RegisterTo(s.engine)

for _, route := range s.engine.Routes() {
log.Infof(" TODO method: %s, path: %s", route.Method, route.Path)
}
}

func (s *Server) appendRegistrant() []registrant.Registrant {
Expand Down
46 changes: 41 additions & 5 deletions server/controller/http/service/agent_group_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"bytes"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"

"github.com/google/uuid"
Expand All @@ -33,6 +35,7 @@ import (
"github.com/deepflowio/deepflow/server/controller/db/metadb/model"
httpcommon "github.com/deepflowio/deepflow/server/controller/http/common"
"github.com/deepflowio/deepflow/server/controller/trisolaris/refresh"
querierConfig "github.com/deepflowio/deepflow/server/querier/config"
)

var (
Expand Down Expand Up @@ -141,11 +144,11 @@ func (a *AgentGroupConfig) GetAgentGroupConfigTemplateJson() ([]byte, error) {
if err := yaml.Unmarshal(domainKeyToInfoYamlBytes, &domainKeyToInfoNode); err != nil {
return nil, err
}
// TODO get from ck
l7Protocols := []string{
"HTTP", "HTTP2", "Dubbo", "gRPC", "SOFARPC", "FastCGI", "bRPC", "Tars", "Some/IP", "MySQL", "PostgreSQL",
"Oracle", "Redis", "MongoDB", "Memcached", "Kafka", "MQTT", "AMQP", "OpenWire", "NATS", "Pulsar", "ping", "ZMTP",
"RocketMQ", "DNS", "TLS", "Custom"}

l7Protocols, err := a.getL7ProtocolsFromCK(dbInfo)
if err != nil {
return nil, err
}
l7ProtocolsYamlBytes, err := yaml.Marshal(l7Protocols)
if err != nil {
return nil, err
Expand All @@ -170,6 +173,39 @@ func (a *AgentGroupConfig) GetAgentGroupConfigTemplateJson() ([]byte, error) {
return agentconf.ConvertTemplateYAMLToJSON(dynamicOptions)
}

func (a *AgentGroupConfig) getL7ProtocolsFromCK(db *metadb.DB) ([]string, error) {
reqBody := url.Values{
"db": {"flow_log"},
"sql": {"show tag l7_protocol values from l7_flow_log"},
}
url := fmt.Sprintf("http://%s:%d/v1/query", common.GetPodIP(), querierConfig.Cfg.ListenPort)
resp, err := common.CURLForm(
http.MethodPost,
url,
reqBody,
common.WithORGHeader(strconv.Itoa(db.GetORGID())),
)
if err != nil {
log.Errorf("failed to get l7 protocols from ck: %v", err, db.LogPrefixORGID)
return nil, err
}
result := resp.Get("result")
var displayNameIdx int
cols := result.Get("columns").MustArray()
for i, col := range cols {
if col == "display_name" {
displayNameIdx = i
}
}

l7Protocols := make([]string, 0)
for i := range resp.Get("result").Get("values").MustArray() {
value := resp.Get("result").Get("values").GetIndex(i)
l7Protocols = append(l7Protocols, value.GetIndex(displayNameIdx).MustString())
}
return l7Protocols, nil
}

func (a *AgentGroupConfig) GetAgentGroupConfig(groupLcuuid string, dataType int) ([]byte, error) {
dbInfo, err := metadb.GetDB(a.resourceAccess.UserInfo.ORGID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/controller/monitor/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (c *AnalyzerCheck) Start(sCtx context.Context) {
for {
select {
case <-ticker.C:
if err := metadb.GetDBs().DoOnAllDBs(func(db *metadb.DB) error {
if err := metadb.DoOnAllDBs(func(db *metadb.DB) error {
// 数据节点健康检查
c.healthCheck(db)
// 检查没有分配数据节点的采集器,并进行分配
Expand Down
2 changes: 1 addition & 1 deletion server/controller/monitor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *ControllerCheck) Start(sCtx context.Context) {
for {
select {
case <-ticker.C:
if err := metadb.GetDBs().DoOnAllDBs(func(db *metadb.DB) error {
if err := metadb.DoOnAllDBs(func(db *metadb.DB) error {
// 控制器健康检查
c.healthCheck(db)
// 检查没有分配控制器的采集器,并进行分配
Expand Down
2 changes: 1 addition & 1 deletion server/controller/monitor/license/pseudo_license.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (v *VTapLicenseAllocation) Start(sCtx context.Context) {
for {
select {
case <-ticker.C:
if err := metadb.GetDBs().DoOnAllDBs(func(db *metadb.DB) error {
if err := metadb.DoOnAllDBs(func(db *metadb.DB) error {
v.allocLicense(db)
return nil
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/controller/monitor/vtap/vtap.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (v *VTapCheck) Start(sCtx context.Context) {
for {
select {
case <-ticker.C:
metadb.GetDBs().DoOnAllDBs(func(db *metadb.DB) error {
metadb.DoOnAllDBs(func(db *metadb.DB) error {
// check launch_server resource if exist
v.launchServerCheck(db)
// check vtap type
Expand Down
8 changes: 6 additions & 2 deletions server/controller/tagrecorder/ch_app_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ func (l *ChAPPLabel) generateNewData(db *metadb.DB) (map[PrometheusAPPLabelKey]m
for _, prometheusLabel := range prometheusLabels {
labelName := prometheusLabel.Name
if slices.Contains(appLabelSlice, labelName) {
labelNameID := labelNameIDMap[labelName]
labelNameID, nameOK := labelNameIDMap[labelName]
labelValue := prometheusLabel.Value
labelValueID := valueNameIDMap[labelValue]
labelValueID, valueOK := valueNameIDMap[labelValue]
if !nameOK || !valueOK {
log.Warningf("label name or value not found in db, labelName: %s, labelValue: %s", labelName, labelValue)
continue
}
keyToItem[PrometheusAPPLabelKey{LabelNameID: labelNameID, LabelValueID: labelValueID}] = metadbmodel.ChAPPLabel{
LabelNameID: labelNameID,
LabelValue: labelValue,
Expand Down
2 changes: 1 addition & 1 deletion server/controller/tagrecorder/ch_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ func (c *ChCustomServiceDevice) sourceToTarget(md *message.Metadata, source *met
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChCustomServiceDevice) onResourceUpdated(sourceID int, fieldsUpdate *message.CustomServiceFieldsUpdate, db *metadb.DB) {
updateInfo := make(map[string]interface{})

log.Infof("TODO fieldsUpdate.Name: %#v", fieldsUpdate.Name, db.LogPrefixORGID)
if fieldsUpdate.Name.IsDifferent() {
updateInfo["name"] = fieldsUpdate.Name.GetNew()
}
Expand Down
2 changes: 1 addition & 1 deletion server/controller/tagrecorder/check/tagrecorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type TagRecorder struct {

func (c *TagRecorder) Check() {
go func() {
if err := metadb.GetDBs().DoOnAllDBs(func(db *metadb.DB) error {
if err := metadb.DoOnAllDBs(func(db *metadb.DB) error {
t := time.Now()
log.Infof("database=%s tagrecorder health check data run", db.Name, db.LogPrefixORGID)
tagrecorder.GetTeamInfo(db)
Expand Down
31 changes: 19 additions & 12 deletions server/controller/tagrecorder/db_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

type operator[MT MySQLChModel, KT ChModelKey] interface {
batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *metadb.DB), db *metadb.DB)
add(keys []KT, dbItems []MT, db *metadb.DB)
update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *metadb.DB)
delete(keys []KT, dbItems []MT, db *metadb.DB)
batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *metadb.DB) error, db *metadb.DB) error
add(keys []KT, dbItems []MT, db *metadb.DB) error
update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *metadb.DB) error
delete(keys []KT, dbItems []MT, db *metadb.DB) error
setConfig(config.ControllerConfig)
}

Expand All @@ -46,7 +46,7 @@ func (b *operatorComponent[MT, KT]) setConfig(cfg config.ControllerConfig) {
b.cfg = cfg
}

func (b *operatorComponent[MT, KT]) batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *metadb.DB), db *metadb.DB) {
func (b *operatorComponent[MT, KT]) batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *metadb.DB) error, db *metadb.DB) error {
count := len(items)
offset := b.cfg.TagRecorderCfg.MySQLBatchSize
var pages int
Expand All @@ -61,35 +61,42 @@ func (b *operatorComponent[MT, KT]) batchPage(keys []KT, items []MT, operateFunc
if end > count {
end = count
}
operateFunc(keys[start:end], items[start:end], db)
err := operateFunc(keys[start:end], items[start:end], db)
if err != nil {
return err
}
}
return nil
}

func (b *operatorComponent[MT, KT]) add(keys []KT, dbItems []MT, db *metadb.DB) {
func (b *operatorComponent[MT, KT]) add(keys []KT, dbItems []MT, db *metadb.DB) error {
err := db.Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(&dbItems).Error
if err != nil {
log.Errorf("add %s (keys: %+v values: %+v) failed: %s", b.resourceTypeName, keys, dbItems, err.Error(), db.LogPrefixORGID) // TODO is key needed?
return
return err
}
log.Infof("add %s (keys: %+v values: %+v) success", b.resourceTypeName, keys, dbItems, db.LogPrefixORGID)
return nil
}

func (b *operatorComponent[MT, KT]) update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *metadb.DB) {
func (b *operatorComponent[MT, KT]) update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *metadb.DB) error {
err := db.Model(&oldDBItem).Updates(updateInfo).Error
if err != nil {
log.Errorf("update %s (key: %+v value: %+v) failed: %s", b.resourceTypeName, key, oldDBItem, err.Error(), db.LogPrefixORGID)
return
return err
}
log.Infof("update %s (key: %+v value: %+v, update info: %v) success", b.resourceTypeName, key, oldDBItem, updateInfo, db.LogPrefixORGID)
return nil
}

func (b *operatorComponent[MT, KT]) delete(keys []KT, dbItems []MT, db *metadb.DB) {
func (b *operatorComponent[MT, KT]) delete(keys []KT, dbItems []MT, db *metadb.DB) error {
err := db.Delete(&dbItems).Error
if err != nil {
log.Errorf("delete %s (keys: %+v values: %+v) failed: %s", b.resourceTypeName, keys, dbItems, err.Error(), db.LogPrefixORGID)
return
return err
}
log.Infof("delete %s (keys: %+v values: %+v) success", b.resourceTypeName, keys, dbItems, db.LogPrefixORGID)
return nil
}
15 changes: 12 additions & 3 deletions server/controller/tagrecorder/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,19 @@ func (b *UpdaterComponent[MT, KT]) Refresh() bool {
} else {
updateInfo, ok := b.updaterDG.generateUpdateInfo(oldDBItem, newDBItem)
if ok {
b.dbOperator.update(oldDBItem, updateInfo, key, db)
err := b.dbOperator.update(oldDBItem, updateInfo, key, db)
if err != nil {
log.Errorf("failed to update %s: %s", b.resourceTypeName, err, db.LogPrefixORGID)
}
isUpdate = true
}
}
}
if len(itemsToAdd) > 0 {
b.dbOperator.batchPage(keysToAdd, itemsToAdd, b.dbOperator.add, db) // 1是个占位符
err := b.dbOperator.batchPage(keysToAdd, itemsToAdd, b.dbOperator.add, db) // 1是个占位符
if err != nil {
log.Errorf("failed to add %s: %s", b.resourceTypeName, err, db.LogPrefixORGID)
}
}

for key, oldDBItem := range oldKeyToDBItem {
Expand All @@ -203,7 +209,10 @@ func (b *UpdaterComponent[MT, KT]) Refresh() bool {
}
}
if len(itemsToDelete) > 0 {
b.dbOperator.batchPage(keysToDelete, itemsToDelete, b.dbOperator.delete, db) // 1是个占位符
err := b.dbOperator.batchPage(keysToDelete, itemsToDelete, b.dbOperator.delete, db) // 1是个占位符
if err != nil {
log.Errorf("failed to delete %s: %s", b.resourceTypeName, err, db.LogPrefixORGID)
}
}

if len(itemsToDelete) > 0 && len(itemsToAdd) == 0 && !isUpdate {
Expand Down
12 changes: 12 additions & 0 deletions server/controller/trisolaris/dbcache/db_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type DBDataCache struct {
cens []*models.CEN
processes []*models.Process
vips []*models.VIP
customServices []*models.CustomService

podNSs []*models.PodNamespace
vtaps []*models.VTap
Expand Down Expand Up @@ -290,6 +291,10 @@ func (d *DBDataCache) GetVIPs() []*models.VIP {
return d.vips
}

func (d *DBDataCache) GetCustomServices() []*models.CustomService {
return d.customServices
}

func GetTapTypesFromDB(db *gorm.DB) []*models.TapType {
tapTypes, err := dbmgr.DBMgr[models.TapType](db).Gets()
if err != nil {
Expand Down Expand Up @@ -639,4 +644,11 @@ func (d *DBDataCache) GetDataCacheFromDB(db *gorm.DB) {
} else {
log.Error(d.Log(err.Error()))
}

customServices, err := dbmgr.DBMgr[models.CustomService](db).Gets()
if err == nil {
d.customServices = customServices
} else {
log.Error(d.Log(err.Error()))
}
}
Loading
Loading