Skip to content

Commit 51e7975

Browse files
committed
Add models and migration for Create/Delete/Get Tasks
1 parent 2c9797a commit 51e7975

35 files changed

+1705
-58
lines changed

Tiltfile

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ if config.tilt_subcommand == "ci":
2020
custom_build(
2121
'rust-log-service',
2222
'docker image tag rust-log-service:ci $EXPECTED_REF',
23-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
23+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
2424
disable_push=True
2525
)
2626
else:
2727
docker_build(
2828
'rust-log-service',
2929
'.',
30-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
30+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
3131
dockerfile='./rust/Dockerfile',
3232
target='log_service'
3333
)
@@ -68,14 +68,14 @@ if config.tilt_subcommand == "ci":
6868
custom_build(
6969
'rust-frontend-service',
7070
'docker image tag rust-frontend-service:ci $EXPECTED_REF',
71-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
71+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
7272
disable_push=True
7373
)
7474
else:
7575
docker_build(
7676
'rust-frontend-service',
7777
'.',
78-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
78+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
7979
dockerfile='./rust/Dockerfile',
8080
target='cli'
8181
)
@@ -84,14 +84,14 @@ if config.tilt_subcommand == "ci":
8484
custom_build(
8585
'query-service',
8686
'docker image tag query-service:ci $EXPECTED_REF',
87-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
87+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
8888
disable_push=True
8989
)
9090
else:
9191
docker_build(
9292
'query-service',
9393
'.',
94-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
94+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
9595
dockerfile='./rust/Dockerfile',
9696
target='query_service'
9797
)
@@ -100,14 +100,14 @@ if config.tilt_subcommand == "ci":
100100
custom_build(
101101
'compaction-service',
102102
'docker image tag compactor-service:ci $EXPECTED_REF',
103-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
103+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
104104
disable_push=True
105105
)
106106
else:
107107
docker_build(
108108
'compaction-service',
109109
'.',
110-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
110+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
111111
dockerfile='./rust/Dockerfile',
112112
target='compaction_service'
113113
)
@@ -116,14 +116,14 @@ if config.tilt_subcommand == "ci":
116116
custom_build(
117117
'garbage-collector',
118118
'docker image tag garbage-collector:ci $EXPECTED_REF',
119-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
119+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
120120
disable_push=True
121121
)
122122
else:
123123
docker_build(
124124
'garbage-collector',
125125
'.',
126-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
126+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
127127
dockerfile='./rust/Dockerfile',
128128
target='garbage_collector'
129129
)
@@ -132,14 +132,14 @@ if config.tilt_subcommand == "ci":
132132
custom_build(
133133
'load-service',
134134
'docker image tag load-service:ci $EXPECTED_REF',
135-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
135+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
136136
disable_push=True
137137
)
138138
else:
139139
docker_build(
140140
'load-service',
141141
'.',
142-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
142+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
143143
dockerfile='./rust/Dockerfile',
144144
target='load_service'
145145
)

go/pkg/common/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ var (
4848
// Segment metadata errors
4949
ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported")
5050

51+
// Task errors
52+
ErrTaskAlreadyExists = errors.New("the task that was being created already exists for this collection")
53+
ErrTaskNotFound = errors.New("the requested task was not found")
54+
ErrInvalidTaskName = errors.New("task name cannot start with reserved prefix '_deleted_'")
55+
56+
// Operator errors
57+
ErrOperatorNotFound = errors.New("operator not found")
58+
5159
// Others
5260
ErrCompactionOffsetSomehowAhead = errors.New("system invariant was violated. Compaction offset in sysdb should always be behind or equal to offset in log")
5361
)

