Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e87c370
dyncfg 90% done
snadrus Sep 26, 2025
0662132
tests, docs, and clean-up
snadrus Oct 9, 2025
260e345
updates
snadrus Oct 9, 2025
0403006
checkers
snadrus Oct 9, 2025
d88a898
allow cmp.Equal
snadrus Oct 10, 2025
886dfd0
fixed dynamic
snadrus Oct 13, 2025
02772cc
avoid unsettable
snadrus Oct 13, 2025
de69e43
Merge branch 'main' into dyn-cfg
snadrus Oct 13, 2025
3240f22
change-notification
snadrus Oct 13, 2025
e03333d
ingest-easy
snadrus Oct 13, 2025
bb71ceb
go mod tidy
snadrus Oct 13, 2025
317fed1
md
snadrus Oct 13, 2025
575f029
address1
snadrus Oct 13, 2025
cabb400
dbg basetext
snadrus Oct 13, 2025
8f7a6ac
fix bad import
snadrus Oct 14, 2025
ff6ebdc
test logger
snadrus Oct 14, 2025
17f1050
dbgFail: is it in baseTest
snadrus Oct 14, 2025
aa24527
found 1 blocker for test failure: base was not included right
snadrus Oct 14, 2025
65ae076
Merge branch 'dyn-cfg' into apply-dyn-cfg
snadrus Oct 14, 2025
dcc4786
rm dbg
snadrus Oct 14, 2025
1808bc9
completed addresses
snadrus Oct 15, 2025
3779387
lint
snadrus Oct 15, 2025
5d27fbc
move mineraddress onchange one level up
snadrus Oct 16, 2025
2c0286e
no pq, fix unmarshal
snadrus Oct 16, 2025
b9cb21c
complex equal
snadrus Oct 16, 2025
1e0a2e1
Merge branch 'dyn-cfg' into apply-dyn-cfg
snadrus Oct 16, 2025
deb9967
mod tidy & naming
snadrus Oct 17, 2025
6d721d8
fix Fil cmp panic
snadrus Oct 17, 2025
7282be5
Merge branch 'dyn-cfg' into apply-dyn-cfg
snadrus Oct 17, 2025
0127d96
transparent-decode
snadrus Oct 17, 2025
164bee8
finish remaining holes
snadrus Oct 17, 2025
342582b
rm comment
snadrus Oct 17, 2025
1dd9d6c
Merge branch 'main' into apply-dyn-cfg
snadrus Oct 17, 2025
0f34307
merge oops
snadrus Oct 17, 2025
51fc3a6
marshaltoml warning
snadrus Oct 17, 2025
452644e
fixed doc
snadrus Oct 17, 2025
a3f3153
eqEmpty needed for test
snadrus Oct 17, 2025
ab411fc
tests fixed
snadrus Oct 17, 2025
87c3d13
lint
snadrus Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions alertmanager/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,17 @@ func (al *alerts) getAddresses() ([]address.Address, []address.Address, error) {
return nil, nil, err
}

_, err = toml.Decode(text, cfg)
_, err = config.TransparentDecode(text, cfg)
if err != nil {
return nil, nil, xerrors.Errorf("could not read layer, bad toml %s: %w", layer, err)
}

