Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ if config.tilt_subcommand == "ci":
custom_build(
'rust-log-service',
'docker image tag rust-log-service:ci $EXPECTED_REF',
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The list of dependencies ['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'] is repeated for all Rust services (rust-log-service, rust-frontend-service, query-service, etc.).

To improve maintainability and keep the file DRY, consider defining this list as a variable at the top and reusing it for each service.

For example:

RUST_DEPS = ['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go']

# ...

if config.tilt_subcommand == "ci":
  custom_build(
    'rust-log-service',
    'docker image tag rust-log-service:ci $EXPECTED_REF',
    RUST_DEPS,
    disable_push=True
  )
else:
  docker_build(
    'rust-log-service',
    '.',
    only=RUST_DEPS,
    dockerfile='./rust/Dockerfile',
    target='log_service'
  )

# ... apply the same for other rust services
Context for Agents
[**BestPractice**]

The list of dependencies `['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go']` is repeated for all Rust services (`rust-log-service`, `rust-frontend-service`, `query-service`, etc.).

To improve maintainability and keep the file DRY, consider defining this list as a variable at the top and reusing it for each service.

For example:
```python
RUST_DEPS = ['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go']

# ...

if config.tilt_subcommand == "ci":
  custom_build(
    'rust-log-service',
    'docker image tag rust-log-service:ci $EXPECTED_REF',
    RUST_DEPS,
    disable_push=True
  )
else:
  docker_build(
    'rust-log-service',
    '.',
    only=RUST_DEPS,
    dockerfile='./rust/Dockerfile',
    target='log_service'
  )

# ... apply the same for other rust services
```

File: Tiltfile
Line: 23

disable_push=True
)
else:
docker_build(
'rust-log-service',
'.',
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
dockerfile='./rust/Dockerfile',
target='log_service'
)
Expand Down Expand Up @@ -68,14 +68,14 @@ if config.tilt_subcommand == "ci":
custom_build(
'rust-frontend-service',
'docker image tag rust-frontend-service:ci $EXPECTED_REF',
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
disable_push=True
)
else:
docker_build(
'rust-frontend-service',
'.',
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
dockerfile='./rust/Dockerfile',
target='cli'
)
Expand All @@ -84,14 +84,14 @@ if config.tilt_subcommand == "ci":
custom_build(
'query-service',
'docker image tag query-service:ci $EXPECTED_REF',
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
disable_push=True
)
else:
docker_build(
'query-service',
'.',
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
dockerfile='./rust/Dockerfile',
target='query_service'
)
Expand All @@ -100,14 +100,14 @@ if config.tilt_subcommand == "ci":
custom_build(
'compaction-service',
'docker image tag compactor-service:ci $EXPECTED_REF',
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
disable_push=True
)
else:
docker_build(
'compaction-service',
'.',
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
dockerfile='./rust/Dockerfile',
target='compaction_service'
)
Expand All @@ -116,14 +116,14 @@ if config.tilt_subcommand == "ci":
custom_build(
'garbage-collector',
'docker image tag garbage-collector:ci $EXPECTED_REF',
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
disable_push=True
)
else:
docker_build(
'garbage-collector',
'.',
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
dockerfile='./rust/Dockerfile',
target='garbage_collector'
)
Expand All @@ -132,14 +132,14 @@ if config.tilt_subcommand == "ci":
custom_build(
'load-service',
'docker image tag load-service:ci $EXPECTED_REF',
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
disable_push=True
)
else:
docker_build(
'load-service',
'.',
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
dockerfile='./rust/Dockerfile',
target='load_service'
)
Expand Down
8 changes: 8 additions & 0 deletions go/pkg/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ var (
// Segment metadata errors
ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported")

// Task errors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit that I'd be better off taking up with the creators of go: Static strings for errors are hard to debug, especially if they occur at multiple places. Can we at least embed some information about what the error means? Like context about it.

ErrTaskAlreadyExists = errors.New("the task that was being created already exists for this collection")
ErrTaskNotFound = errors.New("the requested task was not found")
ErrInvalidTaskName = errors.New("task name cannot start with reserved prefix '_deleted_'")

// Operator errors
ErrOperatorNotFound = errors.New("operator not found")

// Others
ErrCompactionOffsetSomehowAhead = errors.New("system invariant was violated. Compaction offset in sysdb should always be behind or equal to offset in log")
)
264 changes: 264 additions & 0 deletions go/pkg/sysdb/coordinator/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package coordinator