go/pkg/sysdb/coordinator/task.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package coordinator
2+
3+
import (
4+
"context"
5+
"strings"
6+
"time"
7+
8+
"github.com/chroma-core/chroma/go/pkg/common"
9+
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
10+
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
11+
"github.com/google/uuid"
12+
"github.com/pingcap/log"
13+
"go.uber.org/zap"
14+
"google.golang.org/protobuf/proto"
15+
)
16+
17+
// CreateTask creates a new task in the database
18+
func (s *Coordinator) CreateTask(ctx context.Context, req *coordinatorpb.CreateTaskRequest) (*coordinatorpb.CreateTaskResponse, error) {
19+
// Validate task name doesn't start with soft-deletion reserved prefix
20+
if strings.HasPrefix(req.Name, "_deleted_") {
21+
log.Error("CreateTask: task name cannot start with _deleted_")
22+
return nil, common.ErrInvalidTaskName
23+
}
24+
25+
var taskID uuid.UUID
26+
27+
// Execute all database operations in a transaction
28+
err := s.catalog.txImpl.Transaction(ctx, func(txCtx context.Context) error {
29+
// Check if task already exists
30+
existingTask, err := s.catalog.metaDomain.TaskDb(txCtx).GetByName(req.InputCollectionId, req.Name)
31+
if err != nil {
32+
log.Error("CreateTask: failed to check task", zap.Error(err))
33+
return err
34+
}
35+
if existingTask != nil {
36+
log.Info("CreateTask: task already exists, returning existing")
37+
taskID = existingTask.ID
38+
return nil
39+
}
40+
41+
// Generate new task UUID
42+
taskID = uuid.New()
43+
outputCollectionName := req.OutputCollectionName
44+
45+
// Look up database_id from databases table using database name and tenant
46+
databases, err := s.catalog.metaDomain.DatabaseDb(txCtx).GetDatabases(req.TenantId, req.Database)
47+
if err != nil {
48+
log.Error("CreateTask: failed to get database", zap.Error(err))
49+
return err
50+
}
51+
if len(databases) == 0 {
52+
log.Error("CreateTask: database not found")
53+
return common.ErrDatabaseNotFound
54+
}
55+
56+
// Look up operator by name from the operators table
57+
operator, err := s.catalog.metaDomain.OperatorDb(txCtx).GetByName(req.OperatorName)
58+
if err != nil {
59+
log.Error("CreateTask: failed to get operator", zap.Error(err))
60+
return err
61+
}
62+
if operator == nil {
63+
log.Error("CreateTask: operator not found", zap.String("operator_name", req.OperatorName))
64+
return common.ErrOperatorNotFound
65+
}
66+
operatorID := operator.OperatorID
67+
68+
// Generate UUIDv7 for time-ordered nonce
69+
nextNonce, err := uuid.NewV7()
70+
if err != nil {
71+
return err
72+
}
73+
74+
// TODO(tanujnay112): Can combine the two collection checks into one
75+
// Check if input collection exists
76+
collections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections([]string{req.InputCollectionId}, nil, req.TenantId, req.Database, nil, nil, false)
77+
if err != nil {
78+
log.Error("CreateTask: failed to get input collection", zap.Error(err))
79+
return err
80+
}
81+
if len(collections) == 0 {
82+
log.Error("CreateTask: input collection not found")
83+
return common.ErrCollectionNotFound
84+
}
85+
86+
// Check if output collection already exists
87+
existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &outputCollectionName, req.TenantId, req.Database, nil, nil, false)
88+
if err != nil {
89+
log.Error("CreateTask: failed to check output collection", zap.Error(err))
90+
return err
91+
}
92+
if len(existingOutputCollections) > 0 {
93+
log.Error("CreateTask: output collection already exists")
94+
return common.ErrCollectionUniqueConstraintViolation
95+
}
96+
97+
now := time.Now()
98+
task := &dbmodel.Task{
99+
ID: taskID,
100+
Name: req.Name,
101+
TenantID: req.TenantId,
102+
DatabaseID: databases[0].ID,
103+
InputCollectionID: req.InputCollectionId,
104+
OutputCollectionName: req.OutputCollectionName,
105+
OperatorID: operatorID,
106+
OperatorParams: req.Params,
107+
CompletionOffset: 0,
108+
LastRun: nil,
109+
NextRun: nil, // Will be set to zero initially, scheduled by task scheduler
110+
MinRecordsForTask: int64(req.MinRecordsForTask),
111+
CurrentAttempts: 0,
112+
CreatedAt: now,
113+
UpdatedAt: now,
114+
NextNonce: nextNonce,
115+
OldestWrittenNonce: nil,
116+
}
117+
118+
// Try to insert task into database
119+
err = s.catalog.metaDomain.TaskDb(txCtx).Insert(task)
120+
if err != nil {
121+
// Check if it's a unique constraint violation (concurrent creation)
122+
if err == common.ErrTaskAlreadyExists {
123+
log.Error("CreateTask: task already exists")
124+
return common.ErrTaskAlreadyExists
125+
}
126+
log.Error("CreateTask: failed to insert task", zap.Error(err))
127+
return err
128+
}
129+
130+
log.Info("Task created successfully", zap.String("task_id", taskID.String()), zap.String("name", req.Name), zap.String("output_collection_name", outputCollectionName))
131+
return nil
132+
})
133+
134+
if err != nil {
135+
return nil, err
136+
}
137+
138+
return &coordinatorpb.CreateTaskResponse{
139+
TaskId: taskID.String(),
140+
}, nil
141+
}
142+
143+
// GetTaskByName retrieves a task by name from the database
144+
func (s *Coordinator) GetTaskByName(ctx context.Context, req *coordinatorpb.GetTaskByNameRequest) (*coordinatorpb.GetTaskByNameResponse, error) {
145+
// Can do both calls with a JOIN
146+
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByName(req.InputCollectionId, req.TaskName)
147+
if err != nil {
148+
return nil, err
149+
}
150+
151+
// If task not found, return empty response
152+
if task == nil {
153+
return nil, common.ErrTaskNotFound
154+
}
155+
156+
// Look up operator name from operators table
157+
operator, err := s.catalog.metaDomain.OperatorDb(ctx).GetByID(task.OperatorID)
158+
if err != nil {
159+
log.Error("GetTaskByName: failed to get operator", zap.Error(err))
160+
return nil, err
161+
}
162+
if operator == nil {
163+
log.Error("GetTaskByName: operator not found", zap.String("operator_id", task.OperatorID.String()))
164+
return nil, common.ErrOperatorNotFound
165+
}
166+
167+
// Debug logging
168+
log.Info("Found task", zap.String("task_id", task.ID.String()), zap.String("name", task.Name), zap.String("input_collection_id", task.InputCollectionID), zap.String("output_collection_name", task.OutputCollectionName))
169+
170+
// Convert task to response
171+
return &coordinatorpb.GetTaskByNameResponse{
172+
TaskId: proto.String(task.ID.String()),
173+
Name: proto.String(task.Name),
174+
OperatorName: proto.String(operator.OperatorName),
175+
InputCollectionId: proto.String(task.InputCollectionID),
176+
OutputCollectionName: proto.String(task.OutputCollectionName),
177+
Params: proto.String(task.OperatorParams),
178+
CompletionOffset: proto.Int64(task.CompletionOffset),
179+
MinRecordsForTask: proto.Uint64(uint64(task.MinRecordsForTask)),
180+
}, nil
181+
}
182+
183+
// DeleteTask soft deletes a task by name
184+
func (s *Coordinator) DeleteTask(ctx context.Context, req *coordinatorpb.DeleteTaskRequest) (*coordinatorpb.DeleteTaskResponse, error) {
185+
err := s.catalog.metaDomain.TaskDb(ctx).SoftDelete(req.InputCollectionId, req.TaskName)
186+
if err != nil {
187+
log.Error("DeleteTask failed", zap.Error(err))
188+
return nil, err
189+
}
190+
191+
log.Info("Task deleted", zap.String("input_collection_id", req.InputCollectionId), zap.String("task_name", req.TaskName))
192+
193+
return &coordinatorpb.DeleteTaskResponse{
194+
Success: true,
195+
}, nil
196+
}
197+
198+
// GetOperators retrieves all operators from the database
199+
func (s *Coordinator) GetOperators(ctx context.Context, req *coordinatorpb.GetOperatorsRequest) (*coordinatorpb.GetOperatorsResponse, error) {
200+
operators, err := s.catalog.metaDomain.OperatorDb(ctx).GetAll()
201+
if err != nil {
202+
log.Error("GetOperators failed", zap.Error(err))
203+
return nil, err
204+
}
205+
206+
// Convert to proto response
207+
protoOperators := make([]*coordinatorpb.Operator, len(operators))
208+
for i, op := range operators {
209+
protoOperators[i] = &coordinatorpb.Operator{
210+
Id: op.OperatorID.String(),
211+
Name: op.OperatorName,
212+
}
213+
}
214+
215+
log.Info("GetOperators succeeded", zap.Int("count", len(operators)))
216+
217+
return &coordinatorpb.GetOperatorsResponse{
218+
Operators: protoOperators,
219+
}, nil
220+
}

