Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
93a85b3
refactor: [Coda] use enums for observability task models
taoyifan89 Oct 28, 2025
fbeae7e
refactor: [Coda] switch task processor to entity type
taoyifan89 Oct 29, 2025
5c31325
test: [Coda] add getNonFinalTaskInfos tests
taoyifan89 Oct 30, 2025
422c4ee
Refactor task.
taoyifan89 Oct 30, 2025
3eb6d43
Merge branch 'main' into refactor/auto_task
taoyifan89 Oct 31, 2025
e0c72e5
rename task processor.
taoyifan89 Oct 31, 2025
d1079c2
test: [Coda] 调整tracehub单测适配Processor接口
taoyifan89 Nov 5, 2025
333923d
Refactor backfill.
taoyifan89 Nov 5, 2025
2a063d4
test: [Coda] align tracehub tests with domain types
taoyifan89 Nov 5, 2025
6dac6de
Merge branch 'main' into refactor/auto_task
taoyifan89 Nov 5, 2025
0109505
Merge branch 'main' into refactor/auto_task
taoyifan89 Nov 5, 2025
0b3ff45
TraceHub refactor.
taoyifan89 Nov 6, 2025
eddb1b3
add topic proc
zero3233-bd Nov 7, 2025
b3179ae
fix consumer
zero3233-bd Nov 8, 2025
b3c5798
add SpanWithAnnotation proc
zero3233-bd Nov 8, 2025
792194e
Make new task consumer public.
taoyifan89 Nov 10, 2025
73e0ce6
add wire
zero3233-bd Nov 11, 2025
d93abc6
fix mq proc
zero3233-bd Nov 11, 2025
4e9719d
add wire gen
zero3233-bd Nov 11, 2025
665f5ae
fix time proc
zero3233-bd Nov 11, 2025
febba2d
add annotation proc
zero3233-bd Nov 11, 2025
bd1769c
rebase
zero3233-bd Nov 11, 2025
22f14d4
Merge branch 'main' into refactor/auto_task
taoyifan89 Nov 11, 2025
d3a0872
refactor scheduled task.
taoyifan89 Nov 12, 2025
763b1da
Merge branch 'main' into refactor/auto_task
taoyifan89 Nov 13, 2025
4051b25
Merge branch 'feat/feedback_auto_task' into refactor/auto_task
taoyifan89 Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/api/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion backend/api/handler/coze/loop/apis/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions backend/modules/observability/application/convertor/page.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2025 coze-dev Authors
// SPDX-License-Identifier: Apache-2.0

package convertor

import (
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/domain/common"
entity "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/common"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
)

func OrderByDTO2DO(orderBy *common.OrderBy) *entity.OrderBy {
if orderBy == nil {
return nil
}
return &entity.OrderBy{
Field: orderBy.GetField(),
IsAsc: orderBy.GetIsAsc(),
}
}

func OrderByDO2DTO(orderBy *entity.OrderBy) *common.OrderBy {
if orderBy == nil {
return nil
}
return &common.OrderBy{
Field: ptr.Of(orderBy.Field),
IsAsc: ptr.Of(orderBy.IsAsc),
}
}
103 changes: 103 additions & 0 deletions backend/modules/observability/application/convertor/task/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2025 coze-dev Authors
// SPDX-License-Identifier: Apache-2.0

package task

import (
"github.com/coze-dev/coze-loop/backend/kitex_gen/coze/loop/observability/domain/filter"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
)

func TaskFiltersDTO2DO(filters *filter.TaskFilterFields) *entity.TaskFilterFields {
if filters == nil {
return nil
}
result := &entity.TaskFilterFields{}
if filters.QueryAndOr != nil {
relation := entity.QueryRelation(*filters.QueryAndOr)
result.QueryAndOr = &relation
}
if len(filters.FilterFields) == 0 {
return result
}
result.FilterFields = make([]*entity.TaskFilterField, 0, len(filters.FilterFields))
for _, field := range filters.FilterFields {
if field == nil {
continue
}
result.FilterFields = append(result.FilterFields, taskFilterFieldDTO2DO(field))
}
return result
}

func taskFilterFieldDTO2DO(field *filter.TaskFilterField) *entity.TaskFilterField {
if field == nil {
return nil
}
result := &entity.TaskFilterField{
Values: append([]string(nil), field.Values...),
SubFilter: taskFilterFieldDTO2DO(field.SubFilter),
}
if field.FieldName != nil {
name := entity.TaskFieldName(*field.FieldName)
result.FieldName = &name
}
if field.FieldType != nil {
fieldType := entity.FieldType(*field.FieldType)
result.FieldType = &fieldType
}
if field.QueryType != nil {
queryType := entity.QueryType(*field.QueryType)
result.QueryType = &queryType
}
if field.QueryAndOr != nil {
relation := entity.QueryRelation(*field.QueryAndOr)
result.QueryAndOr = &relation
}
return result
}

