-
Notifications
You must be signed in to change notification settings - Fork 1.9k
[ENH]: Add models and migration for Create/Delete/Get Tasks #5546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
6abeaa6
to
ee50783
Compare
ee50783
to
dde608e
Compare
// Segment metadata errors | ||
ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported") | ||
|
||
// Task errors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nit that I'd be better off taking up with the creators of go: Static strings for errors are hard to debug, especially if they occur at multiple places. Can we at least embed some information about what the error means? Like context about it.
) | ||
|
||
// CreateTask creates a new task in the database | ||
func (s *Coordinator) CreateTask(ctx context.Context, req *coordinatorpb.CreateTaskRequest) (*coordinatorpb.CreateTaskResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the transaction somehow carried on the context? Otherwise this is not transactional code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not, I was going to leave making task creation + collection creation a follow up to this because collection creation also needs to be refactored to take a transaction.
e96d01b
to
728aefb
Compare
Task/Operator System: Add Models, Migrations, API, and Codegen Synchronization for Create/Delete/Get Tasks This PR introduces a new system for defining, managing, and retrieving 'task' objects and their associated 'operators' in the metadata database. The changes include new database tables ( Key Changes• Created new Affected Areas• This summary was automatically generated by @propel-code-bot |
return &coordinatorpb.GetTaskByNameResponse{}, nil | ||
} | ||
|
||
// Look up operator name from operators table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[PerformanceOptimization]
N+1 query pattern: In GetTaskByName
, after retrieving the task, there's a separate query to get the operator details. If this function is called multiple times (e.g., in a batch operation), each call will make 2 database queries instead of using a JOIN.
// Look up operator name from operators table | |
// Consider using a JOIN query to fetch task and operator in one query: | |
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByNameWithOperator(req.InputCollectionId, req.TaskName) |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**PerformanceOptimization**]
N+1 query pattern: In `GetTaskByName`, after retrieving the task, there's a separate query to get the operator details. If this function is called multiple times (e.g., in a batch operation), each call will make 2 database queries instead of using a JOIN.
```suggestion
// Consider using a JOIN query to fetch task and operator in one query:
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByNameWithOperator(req.InputCollectionId, req.TaskName)
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: go/pkg/sysdb/coordinator/task.go
Line: 127
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is a priority if we put the operation in a transaction.
728aefb
to
fce19ab
Compare
go/pkg/sysdb/coordinator/task.go
Outdated
} | ||
|
||
// Debug logging | ||
log.Info("Found task", zap.String("task_id", task.ID.String()), zap.String("name", task.Name), zap.String("input_collection_id", task.InputCollectionID), zap.String("output_collection_id", task.OutputCollectionID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
This log message for a successful GetTaskByName
operation seems more appropriate for the Debug
level. Logging successful reads at the Info
level can create a lot of noise in production environments, making it harder to spot actual informational messages or warnings.
Context for Agents
[**BestPractice**]
This log message for a successful `GetTaskByName` operation seems more appropriate for the `Debug` level. Logging successful reads at the `Info` level can create a lot of noise in production environments, making it harder to spot actual informational messages or warnings.
File: go/pkg/sysdb/coordinator/task.go
Line: 151
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments inline. Propel hit a few out of the park that we should look into.
fce19ab
to
76128ff
Compare
c210ac9
to
067ee2a
Compare
@@ -1,3 +1,5 @@ | |||
mod operator_codegen; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
The build is failing because the operator_codegen
module cannot be found. It seems the file rust/types/operator_codegen.rs
, which should contain this module, was not included in the pull request. This is causing CI failures across all Rust-related jobs.
Please add the missing file to resolve the build errors.
Context for Agents
[**CriticalError**]
The build is failing because the `operator_codegen` module cannot be found. It seems the file `rust/types/operator_codegen.rs`, which should contain this module, was not included in the pull request. This is causing CI failures across all Rust-related jobs.
Please add the missing file to resolve the build errors.
File: rust/types/build.rs
Line: 1
); | ||
|
||
-- Create "task_templates" table | ||
CREATE TABLE "public"."task_templates" ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
This migration creates a task_templates
table, but I don't see any corresponding Go models (dbmodel
) or logic in this PR that uses this table. Is this intended for future work? If so, it might be better to move the creation of task_templates
to a future PR where it's actually implemented. This would keep this PR focused on the task and operator functionality and avoid introducing unused schema into the database.
Context for Agents
[**BestPractice**]
This migration creates a `task_templates` table, but I don't see any corresponding Go models (`dbmodel`) or logic in this PR that uses this table. Is this intended for future work? If so, it might be better to move the creation of `task_templates` to a future PR where it's actually implemented. This would keep this PR focused on the task and operator functionality and avoid introducing unused schema into the database.
File: go/pkg/sysdb/metastore/db/migrations/20251001073000.sql
Line: 46
067ee2a
to
e7ce88d
Compare
e7ce88d
to
80725c9
Compare
if err == common.ErrTaskAlreadyExists { | ||
log.Error("CreateTask: task already exists") | ||
return common.ErrTaskAlreadyExists | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The current implementation has inconsistent behavior for handling pre-existing tasks. If a task is found with GetByName
on line 30, its ID is returned successfully. However, if a task is created concurrently between that check and the Insert
call (a race condition), the function returns ErrTaskAlreadyExists
.
This could be confusing for clients. For robust idempotency, it would be better to handle the race condition by re-fetching the concurrently created task and returning its ID, making the function's behavior consistent in all "already exists" scenarios.
Consider replacing the error return with logic to re-fetch the task to ensure the CreateTask
operation is fully idempotent.
Context for Agents
[**BestPractice**]
The current implementation has inconsistent behavior for handling pre-existing tasks. If a task is found with `GetByName` on line 30, its ID is returned successfully. However, if a task is created concurrently between that check and the `Insert` call (a race condition), the function returns `ErrTaskAlreadyExists`.
This could be confusing for clients. For robust idempotency, it would be better to handle the race condition by re-fetching the concurrently created task and returning its ID, making the function's behavior consistent in all "already exists" scenarios.
Consider replacing the error return with logic to re-fetch the task to ensure the `CreateTask` operation is fully idempotent.
File: go/pkg/sysdb/coordinator/task.go
Line: 125
51e7975
to
a4823d6
Compare
use std::fs; | ||
use std::path::Path; | ||
|
||
pub fn generate_operator_constants() -> Result<(), Box<dyn std::error::Error>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The current parsing logic for the Go constants file is a bit fragile as it relies on split
and strip_prefix
. This might be the cause of the CI failures (Invalid UUID length: type
). To make it more robust, consider using regular expressions to precisely match and extract the constant names and their values. This would prevent accidentally parsing comments or other lines that might coincidentally match the simple string checks.
For example:
use regex::Regex;
// For UUIDs:
let re_uuid = Regex::new(r#"^\s*([A-Za-z0-9_]+)\s*=\s*uuid\.MustParse\("([a-fA-F0-9-]+)"\)"#).unwrap();
// For names:
let re_name = Regex::new(r#"^\s*([A-Za-z0-9_]+)\s*=\s*"([a-z_]+)""#).unwrap();
// Then iterate through lines and use captures from the regex match.
Context for Agents
[**BestPractice**]
The current parsing logic for the Go constants file is a bit fragile as it relies on `split` and `strip_prefix`. This might be the cause of the CI failures (`Invalid UUID length: type`). To make it more robust, consider using regular expressions to precisely match and extract the constant names and their values. This would prevent accidentally parsing comments or other lines that might coincidentally match the simple string checks.
For example:
```rust
use regex::Regex;
// For UUIDs:
let re_uuid = Regex::new(r#"^\s*([A-Za-z0-9_]+)\s*=\s*uuid\.MustParse\("([a-fA-F0-9-]+)"\)"#).unwrap();
// For names:
let re_name = Regex::new(r#"^\s*([A-Za-z0-9_]+)\s*=\s*"([a-z_]+)""#).unwrap();
// Then iterate through lines and use captures from the regex match.
```
File: rust/types/operator_codegen.rs
Line: 8
ad3767e
to
3baa37f
Compare
'rust-log-service', | ||
'docker image tag rust-log-service:ci $EXPECTED_REF', | ||
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'], | ||
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[BestPractice]
The list of dependencies ['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go']
is repeated for all Rust services (rust-log-service
, rust-frontend-service
, query-service
, etc.).
To improve maintainability and keep the file DRY, consider defining this list as a variable at the top and reusing it for each service.
For example:
RUST_DEPS = ['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go']
# ...
if config.tilt_subcommand == "ci":
custom_build(
'rust-log-service',
'docker image tag rust-log-service:ci $EXPECTED_REF',
RUST_DEPS,
disable_push=True
)
else:
docker_build(
'rust-log-service',
'.',
only=RUST_DEPS,
dockerfile='./rust/Dockerfile',
target='log_service'
)
# ... apply the same for other rust services
Context for Agents
[**BestPractice**]
The list of dependencies `['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go']` is repeated for all Rust services (`rust-log-service`, `rust-frontend-service`, `query-service`, etc.).
To improve maintainability and keep the file DRY, consider defining this list as a variable at the top and reusing it for each service.
For example:
```python
RUST_DEPS = ['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go']
# ...
if config.tilt_subcommand == "ci":
custom_build(
'rust-log-service',
'docker image tag rust-log-service:ci $EXPECTED_REF',
RUST_DEPS,
disable_push=True
)
else:
docker_build(
'rust-log-service',
'.',
only=RUST_DEPS,
dockerfile='./rust/Dockerfile',
target='log_service'
)
# ... apply the same for other rust services
```
File: Tiltfile
Line: 23
deleteCollection := &model.DeleteCollection{ | ||
ID: collectionUUID, | ||
TenantID: task.TenantID, | ||
DatabaseName: task.DatabaseID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[CriticalError]
There appears to be a bug here. The model.DeleteCollection
struct expects a DatabaseName
, but you are passing task.DatabaseID
. This will likely cause SoftDeleteCollection
to fail because it will be searching for a database with a name that is actually a UUID string.
The task
model doesn't store the database name, so you'll need to fetch it. This might require adding a GetDatabaseByID
method to the database DAO and interface, as one doesn't seem to exist currently.
Example of what the fix might look like:
// First, fetch the database to get its name
// This assumes a new method GetDatabaseByID is added to the DB interface
database, err := s.catalog.metaDomain.DatabaseDb(ctx).GetDatabaseByID(task.DatabaseID)
if err != nil {
log.Error("DeleteTask: failed to get database by ID", zap.Error(err), zap.String("database_id", task.DatabaseID))
return nil, err
}
if database == nil {
log.Error("DeleteTask: database not found for ID", zap.String("database_id", task.DatabaseID))
return nil, common.ErrDatabaseNotFound
}
deleteCollection := &model.DeleteCollection{
ID: collectionUUID,
TenantID: task.TenantID,
DatabaseName: database.Name, // Use the fetched database name
}
Context for Agents
[**CriticalError**]
There appears to be a bug here. The `model.DeleteCollection` struct expects a `DatabaseName`, but you are passing `task.DatabaseID`. This will likely cause `SoftDeleteCollection` to fail because it will be searching for a database with a name that is actually a UUID string.
The `task` model doesn't store the database name, so you'll need to fetch it. This might require adding a `GetDatabaseByID` method to the database DAO and interface, as one doesn't seem to exist currently.
Example of what the fix might look like:
```go
// First, fetch the database to get its name
// This assumes a new method GetDatabaseByID is added to the DB interface
database, err := s.catalog.metaDomain.DatabaseDb(ctx).GetDatabaseByID(task.DatabaseID)
if err != nil {
log.Error("DeleteTask: failed to get database by ID", zap.Error(err), zap.String("database_id", task.DatabaseID))
return nil, err
}
if database == nil {
log.Error("DeleteTask: database not found for ID", zap.String("database_id", task.DatabaseID))
return nil, common.ErrDatabaseNotFound
}
deleteCollection := &model.DeleteCollection{
ID: collectionUUID,
TenantID: task.TenantID,
DatabaseName: database.Name, // Use the fetched database name
}
```
File: go/pkg/sysdb/coordinator/task.go
Line: 216
3baa37f
to
bfad21e
Compare
if task == nil { | ||
log.Error("DeleteTask: task not found") | ||
return nil, status.Errorf(codes.NotFound, "task not found") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[ArchitectureDecision]
In DeleteTask
, the function returns a gRPC status.Errorf
error directly. This is inconsistent with other functions in the coordinator layer (like GetTaskByName
which returns common.ErrTaskNotFound
) that return errors from the common
package. The coordinator layer should ideally be agnostic of the transport layer. It's better to return common
errors and let the gRPC service layer handle the translation to gRPC status codes.
if task == nil { | |
log.Error("DeleteTask: task not found") | |
return nil, status.Errorf(codes.NotFound, "task not found") | |
} | |
if task == nil { | |
log.Error("DeleteTask: task not found") | |
return nil, common.ErrTaskNotFound | |
} |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**ArchitectureDecision**]
In `DeleteTask`, the function returns a gRPC `status.Errorf` error directly. This is inconsistent with other functions in the coordinator layer (like `GetTaskByName` which returns `common.ErrTaskNotFound`) that return errors from the `common` package. The coordinator layer should ideally be agnostic of the transport layer. It's better to return `common` errors and let the gRPC service layer handle the translation to gRPC status codes.
```suggestion
if task == nil {
log.Error("DeleteTask: task not found")
return nil, common.ErrTaskNotFound
}
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: go/pkg/sysdb/coordinator/task.go
Line: 203
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID) | ||
if err != nil { | ||
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err)) | ||
return nil, status.Errorf(codes.InvalidArgument, "invalid output_collection_id: %v", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[ArchitectureDecision]
Similar to the other error handling feedback, returning status.Errorf
here couples the coordinator with gRPC. It's better to return a standard Go error and let the caller (the gRPC handler) wrap it in a gRPC status.
Suggested Change
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID) | |
if err != nil { | |
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err)) | |
return nil, status.Errorf(codes.InvalidArgument, "invalid output_collection_id: %v", err) | |
} | |
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID) | |
if err != nil { | |
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err)) | |
return nil, err | |
} |
⚡ Committable suggestion
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
Context for Agents
[**ArchitectureDecision**]
Similar to the other error handling feedback, returning `status.Errorf` here couples the coordinator with gRPC. It's better to return a standard Go error and let the caller (the gRPC handler) wrap it in a gRPC status.
<details>
<summary>Suggested Change</summary>
```suggestion
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID)
if err != nil {
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err))
return nil, err
}
```
⚡ **Committable suggestion**
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
</details>
File: go/pkg/sysdb/coordinator/task.go
Line: 211
… Tasks (#5546)" (#5570) ## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - ... - New functionality - ... ## Test plan _How are these changes tested?_ - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the_ [_docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
## Description of changes _Summarize the changes made by this PR._ - Improvements & Bug fixes - Added db models and SysDB routes to Create, Delete and Get Tasks. - New functionality - ^^^ `CreateTask` here does not kick off the requisite backfill task runs in this change. That is left as followup work. ## Test plan Tests have been added in `go/pkg/sysdb/metastore/db/dao/task_test.go`. - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the_ [_docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
…5573) ## Description of changes _Summarize the changes made by this PR._ This is a redo of [this change](#5546) which inadvertently merge conflicted with another change when it landed. That change generated task operator constants in rust by moving the go operator constants file into each service and manually generating a corresponding rust file during each docker containers build phase. This would've required every new service to make sure to copy in this Go file during build even if it didn't otherwise need Go code. This diff changes that by having a contributor manually generate said constants in rust using a supplied script to avoid the above logistics. There is a rust unit test to make sure the generated constants are in sync with what is prepopulated in the SysDB operators table. - Improvements & Bug fixes - ^^^ - New functionality - ... ## Test plan _How are these changes tested?_ - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Migration plan _Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?_ ## Observability plan _What is the plan to instrument and monitor this change?_ ## Documentation Changes _Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the_ [_docs section](https://github.com/chroma-core/chroma/tree/main/docs/docs.trychroma.com)?_
Description of changes
Summarize the changes made by this PR.
CreateTask
here does not kick off the requisite backfill task runs in this change. That is left as followup work.Test plan
Tests have been added in
go/pkg/sysdb/metastore/db/dao/task_test.go
.pytest
for python,yarn test
for js,cargo test
for rustMigration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the _docs section?_