import (
"context"
"strings"
"time"

"github.com/chroma-core/chroma/go/pkg/common"
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
"github.com/chroma-core/chroma/go/pkg/sysdb/coordinator/model"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
"github.com/chroma-core/chroma/go/pkg/types"
"github.com/google/uuid"
"github.com/pingcap/log"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

// CreateTask creates a new task in the database
func (s *Coordinator) CreateTask(ctx context.Context, req *coordinatorpb.CreateTaskRequest) (*coordinatorpb.CreateTaskResponse, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the transaction somehow carried on the context? Otherwise this is not transactional code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not, I was going to leave making task creation + collection creation a follow up to this because collection creation also needs to be refactored to take a transaction.

// Validate task name doesn't start with soft-deletion reserved prefix
if strings.HasPrefix(req.Name, "_deleted_") {
log.Error("CreateTask: task name cannot start with _deleted_")
return nil, common.ErrInvalidTaskName
}

var taskID uuid.UUID

// Execute all database operations in a transaction
err := s.catalog.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// Check if task already exists
existingTask, err := s.catalog.metaDomain.TaskDb(txCtx).GetByName(req.InputCollectionId, req.Name)
if err != nil {
log.Error("CreateTask: failed to check task", zap.Error(err))
return err
}
if existingTask != nil {
log.Info("CreateTask: task already exists, returning existing")
taskID = existingTask.ID
return nil
}

// Generate new task UUID
taskID = uuid.New()
outputCollectionName := req.OutputCollectionName

// Look up database_id from databases table using database name and tenant
databases, err := s.catalog.metaDomain.DatabaseDb(txCtx).GetDatabases(req.TenantId, req.Database)
if err != nil {
log.Error("CreateTask: failed to get database", zap.Error(err))
return err
}
if len(databases) == 0 {
log.Error("CreateTask: database not found")
return common.ErrDatabaseNotFound
}

// Look up operator by name from the operators table
operator, err := s.catalog.metaDomain.OperatorDb(txCtx).GetByName(req.OperatorName)
if err != nil {
log.Error("CreateTask: failed to get operator", zap.Error(err))
return err
}
if operator == nil {
log.Error("CreateTask: operator not found", zap.String("operator_name", req.OperatorName))
return common.ErrOperatorNotFound
}
operatorID := operator.OperatorID

// Generate UUIDv7 for time-ordered nonce
nextNonce, err := uuid.NewV7()
if err != nil {
return err
}

// TODO(tanujnay112): Can combine the two collection checks into one
// Check if input collection exists
collections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections([]string{req.InputCollectionId}, nil, req.TenantId, req.Database, nil, nil, false)
if err != nil {
log.Error("CreateTask: failed to get input collection", zap.Error(err))
return err
}
if len(collections) == 0 {
log.Error("CreateTask: input collection not found")
return common.ErrCollectionNotFound
}

// Check if output collection already exists
existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &outputCollectionName, req.TenantId, req.Database, nil, nil, false)
if err != nil {
log.Error("CreateTask: failed to check output collection", zap.Error(err))
return err
}
if len(existingOutputCollections) > 0 {
log.Error("CreateTask: output collection already exists")
return common.ErrCollectionUniqueConstraintViolation
}

now := time.Now()
task := &dbmodel.Task{
ID: taskID,
Name: req.Name,
TenantID: req.TenantId,
DatabaseID: databases[0].ID,
InputCollectionID: req.InputCollectionId,
OutputCollectionName: req.OutputCollectionName,
OperatorID: operatorID,
OperatorParams: req.Params,
CompletionOffset: 0,
LastRun: nil,
NextRun: nil, // Will be set to zero initially, scheduled by task scheduler
MinRecordsForTask: int64(req.MinRecordsForTask),
CurrentAttempts: 0,
CreatedAt: now,
UpdatedAt: now,
NextNonce: nextNonce,
OldestWrittenNonce: nil,
}

// Try to insert task into database
err = s.catalog.metaDomain.TaskDb(txCtx).Insert(task)
if err != nil {
// Check if it's a unique constraint violation (concurrent creation)
if err == common.ErrTaskAlreadyExists {
log.Error("CreateTask: task already exists")
return common.ErrTaskAlreadyExists
}
Comment on lines +126 to +129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BestPractice]

The current implementation has inconsistent behavior for handling pre-existing tasks. If a task is found with GetByName on line 30, its ID is returned successfully. However, if a task is created concurrently between that check and the Insert call (a race condition), the function returns ErrTaskAlreadyExists.

This could be confusing for clients. For robust idempotency, it would be better to handle the race condition by re-fetching the concurrently created task and returning its ID, making the function's behavior consistent in all "already exists" scenarios.

Consider replacing the error return with logic to re-fetch the task to ensure the CreateTask operation is fully idempotent.

Context for Agents
[**BestPractice**]

The current implementation has inconsistent behavior for handling pre-existing tasks. If a task is found with `GetByName` on line 30, its ID is returned successfully. However, if a task is created concurrently between that check and the `Insert` call (a race condition), the function returns `ErrTaskAlreadyExists`.

This could be confusing for clients. For robust idempotency, it would be better to handle the race condition by re-fetching the concurrently created task and returning its ID, making the function's behavior consistent in all "already exists" scenarios.

Consider replacing the error return with logic to re-fetch the task to ensure the `CreateTask` operation is fully idempotent.

File: go/pkg/sysdb/coordinator/task.go
Line: 125

log.Error("CreateTask: failed to insert task", zap.Error(err))
return err
}

