Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/replace transaction #1192

Merged
merged 19 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions code/go/0chain.net/blobber/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,29 +105,30 @@ func setupConfig(configDir string, deploymentMode int) {
func reloadConfig() error {
fmt.Print("> reload config")

db := datastore.GetStore().GetDB()
return datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
s, ok := config.Get(ctx, datastore.GetStore().GetDB())
if ok {
if err := s.CopyTo(&config.Configuration); err != nil {
return err
}
fmt.Print(" [OK]\n")
return nil
}

config.Configuration.Capacity = viper.GetInt64("capacity")

config.Configuration.MinLockDemand = viper.GetFloat64("min_lock_demand")
config.Configuration.NumDelegates = viper.GetInt("num_delegates")
config.Configuration.ReadPrice = viper.GetFloat64("read_price")
config.Configuration.ServiceCharge = viper.GetFloat64("service_charge")
config.Configuration.WritePrice = viper.GetFloat64("write_price")

s, ok := config.Get(context.TODO(), db)
if ok {
if err := s.CopyTo(&config.Configuration); err != nil {
if err := config.Update(ctx, datastore.GetStore().GetDB()); err != nil {
return err
}

fmt.Print(" [OK]\n")
return nil
}

config.Configuration.Capacity = viper.GetInt64("capacity")

config.Configuration.MinLockDemand = viper.GetFloat64("min_lock_demand")
config.Configuration.NumDelegates = viper.GetInt("num_delegates")
config.Configuration.ReadPrice = viper.GetFloat64("read_price")
config.Configuration.ServiceCharge = viper.GetFloat64("service_charge")
config.Configuration.WritePrice = viper.GetFloat64("write_price")

if err := config.Update(context.TODO(), db); err != nil {
return err
}

fmt.Print(" [OK]\n")
return nil
})
}
28 changes: 28 additions & 0 deletions code/go/0chain.net/blobber/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"net/http"
"net/http/pprof"
"runtime"
"strconv"
"sync"
Expand Down Expand Up @@ -53,19 +54,38 @@ func startServer(wg *sync.WaitGroup, r *mux.Router, mode string, port int, isTls
//address := publicIP + ":" + portString
address := ":" + strconv.Itoa(port)
var server *http.Server
var profServer *http.Server

if config.Development() {
// No WriteTimeout setup to enable pprof
server = &http.Server{
Addr: address,
ReadHeaderTimeout: 30 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
Handler: r,
}

pprofMux := http.NewServeMux()
profServer = &http.Server{
Addr: fmt.Sprintf(":%d", port-1000),
ReadTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
Handler: pprofMux,
}
initProfHandlers(pprofMux)
go func() {
err2 := profServer.ListenAndServe()
logging.Logger.Error("Http server shut down", zap.Error(err2))
}()

} else {
server = &http.Server{
Addr: address,
ReadHeaderTimeout: 30 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
Expand All @@ -91,3 +111,11 @@ func initHandlers(r *mux.Router) {
handler.SetupSwagger()
common.SetAdminCredentials()
}

func initProfHandlers(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/", handler.RateLimitByGeneralRL(pprof.Index))
mux.HandleFunc("/debug/pprof/cmdline", handler.RateLimitByGeneralRL(pprof.Cmdline))
mux.HandleFunc("/debug/pprof/profile", handler.RateLimitByGeneralRL(pprof.Profile))
mux.HandleFunc("/debug/pprof/symbol", handler.RateLimitByGeneralRL(pprof.Symbol))
mux.HandleFunc("/debug/pprof/trace", handler.RateLimitByGeneralRL(pprof.Trace))
}
7 changes: 4 additions & 3 deletions code/go/0chain.net/blobber/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"

"go.uber.org/zap"
Expand All @@ -30,13 +29,15 @@ func setupWorkers(ctx context.Context) {
// startRefreshSettings sync settings from blockchain
func startRefreshSettings(ctx context.Context) {
const REPEAT_DELAY = 60 * 3 // 3 minutes
var err error
for {
select {
case <-ctx.Done():
return
case <-time.After(REPEAT_DELAY * time.Second):
_, err = config.ReloadFromChain(common.GetRootContext(), datastore.GetStore().GetDB())
err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
_, e := config.ReloadFromChain(ctx, datastore.GetStore().GetDB())
return e
})
if err != nil {
logging.Logger.Warn("failed to refresh blobber settings from chain", zap.Error(err))
continue
Expand Down
203 changes: 98 additions & 105 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,154 +254,147 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) (err er

logging.Logger.Info("Move to filestore", zap.String("allocation_id", a.AllocationID))

tx := datastore.GetStore().GetDB().Begin()

var refs []*Result
limitCh := make(chan struct{}, 10)
wg := &sync.WaitGroup{}

err = tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error {
e := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
tx := datastore.GetStore().GetTransaction(ctx)
err = tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error {

for _, ref := range refs {
for _, ref := range refs {

var count int64
if ref.PrevValidationRoot != "" {
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot).
Count(&count)
}
var count int64
if ref.PrevValidationRoot != "" {
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot).
Count(&count)
}

limitCh <- struct{}{}
wg.Add(1)
limitCh <- struct{}{}
wg.Add(1)

go func(ref *Result) {
defer func() {
<-limitCh
wg.Done()
}()
go func(ref *Result) {
defer func() {
<-limitCh
wg.Done()
}()

if count == 0 && ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if count == 0 && ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
logging.Logger.Error(fmt.Sprintf("Error while moving file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while moving file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}

if ref.ThumbnailHash != "" && ref.ThumbnailHash != ref.PrevThumbnailHash {
if ref.PrevThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash)
if ref.ThumbnailHash != "" && ref.ThumbnailHash != ref.PrevThumbnailHash {
if ref.PrevThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail file: %s", err.Error()),
logging.Logger.Error(fmt.Sprintf("Error while moving thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while moving thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
}
}

}(ref)
}

return nil
}).Error
}(ref)
}

wg.Wait()
return nil
}).Error

if err != nil {
logging.Logger.Error("Error while moving to filestore", zap.Error(err))
tx.Rollback()
return err
}
wg.Wait()

err = tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error
if err != nil {
logging.Logger.Error("Error while moving to filestore", zap.Error(err))
return err
}

if err != nil {
tx.Rollback()
return err
return tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error
})
if e != nil {
return e
}
tx.Commit()

return deleteFromFileStore(ctx, a.AllocationID)
}

func deleteFromFileStore(ctx context.Context, allocationID string) error {

db := datastore.GetStore().GetDB().Begin()
limitCh := make(chan struct{}, 10)
wg := &sync.WaitGroup{}
var results []Result

err := db.Model(&reference.Ref{}).Unscoped().Select("id", "validation_root", "thumbnail_hash").
Where("allocation_id=? AND is_precommit=? AND type=? AND deleted_at is not NULL", allocationID, true, reference.FILE).
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
return datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
db := datastore.GetStore().GetTransaction(ctx)

for _, res := range results {
var count int64
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", allocationID, res.ValidationRoot).
Count(&count)
err := db.Model(&reference.Ref{}).Unscoped().Select("id", "validation_root", "thumbnail_hash").
Where("allocation_id=? AND is_precommit=? AND type=? AND deleted_at is not NULL", allocationID, true, reference.FILE).
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {

if count != 0 && res.ThumbnailHash == "" {
continue
}
for _, res := range results {
var count int64
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", allocationID, res.ValidationRoot).
Count(&count)

if count != 0 && res.ThumbnailHash == "" {
continue
}

limitCh <- struct{}{}
wg.Add(1)
limitCh <- struct{}{}
wg.Add(1)

go func(res Result, count int64) {
defer func() {
<-limitCh
wg.Done()
}()
go func(res Result, count int64) {
defer func() {
<-limitCh
wg.Done()
}()

if count == 0 {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", res.ValidationRoot))
if count == 0 {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", res.ValidationRoot))
}
}
}

if res.ThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()),
zap.String("thumbnail", res.ThumbnailHash))
if res.ThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()),
zap.String("thumbnail", res.ThumbnailHash))
}
}
}

}(res, count)
}(res, count)

}
return nil
}).Error
}
return nil
}).Error

wg.Wait()
if err != nil && err != gorm.ErrRecordNotFound {
logging.Logger.Error("DeleteFromFileStore", zap.Error(err))
db.Rollback()
return err
}
wg.Wait()
if err != nil && err != gorm.ErrRecordNotFound {
logging.Logger.Error("DeleteFromFileStore", zap.Error(err))
return err
}

err = db.Model(&reference.Ref{}).Unscoped().
Delete(&reference.Ref{},
"allocation_id = ? AND deleted_at IS NOT NULL",
allocationID).Error
if err != nil {
db.Rollback()
return err
}
db.Commit()
return nil
return db.Model(&reference.Ref{}).Unscoped().
Delete(&reference.Ref{},
"allocation_id = ? AND deleted_at IS NOT NULL",
allocationID).Error
})
}

// Note: We are also fetching refPath for srcPath in copy operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (rf *CopyFileChange) DeleteTempFile() error {
func (rf *CopyFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {

totalRefs, err := reference.CountRefs(rf.AllocationID)
totalRefs, err := reference.CountRefs(ctx, rf.AllocationID)
if err != nil {
return nil, err
}
Expand Down
Loading