func TaskFiltersDO2DTO(filters *entity.TaskFilterFields) *filter.TaskFilterFields {
if filters == nil {
return nil
}
result := &filter.TaskFilterFields{}
if filters.QueryAndOr != nil {
result.QueryAndOr = ptr.Of(filter.QueryRelation(*filters.QueryAndOr))
}
if len(filters.FilterFields) == 0 {
return result
}
result.FilterFields = make([]*filter.TaskFilterField, 0, len(filters.FilterFields))
for _, field := range filters.FilterFields {
if field == nil {
continue
}
result.FilterFields = append(result.FilterFields, taskFilterFieldDO2DTO(field))
}
return result
}

func taskFilterFieldDO2DTO(field *entity.TaskFilterField) *filter.TaskFilterField {
if field == nil {
return nil
}
result := &filter.TaskFilterField{
Values: append([]string(nil), field.Values...),
SubFilter: taskFilterFieldDO2DTO(field.SubFilter),
}
if field.FieldName != nil {
result.FieldName = ptr.Of(string(*field.FieldName))
}
if field.FieldType != nil {
result.FieldType = ptr.Of(filter.FieldType(*field.FieldType))
}
if field.QueryType != nil {
result.QueryType = ptr.Of(filter.QueryType(*field.QueryType))
}
if field.QueryAndOr != nil {
result.QueryAndOr = ptr.Of(filter.QueryRelation(*field.QueryAndOr))
}
return result
}
130 changes: 20 additions & 110 deletions backend/modules/observability/application/convertor/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"github.com/coze-dev/coze-loop/backend/modules/observability/application/convertor"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/task/entity"
entity_common "github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/common"
obErrorx "github.com/coze-dev/coze-loop/backend/modules/observability/pkg/errno"
"github.com/coze-dev/coze-loop/backend/pkg/errorx"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
"github.com/coze-dev/coze-loop/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-loop/backend/pkg/lang/slices"
"github.com/coze-dev/coze-loop/backend/pkg/logs"
"github.com/samber/lo"
)

func TaskDOs2DTOs(ctx context.Context, taskPOs []*entity.ObservabilityTask, userInfos map[string]*entity_common.UserInfo) []*task.Task {
Expand Down Expand Up @@ -61,8 +61,8 @@ func TaskDO2DTO(ctx context.Context, v *entity.ObservabilityTask, userMap map[st
Name: v.Name,
Description: v.Description,
WorkspaceID: ptr.Of(v.WorkspaceID),
TaskType: v.TaskType,
TaskStatus: ptr.Of(v.TaskStatus),
TaskType: task.TaskType(v.TaskType),
TaskStatus: ptr.Of(task.TaskStatus(v.TaskStatus)),
Rule: RuleDO2DTO(v.SpanFilter, v.EffectiveTime, v.Sampler, v.BackfillEffectiveTime),
TaskConfig: TaskConfigDO2DTO(v.TaskConfig),
TaskDetail: taskDetail,
Expand All @@ -84,8 +84,8 @@ func TaskRunDO2DTO(ctx context.Context, v *entity.TaskRun, userMap map[string]*e
ID: v.ID,
WorkspaceID: v.WorkspaceID,
TaskID: v.TaskID,
TaskType: v.TaskType,
RunStatus: v.RunStatus,
TaskType: task.TaskRunType(v.TaskType),
RunStatus: task.RunStatus(v.RunStatus),
RunDetail: RunDetailDO2DTO(v.RunDetail),
BackfillRunDetail: BackfillRunDetailDO2DTO(v.BackfillDetail),
RunStartAt: v.RunStartAt.UnixMilli(),
Expand Down Expand Up @@ -177,8 +177,8 @@ func SpanFilterDO2DTO(spanFilter *entity.SpanFilterFields) *filter.SpanFilterFie

return &filter.SpanFilterFields{
Filters: convertor.FilterFieldsDO2DTO(&spanFilter.Filters),
PlatformType: &spanFilter.PlatformType,
SpanListType: &spanFilter.SpanListType,
PlatformType: lo.ToPtr(common.PlatformType(spanFilter.PlatformType)),
SpanListType: lo.ToPtr(common.SpanListType(spanFilter.SpanListType)),
}
}

Expand All @@ -204,7 +204,7 @@ func SamplerDO2DTO(sampler *entity.Sampler) *task.Sampler {
IsCycle: ptr.Of(sampler.IsCycle),
CycleCount: ptr.Of(sampler.CycleCount),
CycleInterval: ptr.Of(sampler.CycleInterval),
CycleTimeUnit: ptr.Of(sampler.CycleTimeUnit),
CycleTimeUnit: ptr.Of(string(sampler.CycleTimeUnit)),
}
}

Expand Down Expand Up @@ -305,7 +305,7 @@ func UserInfoPO2DO(userInfo *entity_common.UserInfo, userID string) *common.User
}
}