log.Info("Task created successfully", zap.String("task_id", taskID.String()), zap.String("name", req.Name), zap.String("output_collection_name", outputCollectionName))
return nil
})

if err != nil {
return nil, err
}

return &coordinatorpb.CreateTaskResponse{
TaskId: taskID.String(),
}, nil
}

// GetTaskByName retrieves a task by name from the database
func (s *Coordinator) GetTaskByName(ctx context.Context, req *coordinatorpb.GetTaskByNameRequest) (*coordinatorpb.GetTaskByNameResponse, error) {
// Can do both calls with a JOIN
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByName(req.InputCollectionId, req.TaskName)
if err != nil {
return nil, err
}

// If task not found, return empty response
if task == nil {
return nil, common.ErrTaskNotFound
}

// Look up operator name from operators table
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[PerformanceOptimization]

N+1 query pattern: In GetTaskByName, after retrieving the task, there's a separate query to get the operator details. If this function is called multiple times (e.g., in a batch operation), each call will make 2 database queries instead of using a JOIN.

Suggested change
// Look up operator name from operators table
// Consider using a JOIN query to fetch task and operator in one query:
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByNameWithOperator(req.InputCollectionId, req.TaskName)

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**PerformanceOptimization**]

N+1 query pattern: In `GetTaskByName`, after retrieving the task, there's a separate query to get the operator details. If this function is called multiple times (e.g., in a batch operation), each call will make 2 database queries instead of using a JOIN.