go/pkg/sysdb/grpc/task_service.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
6+
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
7+
"github.com/pingcap/log"
8+
"go.uber.org/zap"
9+
)
10+
11+
func (s *Server) CreateTask(ctx context.Context, req *coordinatorpb.CreateTaskRequest) (*coordinatorpb.CreateTaskResponse, error) {
12+
log.Info("CreateTask", zap.String("name", req.Name), zap.String("operator_name", req.OperatorName))
13+
14+
res, err := s.coordinator.CreateTask(ctx, req)
15+
if err != nil {
16+
log.Error("CreateTask failed", zap.Error(err))
17+
return nil, err
18+
}
19+
20+
return res, nil
21+
}
22+
23+
func (s *Server) GetTaskByName(ctx context.Context, req *coordinatorpb.GetTaskByNameRequest) (*coordinatorpb.GetTaskByNameResponse, error) {
24+
log.Info("GetTaskByName", zap.String("input_collection_id", req.InputCollectionId), zap.String("task_name", req.TaskName))
25+
26+
res, err := s.coordinator.GetTaskByName(ctx, req)
27+
if err != nil {
28+
log.Error("GetTaskByName failed", zap.Error(err))
29+
return nil, err
30+
}
31+
32+
return res, nil
33+
}
34+
35+
func (s *Server) DeleteTask(ctx context.Context, req *coordinatorpb.DeleteTaskRequest) (*coordinatorpb.DeleteTaskResponse, error) {
36+
log.Info("DeleteTask", zap.String("input_collection_id", req.InputCollectionId), zap.String("task_name", req.TaskName))
37+
38+
res, err := s.coordinator.DeleteTask(ctx, req)
39+
if err != nil {
40+
log.Error("DeleteTask failed", zap.Error(err))
41+
return nil, err
42+
}
43+
44+
return res, nil
45+
}
46+
47+
func (s *Server) GetOperators(ctx context.Context, req *coordinatorpb.GetOperatorsRequest) (*coordinatorpb.GetOperatorsResponse, error) {
48+
log.Info("GetOperators")
49+
50+
res, err := s.coordinator.GetOperators(ctx, req)
51+
if err != nil {
52+
log.Error("GetOperators failed", zap.Error(err))
53+
return nil, err
54+
}
55+
56+
return res, nil
57+
}

0 commit comments

Comments
 (0)