func TaskDTO2DO(taskDTO *task.Task, userID string, spanFilters *entity.SpanFilterFields) *entity.ObservabilityTask {
func TaskDTO2DO(taskDTO *task.Task) *entity.ObservabilityTask {
if taskDTO == nil {
return nil
}
Expand All @@ -316,31 +316,16 @@ func TaskDTO2DO(taskDTO *task.Task, userID string, spanFilters *entity.SpanFilte
if taskDTO.GetBaseInfo().GetUpdatedBy() != nil {
updatedBy = taskDTO.GetBaseInfo().GetUpdatedBy().GetUserID()
}
if userID != "" {
createdBy = userID
updatedBy = userID
} else {
if taskDTO.GetBaseInfo().GetCreatedBy() != nil {
createdBy = taskDTO.GetBaseInfo().GetCreatedBy().GetUserID()
}
if taskDTO.GetBaseInfo().GetUpdatedBy() != nil {
updatedBy = taskDTO.GetBaseInfo().GetUpdatedBy().GetUserID()
}
}
var spanFilterDO *entity.SpanFilterFields
if spanFilters != nil {
spanFilterDO = spanFilters
} else {
spanFilterDO = SpanFilterDTO2DO(taskDTO.GetRule().GetSpanFilters())
}

spanFilterDO := SpanFilterDTO2DO(taskDTO.GetRule().GetSpanFilters())

return &entity.ObservabilityTask{
ID: taskDTO.GetID(),
WorkspaceID: taskDTO.GetWorkspaceID(),
Name: taskDTO.GetName(),
Description: ptr.Of(taskDTO.GetDescription()),
TaskType: taskDTO.GetTaskType(),
TaskStatus: taskDTO.GetTaskStatus(),
TaskType: entity.TaskType(taskDTO.GetTaskType()),
TaskStatus: entity.TaskStatus(taskDTO.GetTaskStatus()),
TaskDetail: RunDetailDTO2DO(taskDTO.GetTaskDetail()),
SpanFilter: spanFilterDO,
EffectiveTime: EffectiveTimeDTO2DO(taskDTO.GetRule().GetEffectiveTime()),
Expand All @@ -359,8 +344,8 @@ func SpanFilterDTO2DO(spanFilterFields *filter.SpanFilterFields) *entity.SpanFil
return nil
}
return &entity.SpanFilterFields{
PlatformType: *spanFilterFields.PlatformType,
SpanListType: *spanFilterFields.SpanListType,
PlatformType: loop_span.PlatformType(*spanFilterFields.PlatformType),
SpanListType: loop_span.SpanListType(*spanFilterFields.SpanListType),
Filters: *convertor.FilterFieldsDTO2DO(spanFilterFields.Filters),
}
}
Expand Down Expand Up @@ -396,7 +381,7 @@ func SamplerDTO2DO(sampler *task.Sampler) *entity.Sampler {
IsCycle: sampler.GetIsCycle(),
CycleCount: sampler.GetCycleCount(),
CycleInterval: sampler.GetCycleInterval(),
CycleTimeUnit: sampler.GetCycleTimeUnit(),
CycleTimeUnit: entity.TimeUnit(sampler.GetCycleTimeUnit()),
}
}

Expand All @@ -408,6 +393,7 @@ func TaskConfigDTO2DO(taskConfig *task.TaskConfig) *entity.TaskConfig {
for _, autoEvaluateConfig := range taskConfig.AutoEvaluateConfigs {
var fieldMappings []*entity.EvaluateFieldMapping
if len(autoEvaluateConfig.FieldMappings) > 0 {
// todo tyf 这段逻辑挪到service层
var evalSetNames []string
jspnPathMapping := make(map[string]string)
for _, config := range autoEvaluateConfig.FieldMappings {
Expand Down Expand Up @@ -471,8 +457,8 @@ func TaskRunDTO2DO(taskRun *task.TaskRun) *entity.TaskRun {
ID: taskRun.ID,
TaskID: taskRun.TaskID,
WorkspaceID: taskRun.WorkspaceID,
TaskType: taskRun.TaskType,
RunStatus: taskRun.RunStatus,
TaskType: entity.TaskRunType(taskRun.TaskType),
RunStatus: entity.TaskRunStatus(taskRun.RunStatus),
RunDetail: RunDetailDTO2DO(taskRun.RunDetail),
BackfillDetail: BackfillRunDetailDTO2DO(taskRun.BackfillRunDetail),
RunStartAt: time.UnixMilli(taskRun.RunStartAt),
Expand Down Expand Up @@ -531,82 +517,6 @@ func BackfillRunDetailDTO2DO(v *task.BackfillDetail) *entity.BackfillDetail {
}
}

func CheckEffectiveTime(ctx context.Context, effectiveTime *task.EffectiveTime, taskStatus task.TaskStatus, effectiveTimeDO *entity.EffectiveTime) (*entity.EffectiveTime, error) {
if effectiveTimeDO == nil {
logs.CtxError(ctx, "EffectiveTimePO2DO error")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("effective time is nil"))
}
var validEffectiveTime entity.EffectiveTime
// 开始时间不能大于结束时间
if effectiveTime.GetStartAt() >= effectiveTime.GetEndAt() {
logs.CtxError(ctx, "Start time must be less than end time")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("start time must be less than end time"))
}
// 开始、结束时间不能小于当前时间
if effectiveTimeDO.StartAt != effectiveTime.GetStartAt() && effectiveTime.GetStartAt() < time.Now().UnixMilli() {
logs.CtxError(ctx, "update time must be greater than current time")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("start time must be greater than current time"))
}
if effectiveTimeDO.EndAt != effectiveTime.GetEndAt() && effectiveTime.GetEndAt() < time.Now().UnixMilli() {
logs.CtxError(ctx, "update time must be greater than current time")
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("start time must be greater than current time"))
}
validEffectiveTime.StartAt = effectiveTimeDO.StartAt
validEffectiveTime.EndAt = effectiveTimeDO.EndAt
switch taskStatus {
case task.TaskStatusUnstarted:
if validEffectiveTime.StartAt != 0 {
validEffectiveTime.StartAt = *effectiveTime.StartAt
}
if validEffectiveTime.EndAt != 0 {
validEffectiveTime.EndAt = *effectiveTime.EndAt
}
case task.TaskStatusRunning, task.TaskStatusPending:
if validEffectiveTime.EndAt != 0 {
validEffectiveTime.EndAt = *effectiveTime.EndAt
}
default:
logs.CtxError(ctx, "Invalid task status:%s", taskStatus)
return nil, errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("invalid task status"))
}
return &validEffectiveTime, nil
}