```suggestion
// Consider using a JOIN query to fetch task and operator in one query:
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByNameWithOperator(req.InputCollectionId, req.TaskName)
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: go/pkg/sysdb/coordinator/task.go
Line: 127

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a priority if we put the operation in a transaction.

operator, err := s.catalog.metaDomain.OperatorDb(ctx).GetByID(task.OperatorID)
if err != nil {
log.Error("GetTaskByName: failed to get operator", zap.Error(err))
return nil, err
}
if operator == nil {
log.Error("GetTaskByName: operator not found", zap.String("operator_id", task.OperatorID.String()))
return nil, common.ErrOperatorNotFound
}

// Debug logging
log.Info("Found task", zap.String("task_id", task.ID.String()), zap.String("name", task.Name), zap.String("input_collection_id", task.InputCollectionID), zap.String("output_collection_name", task.OutputCollectionName))

// Convert task to response
response := &coordinatorpb.GetTaskByNameResponse{
TaskId: proto.String(task.ID.String()),
Name: proto.String(task.Name),
OperatorName: proto.String(operator.OperatorName),
InputCollectionId: proto.String(task.InputCollectionID),
OutputCollectionName: proto.String(task.OutputCollectionName),
Params: proto.String(task.OperatorParams),
CompletionOffset: proto.Int64(task.CompletionOffset),
MinRecordsForTask: proto.Uint64(uint64(task.MinRecordsForTask)),
}
// Add output_collection_id if it's set
if task.OutputCollectionID != nil {
response.OutputCollectionId = task.OutputCollectionID
}
return response, nil
}

// DeleteTask soft deletes a task by name
func (s *Coordinator) DeleteTask(ctx context.Context, req *coordinatorpb.DeleteTaskRequest) (*coordinatorpb.DeleteTaskResponse, error) {
// First get the task to check if we need to delete the output collection
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByName(req.InputCollectionId, req.TaskName)
if err != nil {
log.Error("DeleteTask: failed to get task", zap.Error(err))
return nil, err
}
if task == nil {
log.Error("DeleteTask: task not found")
return nil, status.Errorf(codes.NotFound, "task not found")
}
Comment on lines +200 to +203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ArchitectureDecision]

In DeleteTask, the function returns a gRPC status.Errorf error directly. This is inconsistent with other functions in the coordinator layer (like GetTaskByName which returns common.ErrTaskNotFound) that return errors from the common package. The coordinator layer should ideally be agnostic of the transport layer. It's better to return common errors and let the gRPC service layer handle the translation to gRPC status codes.

Suggested change
if task == nil {
log.Error("DeleteTask: task not found")
return nil, status.Errorf(codes.NotFound, "task not found")
}
if task == nil {
log.Error("DeleteTask: task not found")
return nil, common.ErrTaskNotFound
}

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**ArchitectureDecision**]

In `DeleteTask`, the function returns a gRPC `status.Errorf` error directly. This is inconsistent with other functions in the coordinator layer (like `GetTaskByName` which returns `common.ErrTaskNotFound`) that return errors from the `common` package. The coordinator layer should ideally be agnostic of the transport layer. It's better to return `common` errors and let the gRPC service layer handle the translation to gRPC status codes.

```suggestion
	if task == nil {
		log.Error("DeleteTask: task not found")
		return nil, common.ErrTaskNotFound
	}
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

File: go/pkg/sysdb/coordinator/task.go
Line: 203


// If delete_output is true and output_collection_id is set, soft-delete the output collection
if req.DeleteOutput && task.OutputCollectionID != nil && *task.OutputCollectionID != "" {
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID)
if err != nil {
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err))
return nil, status.Errorf(codes.InvalidArgument, "invalid output_collection_id: %v", err)
}
Comment on lines +207 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ArchitectureDecision]

Similar to the other error handling feedback, returning status.Errorf here couples the coordinator with gRPC. It's better to return a standard Go error and let the caller (the gRPC handler) wrap it in a gRPC status.

Suggested Change
Suggested change
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID)
if err != nil {
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err))
return nil, status.Errorf(codes.InvalidArgument, "invalid output_collection_id: %v", err)
}
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID)
if err != nil {
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err))
return nil, err
}

Committable suggestion

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Context for Agents
[**ArchitectureDecision**]

Similar to the other error handling feedback, returning `status.Errorf` here couples the coordinator with gRPC. It's better to return a standard Go error and let the caller (the gRPC handler) wrap it in a gRPC status.

<details>
<summary>Suggested Change</summary>

```suggestion
		collectionUUID, err := types.ToUniqueID(task.OutputCollectionID)
		if err != nil {
			log.Error("DeleteTask: invalid output_collection_id", zap.Error(err))
			return nil, err
		}
```

⚡ **Committable suggestion**

Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

</details>

File: go/pkg/sysdb/coordinator/task.go
Line: 211