for i := range cfg.Addresses {
prec := cfg.Addresses[i].PreCommitControl
com := cfg.Addresses[i].CommitControl
term := cfg.Addresses[i].TerminateControl
miners := cfg.Addresses[i].MinerAddresses
addrs := cfg.Addresses.Get()
for i := range addrs {
prec := addrs[i].PreCommitControl
com := addrs[i].CommitControl
term := addrs[i].TerminateControl
miners := addrs[i].MinerAddresses
for j := range prec {
if prec[j] != "" {
addrMap[prec[j]] = struct{}{}
Expand Down
11 changes: 6 additions & 5 deletions cmd/curio/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,18 +572,19 @@ func TestConfig(t *testing.T) {
_, err := deps.LoadConfigWithUpgrades(baseText, baseCfg)
require.NoError(t, err)

baseCfg.Addresses = append(baseCfg.Addresses, addr1)
baseCfg.Addresses = lo.Filter(baseCfg.Addresses, func(a config.CurioAddresses, _ int) bool {
addrs := []config.CurioAddresses{addr1}
addrs = lo.Filter(addrs, func(a config.CurioAddresses, _ int) bool {
return len(a.MinerAddresses) > 0
})

baseCfg.Addresses.Set(addrs)
_, err = config.ConfigUpdate(baseCfg, config.DefaultCurioConfig(), config.Commented(true), config.DefaultKeepUncommented(), config.NoEnv())
require.NoError(t, err)

baseCfg.Addresses = append(baseCfg.Addresses, addr2)
baseCfg.Addresses = lo.Filter(baseCfg.Addresses, func(a config.CurioAddresses, _ int) bool {
baseCfg.Addresses.Set(append(baseCfg.Addresses.Get(), addr2))
baseCfg.Addresses.Set(lo.Filter(baseCfg.Addresses.Get(), func(a config.CurioAddresses, _ int) bool {
return len(a.MinerAddresses) > 0
})
}))

_, err = config.ConfigUpdate(baseCfg, config.DefaultCurioConfig(), config.Commented(true), config.DefaultKeepUncommented(), config.NoEnv())
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions cmd/curio/guidedsetup/guidedsetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func stepNewMinerConfig(d *MigrationData) {

// Only add miner address for SP setup
if !d.nonSP {
curioCfg.Addresses = append(curioCfg.Addresses, config.CurioAddresses{
curioCfg.Addresses.Set([]config.CurioAddresses{{
PreCommitControl: []string{},
CommitControl: []string{},
DealPublishControl: []string{},
Expand All @@ -548,7 +548,7 @@ func stepNewMinerConfig(d *MigrationData) {
DisableWorkerFallback: false,
MinerAddresses: []string{d.MinerID.String()},
BalanceManager: config.DefaultBalanceManager(),
})
}})
}

sk, err := io.ReadAll(io.LimitReader(rand.Reader, 32))
Expand Down Expand Up @@ -604,9 +604,9 @@ func stepNewMinerConfig(d *MigrationData) {
// If 'base' layer is not present
if !lo.Contains(titles, "base") {
if !d.nonSP {
curioCfg.Addresses = lo.Filter(curioCfg.Addresses, func(a config.CurioAddresses, _ int) bool {
curioCfg.Addresses.Set(lo.Filter(curioCfg.Addresses.Get(), func(a config.CurioAddresses, _ int) bool {
return len(a.MinerAddresses) > 0
})
}))
}
cb, err := config.ConfigUpdate(curioCfg, config.DefaultCurioConfig(), config.Commented(true), config.DefaultKeepUncommented(), config.NoEnv())
if err != nil {
Expand Down Expand Up @@ -661,10 +661,10 @@ func stepNewMinerConfig(d *MigrationData) {
os.Exit(1)
}

baseCfg.Addresses = append(baseCfg.Addresses, curioCfg.Addresses...)
baseCfg.Addresses = lo.Filter(baseCfg.Addresses, func(a config.CurioAddresses, _ int) bool {
baseCfg.Addresses.Set(append(baseCfg.Addresses.Get(), curioCfg.Addresses.Get()...))
baseCfg.Addresses.Set(lo.Filter(baseCfg.Addresses.Get(), func(a config.CurioAddresses, _ int) bool {
return len(a.MinerAddresses) > 0
})
}))

cb, err := config.ConfigUpdate(baseCfg, config.DefaultCurioConfig(), config.Commented(true), config.DefaultKeepUncommented(), config.NoEnv())
if err != nil {
Expand Down
55 changes: 30 additions & 25 deletions cmd/curio/guidedsetup/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"path"
"strings"

"github.com/BurntSushi/toml"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand Down Expand Up @@ -150,7 +149,7 @@ func SaveConfigToLayerMigrateSectors(db *harmonydb.DB, minerRepoPath, chainApiIn

minerAddress = addr

curioCfg.Addresses = []config.CurioAddresses{{
curioCfg.Addresses.Set([]config.CurioAddresses{{
MinerAddresses: []string{addr.String()},
PreCommitControl: smCfg.Addresses.PreCommitControl,
CommitControl: smCfg.Addresses.CommitControl,
Expand All @@ -159,7 +158,7 @@ func SaveConfigToLayerMigrateSectors(db *harmonydb.DB, minerRepoPath, chainApiIn
DisableOwnerFallback: smCfg.Addresses.DisableOwnerFallback,
DisableWorkerFallback: smCfg.Addresses.DisableWorkerFallback,
BalanceManager: config.DefaultBalanceManager(),
}}
}})

ks, err := lr.KeyStore()
if err != nil {
Expand All @@ -174,10 +173,11 @@ func SaveConfigToLayerMigrateSectors(db *harmonydb.DB, minerRepoPath, chainApiIn

curioCfg.Apis.ChainApiInfo = append(curioCfg.Apis.ChainApiInfo, chainApiInfo)
// Express as configTOML
configTOML := &bytes.Buffer{}
if err = toml.NewEncoder(configTOML).Encode(curioCfg); err != nil {
configTOMLBytes, err := config.TransparentMarshal(curioCfg)
if err != nil {
return minerAddress, err
}
configTOML := bytes.NewBuffer(configTOMLBytes)

if lo.Contains(titles, "base") {
// append addresses
Expand All @@ -192,17 +192,20 @@ func SaveConfigToLayerMigrateSectors(db *harmonydb.DB, minerRepoPath, chainApiIn
if err != nil {
return minerAddress, xerrors.Errorf("Cannot load base config: %w", err)
}
for _, addr := range baseCfg.Addresses {
if lo.Contains(addr.MinerAddresses, curioCfg.Addresses[0].MinerAddresses[0]) {
addrs := baseCfg.Addresses.Get()
for _, addr := range addrs {
ma := addr.MinerAddresses
if lo.Contains(ma, addrs[0].MinerAddresses[0]) {
goto skipWritingToBase
}
}
// write to base
{
baseCfg.Addresses = append(baseCfg.Addresses, curioCfg.Addresses[0])
baseCfg.Addresses = lo.Filter(baseCfg.Addresses, func(a config.CurioAddresses, _ int) bool {
addrs := baseCfg.Addresses.Get()
addrs = append(addrs, addrs[0])
baseCfg.Addresses.Set(lo.Filter(addrs, func(a config.CurioAddresses, _ int) bool {
return len(a.MinerAddresses) > 0
})
}))
if baseCfg.Apis.ChainApiInfo == nil {
baseCfg.Apis.ChainApiInfo = append(baseCfg.Apis.ChainApiInfo, chainApiInfo)
}
Expand All @@ -223,7 +226,7 @@ func SaveConfigToLayerMigrateSectors(db *harmonydb.DB, minerRepoPath, chainApiIn
}
say(plain, "Configuration 'base' was updated to include this miner's address (%s) and its wallet setup.", minerAddress)
}
say(plain, "Compare the configurations %s to %s. Changes between the miner IDs other than wallet addreses should be a new, minimal layer for runners that need it.", "base", "mig-"+curioCfg.Addresses[0].MinerAddresses[0])
say(plain, "Compare the configurations %s to %s. Changes between the miner IDs other than wallet addreses should be a new, minimal layer for runners that need it.", "base", "mig-"+curioCfg.Addresses.Get()[0].MinerAddresses[0])
skipWritingToBase:
} else {
_, err = db.Exec(ctx, `INSERT INTO harmony_config (title, config) VALUES ('base', $1)
Expand All @@ -236,20 +239,20 @@ func SaveConfigToLayerMigrateSectors(db *harmonydb.DB, minerRepoPath, chainApiIn
}

{ // make a layer representing the migration
layerName := fmt.Sprintf("mig-%s", curioCfg.Addresses[0].MinerAddresses[0])
layerName := fmt.Sprintf("mig-%s", curioCfg.Addresses.Get()[0].MinerAddresses[0])
_, err = db.Exec(ctx, "DELETE FROM harmony_config WHERE title=$1", layerName)
if err != nil {
return minerAddress, xerrors.Errorf("Cannot delete existing layer: %w", err)
}

// Express as new toml to avoid adding StorageRPCSecret in more than 1 layer
curioCfg.Apis.StorageRPCSecret = ""
ct := &bytes.Buffer{}
if err = toml.NewEncoder(ct).Encode(curioCfg); err != nil {
ctBytes, err := config.TransparentMarshal(curioCfg)
if err != nil {
return minerAddress, err
}

_, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ($1, $2)", layerName, ct.String())
_, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ($1, $2)", layerName, string(ctBytes))
if err != nil {
return minerAddress, xerrors.Errorf("Cannot insert layer after layer created message: %w", err)
}
Expand Down Expand Up @@ -287,22 +290,24 @@ func getDBSettings(smCfg config.StorageMiner) string {

func ensureEmptyArrays(cfg *config.CurioConfig) {
if cfg.Addresses == nil {
cfg.Addresses = []config.CurioAddresses{}
cfg.Addresses.Set([]config.CurioAddresses{})
} else {
for i := range cfg.Addresses {
if cfg.Addresses[i].PreCommitControl == nil {
cfg.Addresses[i].PreCommitControl = []string{}
addrs := cfg.Addresses.Get()
for i := range addrs {
if addrs[i].PreCommitControl == nil {
addrs[i].PreCommitControl = []string{}
}
if cfg.Addresses[i].CommitControl == nil {
cfg.Addresses[i].CommitControl = []string{}
if addrs[i].CommitControl == nil {
addrs[i].CommitControl = []string{}
}
if cfg.Addresses[i].DealPublishControl == nil {
cfg.Addresses[i].DealPublishControl = []string{}
if addrs[i].DealPublishControl == nil {
addrs[i].DealPublishControl = []string{}
}
if cfg.Addresses[i].TerminateControl == nil {
cfg.Addresses[i].TerminateControl = []string{}
if addrs[i].TerminateControl == nil {
addrs[i].TerminateControl = []string{}
}
}
cfg.Addresses.Set(addrs)
}
if cfg.Apis.ChainApiInfo == nil {
cfg.Apis.ChainApiInfo = []string{}
Expand Down
83 changes: 46 additions & 37 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var log = logging.Logger("curio/deps")

func WindowPostScheduler(ctx context.Context, fc config.CurioFees, pc config.CurioProvingConfig,
api api.Chain, verif storiface.Verifier, paramck func() (bool, error), sender *message.Sender, chainSched *chainsched.CurioChainSched,
as *multictladdr.MultiAddressSelector, addresses map[dtypes.MinerAddress]bool, db *harmonydb.DB,
as *multictladdr.MultiAddressSelector, addresses *config.Dynamic[map[dtypes.MinerAddress]bool], db *harmonydb.DB,
stor paths.Store, idx paths.SectorIndex, max int) (*window2.WdPostTask, *window2.WdPostSubmitTask, *window2.WdPostRecoverDeclareTask, error) {

// todo config
Expand Down Expand Up @@ -243,10 +243,15 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
}
}

miners := make([]address.Address, 0, len(maddrs))
for k := range maddrs {
miners = append(miners, address.Address(k))
miners := config.NewDynamic(make([]address.Address, 0, len(maddrs.Get())))
forMiners := func() {
minersTmp := make([]address.Address, 0, len(maddrs.Get()))
for k := range maddrs.Get() {
minersTmp = append(minersTmp, address.Address(k))
}
miners.Set(minersTmp)
}
maddrs.OnChange(forMiners)

if cfg.Subsystems.EnableBalanceManager {
balMgrTask, err := storage_market.NewBalanceManager(full, miners, cfg, sender)
Expand Down Expand Up @@ -534,50 +539,54 @@ func machineDetails(deps *deps.Deps, activeTasks []harmonytask.TaskInterface, ma
return item.TypeDetails().Name
})

miners := lo.Map(maps.Keys(deps.Maddrs), func(item dtypes.MinerAddress, _ int) string {
return address.Address(item).String()
})
sort.Strings(miners)
doMachineDetails := func() {
miners := lo.Map(maps.Keys(deps.Maddrs.Get()), func(item dtypes.MinerAddress, _ int) string {
return address.Address(item).String()
})
sort.Strings(miners)

_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
_, err := deps.DB.Exec(context.Background(), `INSERT INTO harmony_machine_details
(tasks, layers, startup_time, miners, machine_id, machine_name) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (machine_id) DO UPDATE SET tasks=$1, layers=$2, startup_time=$3, miners=$4, machine_id=$5, machine_name=$6`,
strings.Join(taskNames, ","), strings.Join(deps.Layers, ","),
time.Now(), strings.Join(miners, ","), machineID, machineName)

if err != nil {
log.Errorf("failed to update machine details: %s", err)
return
}
strings.Join(taskNames, ","), strings.Join(deps.Layers, ","),
time.Now(), strings.Join(miners, ","), machineID, machineName)

// maybePostWarning
if !lo.Contains(taskNames, "WdPost") && !lo.Contains(taskNames, "WinPost") {
// Maybe we aren't running a PoSt for these miners?
var allMachines []struct {
MachineID int `db:"machine_id"`
Miners string `db:"miners"`
Tasks string `db:"tasks"`
}
err := deps.DB.Select(context.Background(), &allMachines, `SELECT machine_id, miners, tasks FROM harmony_machine_details`)
if err != nil {
log.Errorf("failed to get machine details: %s", err)
log.Errorf("failed to update machine details: %s", err)
return
}

for _, miner := range miners {
var myPostIsHandled bool
for _, m := range allMachines {
if !lo.Contains(strings.Split(m.Miners, ","), miner) {
continue
// maybePostWarning
if !lo.Contains(taskNames, "WdPost") && !lo.Contains(taskNames, "WinPost") {
// Maybe we aren't running a PoSt for these miners?
var allMachines []struct {
MachineID int `db:"machine_id"`
Miners string `db:"miners"`
Tasks string `db:"tasks"`
}
err := deps.DB.Select(context.Background(), &allMachines, `SELECT machine_id, miners, tasks FROM harmony_machine_details`)
if err != nil {
log.Errorf("failed to get machine details: %s", err)
return
}

for _, miner := range miners {
var myPostIsHandled bool
for _, m := range allMachines {
if !lo.Contains(strings.Split(m.Miners, ","), miner) {
continue
}
if lo.Contains(strings.Split(m.Tasks, ","), "WdPost") && lo.Contains(strings.Split(m.Tasks, ","), "WinPost") {
myPostIsHandled = true
break
}
}
if lo.Contains(strings.Split(m.Tasks, ","), "WdPost") && lo.Contains(strings.Split(m.Tasks, ","), "WinPost") {
myPostIsHandled = true
break
if !myPostIsHandled {
log.Errorf("No PoSt tasks are running for miner %s. Start handling PoSts immediately with:\n\tcurio run --layers=\"post\" ", miner)
}
}
if !myPostIsHandled {
log.Errorf("No PoSt tasks are running for miner %s. Start handling PoSts immediately with:\n\tcurio run --layers=\"post\" ", miner)
}
}
}
doMachineDetails()
deps.Maddrs.OnChange(doMachineDetails)
}
10 changes: 5 additions & 5 deletions cmd/curio/test-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var wdPostTaskCmd = &cli.Command{
}

var taskIDs []int64
for addr := range deps.Maddrs {
for addr := range deps.Maddrs.Get() {
maddr, err := address.IDFromAddress(address.Address(addr))
if err != nil {
return xerrors.Errorf("cannot get miner id %w", err)
Expand Down Expand Up @@ -257,7 +257,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o
}
_, _ = wdPoStSubmitTask, derlareRecoverTask

if len(deps.Maddrs) == 0 {
if len(deps.Maddrs.Get()) == 0 {
return errors.New("no miners to compute WindowPoSt for")
}
head, err := deps.Chain.ChainHead(ctx)
Expand All @@ -267,7 +267,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o

di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0)

for maddr := range deps.Maddrs {
for maddr := range deps.Maddrs.Get() {
if spAddr != address.Undef && address.Address(maddr) != spAddr {
continue
}
Expand Down Expand Up @@ -337,7 +337,7 @@ var wdPostVanillaCmd = &cli.Command{
}
_, _ = wdPoStSubmitTask, derlareRecoverTask

if len(deps.Maddrs) == 0 {
if len(deps.Maddrs.Get()) == 0 {
return errors.New("no miners to compute WindowPoSt for")
}
head, err := deps.Chain.ChainHead(ctx)
Expand All @@ -347,7 +347,7 @@ var wdPostVanillaCmd = &cli.Command{

di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0)

for maddr := range deps.Maddrs {
for maddr := range deps.Maddrs.Get() {
if spAddr != address.Undef && address.Address(maddr) != spAddr {
continue
}
Expand Down
Loading