Skip to content
5 changes: 2 additions & 3 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
es := getSenderEth()
sdeps.EthSender = es

pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched)
pdp.NewWatcherPieceAdd(db, must.One(dependencies.EthClient.Val()), chainSched)
pdp.NewDataSetWatch(db, must.One(dependencies.EthClient.Val()), chainSched)

pdpProveTask := pdp.NewProveTask(chainSched, db, must.One(dependencies.EthClient.Val()), dependencies.Chain, es, dependencies.CachedPieceReader)
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
Expand Down Expand Up @@ -512,7 +511,7 @@ func machineDetails(deps *deps.Deps, activeTasks []harmonytask.TaskInterface, ma
})
sort.Strings(miners)

_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
(tasks, layers, startup_time, miners, machine_id, machine_name) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (machine_id) DO UPDATE SET tasks=$1, layers=$2, startup_time=$3, miners=$4, machine_id=$5, machine_name=$6`,
strings.Join(taskNames, ","), strings.Join(deps.Layers, ","),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Changes the data_set column to be nullable in the pdp_data_set_piece_adds table to faciliate create-and-add workflow.
-- Combined migration: make `data_set` nullable and adjust PK
-- New primary key: (add_message_hash HASH, add_message_index ASC)
-- Old primary key: (data_set HASH, add_message_hash ASC, add_message_index ASC)

DO $$
BEGIN
-- Step 1: Drop existing PK if it still uses data_set
IF EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'pdp_data_set_piece_adds_pk'
AND conrelid = 'pdp_data_set_piece_adds'::regclass
) THEN
ALTER TABLE pdp_data_set_piece_adds
DROP CONSTRAINT pdp_data_set_piece_adds_pk;
END IF;

-- Step 2: Create new PK with add_message_hash as HASH key
ALTER TABLE pdp_data_set_piece_adds
ADD CONSTRAINT pdp_data_set_piece_adds_pk
PRIMARY KEY (add_message_hash HASH, add_message_index ASC);

-- Step 3: Make `data_set` nullable if it is currently NOT NULL
IF EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_name = 'pdp_data_set_piece_adds'
AND column_name = 'data_set'
AND is_nullable = 'NO'
) THEN
ALTER TABLE pdp_data_set_piece_adds
ALTER COLUMN data_set DROP NOT NULL;
END IF;
END $$;
50 changes: 3 additions & 47 deletions pdp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func Routes(r *chi.Mux, p *PDPService) {
// POST /pdp/data-sets - Create a new data set
r.Post("/", p.handleCreateDataSet)

// POST /pdp/data-sets/create-and-add - Create a new data set and add pieces at the same time
r.Post("/create-and-add", p.handleCreateDataSetAndAddPieces)

// GET /pdp/data-sets/created/{txHash} - Get the status of a data set creation
r.Get("/created/{txHash}", p.handleGetDataSetCreationStatus)

Expand Down Expand Up @@ -289,53 +292,6 @@ func (p *PDPService) getSenderAddress(ctx context.Context) (common.Address, erro
return address, nil
}

// insertMessageWaitsAndDataSetCreate inserts records into message_waits_eth and pdp_data_set_creates
func (p *PDPService) insertMessageWaitsAndDataSetCreate(ctx context.Context, txHashHex string, serviceLabel string) error {
// Begin a database transaction
_, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
// Insert into message_waits_eth
log.Debugw("Inserting into message_waits_eth",
"txHash", txHashHex,
"status", "pending")
_, err := tx.Exec(`
INSERT INTO message_waits_eth (signed_tx_hash, tx_status)
VALUES ($1, $2)
`, txHashHex, "pending")
if err != nil {
log.Errorw("Failed to insert into message_waits_eth",
"txHash", txHashHex,
"error", err)
return false, err // Return false to rollback the transaction
}

// Insert into pdp_data_set_creates
log.Debugw("Inserting into pdp_data_set_creates",
"txHash", txHashHex,
"service", serviceLabel)
_, err = tx.Exec(`
INSERT INTO pdp_data_set_creates (create_message_hash, service)
VALUES ($1, $2)
`, txHashHex, serviceLabel)
if err != nil {
log.Errorw("Failed to insert into pdp_data_set_creates",
"txHash", txHashHex,
"error", err)
return false, err // Return false to rollback the transaction
}

log.Infow("Successfully inserted orphaned transaction for watching",
"txHash", txHashHex,
"service", serviceLabel,
"waiter_machine_id", "NULL")
// Return true to commit the transaction
return true, nil
}, harmonydb.OptionRetry())
if err != nil {
return err
}
return nil
}

// handleGetDataSetCreationStatus handles the GET request to retrieve the status of a data set creation
func (p *PDPService) handleGetDataSetCreationStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
Expand Down
Loading