deleteCollection := &model.DeleteCollection{
ID: collectionUUID,
TenantID: task.TenantID,
DatabaseName: task.DatabaseID,
Comment on lines +213 to +216
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[CriticalError]

There appears to be a bug here. The model.DeleteCollection struct expects a DatabaseName, but you are passing task.DatabaseID. This will likely cause SoftDeleteCollection to fail because it will be searching for a database with a name that is actually a UUID string.

The task model doesn't store the database name, so you'll need to fetch it. This might require adding a GetDatabaseByID method to the database DAO and interface, as one doesn't seem to exist currently.

Example of what the fix might look like:

// First, fetch the database to get its name
// This assumes a new method GetDatabaseByID is added to the DB interface
database, err := s.catalog.metaDomain.DatabaseDb(ctx).GetDatabaseByID(task.DatabaseID)
if err != nil {
    log.Error("DeleteTask: failed to get database by ID", zap.Error(err), zap.String("database_id", task.DatabaseID))
    return nil, err
}
if database == nil {
    log.Error("DeleteTask: database not found for ID", zap.String("database_id", task.DatabaseID))
    return nil, common.ErrDatabaseNotFound
}

deleteCollection := &model.DeleteCollection{
    ID:           collectionUUID,
    TenantID:     task.TenantID,
    DatabaseName: database.Name, // Use the fetched database name
}
Context for Agents
[**CriticalError**]

There appears to be a bug here. The `model.DeleteCollection` struct expects a `DatabaseName`, but you are passing `task.DatabaseID`. This will likely cause `SoftDeleteCollection` to fail because it will be searching for a database with a name that is actually a UUID string.

The `task` model doesn't store the database name, so you'll need to fetch it. This might require adding a `GetDatabaseByID` method to the database DAO and interface, as one doesn't seem to exist currently.

Example of what the fix might look like:
```go
// First, fetch the database to get its name
// This assumes a new method GetDatabaseByID is added to the DB interface
database, err := s.catalog.metaDomain.DatabaseDb(ctx).GetDatabaseByID(task.DatabaseID)
if err != nil {
    log.Error("DeleteTask: failed to get database by ID", zap.Error(err), zap.String("database_id", task.DatabaseID))
    return nil, err
}
if database == nil {
    log.Error("DeleteTask: database not found for ID", zap.String("database_id", task.DatabaseID))
    return nil, common.ErrDatabaseNotFound
}

deleteCollection := &model.DeleteCollection{
    ID:           collectionUUID,
    TenantID:     task.TenantID,
    DatabaseName: database.Name, // Use the fetched database name
}
```

File: go/pkg/sysdb/coordinator/task.go
Line: 216

}

err = s.SoftDeleteCollection(ctx, deleteCollection)
if err != nil {
// Log but don't fail - we still want to delete the task
log.Warn("DeleteTask: failed to delete output collection", zap.Error(err), zap.String("collection_id", *task.OutputCollectionID))
} else {
log.Info("DeleteTask: deleted output collection", zap.String("collection_id", *task.OutputCollectionID))
}
}

// Now soft-delete the task
err = s.catalog.metaDomain.TaskDb(ctx).SoftDelete(req.InputCollectionId, req.TaskName)
if err != nil {
log.Error("DeleteTask failed", zap.Error(err))
return nil, err
}

log.Info("Task deleted", zap.String("input_collection_id", req.InputCollectionId), zap.String("task_name", req.TaskName))

return &coordinatorpb.DeleteTaskResponse{
Success: true,
}, nil
}

// GetOperators retrieves all operators from the database
func (s *Coordinator) GetOperators(ctx context.Context, req *coordinatorpb.GetOperatorsRequest) (*coordinatorpb.GetOperatorsResponse, error) {
operators, err := s.catalog.metaDomain.OperatorDb(ctx).GetAll()
if err != nil {
log.Error("GetOperators failed", zap.Error(err))
return nil, err
}

// Convert to proto response
protoOperators := make([]*coordinatorpb.Operator, len(operators))
for i, op := range operators {
protoOperators[i] = &coordinatorpb.Operator{
Id: op.OperatorID.String(),
Name: op.OperatorName,
}
}

log.Info("GetOperators succeeded", zap.Int("count", len(operators)))

return &coordinatorpb.GetOperatorsResponse{
Operators: protoOperators,
}, nil
}
Loading
Loading