func CheckTaskStatus(ctx context.Context, taskStatus task.TaskStatus, currentTaskStatus task.TaskStatus) (task.TaskStatus, error) {
var validTaskStatus task.TaskStatus
// [0530]todo: 任务状态校验
switch taskStatus {
case task.TaskStatusUnstarted:
if currentTaskStatus == task.TaskStatusUnstarted {
validTaskStatus = taskStatus
} else {
logs.CtxError(ctx, "Invalid task status:%s", taskStatus)
return "", errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("invalid task status"))
}
case task.TaskStatusRunning:
if currentTaskStatus == task.TaskStatusUnstarted || currentTaskStatus == task.TaskStatusPending {
validTaskStatus = taskStatus
} else {
logs.CtxError(ctx, "Invalid task status:%s,currentTaskStatus:%s", taskStatus, currentTaskStatus)
return "", errorx.NewByCode(obErrorx.CommercialCommonInvalidParamCodeCode, errorx.WithExtraMsg("invalid task status"))
}
case task.TaskStatusPending:
if currentTaskStatus == task.TaskStatusRunning {
validTaskStatus = task.TaskStatusPending
}
case task.TaskStatusDisabled:
if currentTaskStatus == task.TaskStatusUnstarted || currentTaskStatus == task.TaskStatusPending {
validTaskStatus = task.TaskStatusDisabled
}
case task.TaskStatusSuccess:
if currentTaskStatus != task.TaskStatusSuccess {
validTaskStatus = task.TaskStatusSuccess
}
}

return validTaskStatus, nil
}

func getLastPartAfterDot(s string) string {
s = strings.TrimRight(s, ".")
lastDotIndex := strings.LastIndex(s, ".")
Expand Down
Loading
Loading