Skip to content

Commit

Permalink
Sprint july 2 (#1177)
Browse files Browse the repository at this point in the history
* Add path in thumbnail hash (#1098)

* get path from changes

* pass rootRef in apply changes

* add logs

* fix deletechange

* fix multi op and copy

* fix delete root dir

* fix delete root change

* fix lint issue

* cleanup and stats change

* rename func to processMove

* calculate object tree in ref path

* rmv return from deleteChange

* revert changes

* move to filestore changes

* check prevRoot

* add logs and condition

* check filestore

* requested changes

* validate nil proof

* add move and copy dir tests

* add logs

* use path to add child

* fix error log

* fix add child

* add proof log

* add more logs

* change read log

* log objectPath

* add commit logs

* commit thumbnail first

* add missing param

* add missing param

* add path in the thumbnail hash

* add allocID and validation root index

* use write

* add condn in count query

* rmv thumbnail_filename

* log wm

* cleanup

* add rollback wm check

* allow empty allocation root

* fix tests

* check path in upload

* fix check

* empty commit

---------

Co-authored-by: Kishan Dhakan <[email protected]>
Co-authored-by: Yury <[email protected]>

* Dep/update (#1153)

* dependencies update

* dependencies update

* Remove fileID from fileMetaHash (#1114)

* rmv fileID from hash calc

* Trigger Build

* adding false commit to restart systemtests

* add path in fileMetaHash

* empty commit

---------

Co-authored-by: Yury <[email protected]>
Co-authored-by: shahnawaz-creator <[email protected]>
Co-authored-by: boddumanohar <[email protected]>
Co-authored-by: Kishan Dhakan <[email protected]>

* optimize image (#1148)

Co-authored-by: boddumanohar <[email protected]>

* once for logger init

* Update challenge timing submission (#1140)

* update challenge timing submission

* fix createdAt in challenge timing table

---------

Co-authored-by: Yury <[email protected]>

* Fix blobber size (#1163)

* Do not redeem readmarkers for free reads (#1166)

* do not redeem readmarkers for free reads

* fix unit tests, remove readmarker handling from download method

* once for logger init (#1156)

* once for logger init

* init logging the same way we do in 0chain

* init logging the same way we do in 0chain

* Hotfix/map concurrent write (#1158)

* once for logger init

* wider locking window

* Hotfix/remove custom nonce (#1141)

* removed custom nonce managing logic

* fixed logging

* updated gosdk

* uncommented previous nonce logic

* uncommented previous nonce logic

* uncommented previous nonce logic

* uncommented previous nonce logic

* Use single file to avoid maintaining multiple version of same file (#1160)

* Use single file to avoid maintaining multiple version of same file

* Fix config path

* Remove variable

* Update path variable for blobber service

---------

Co-authored-by: Yury <[email protected]>

* fix consume quota lock (#1173)

* Fix rm and wm timestamp (#1162)

* once for logger init

* init logging the same way we do in 0chain

* init logging the same way we do in 0chain

* add lastChallengeTime log

* empty commit

* refactor timestamp check in markers

* Add diff txn for move to filestore

* return if duplicate

---------

Co-authored-by: dabasov <[email protected]>

* fixed gitactions fix issue

* remove printing private key in logs (#1161)

Co-authored-by: Yury <[email protected]>

* Fix concurrent upload issue (#1174)

* Fix concurrent upload issue

* Fix concurrent upload issue

* Fix/concurrent upload issue (#1175)

* Fix concurrent upload issue

* Fix concurrent upload issue

* Fix concurrent upload issue

* Fix concurrent upload issue

---------

Co-authored-by: Hitenjain14 <[email protected]>
Co-authored-by: Yury <[email protected]>
Co-authored-by: shahnawaz-creator <[email protected]>
Co-authored-by: boddumanohar <[email protected]>
Co-authored-by: Manali-Jain-squareops <[email protected]>
Co-authored-by: Jayash Satolia <[email protected]>
Co-authored-by: stewartie4 <[email protected]>
Co-authored-by: Dinmukhammed <[email protected]>
Co-authored-by: Laxmi Prasad Oli <[email protected]>
Co-authored-by: root <[email protected]>
  • Loading branch information
11 people authored Jul 16, 2023
1 parent a392a39 commit 54b3c2a
Show file tree
Hide file tree
Showing 38 changed files with 530 additions and 762 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-for-conductor-testing.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: "Build Docker Image for conductor testing"

concurrency:
group: "publish-${{ github.ref }}"
group: "conductor-${{ github.ref }}"
cancel-in-progress: true

on:
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobber/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func setupLogging() {
logging.InitLogging("production", logDir, "0chainBlobber.log")
}

zcncore.SetLogFile(logDir+"/0chainBlobber.log", false)
zcncore.SetLogFile(logDir+"/0chainBlobber-sdk.log", false)
zcncore.SetLogLevel(3)
fmt.Print(" [OK]\n")
}
3 changes: 1 addition & 2 deletions code/go/0chain.net/blobber/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ func setupNode() error {
}
}

fmt.Println("*== Validator Wallet Info ==*")
fmt.Println("*== Blobber Wallet Info ==*")
fmt.Println(" ID: ", node.Self.ID)
fmt.Println(" Public Key: ", publicKey)
fmt.Println(" Private Key: ", privateKey)
fmt.Println("*===========================*")

logging.Logger.Info(" Base URL" + node.Self.GetURLBase())
Expand Down
40 changes: 25 additions & 15 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,25 +250,27 @@ type Result struct {
}

// TODO: Need to speed up this function
func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {
func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) (err error) {

logging.Logger.Info("Move to filestore", zap.String("allocation_id", a.AllocationID))

tx := datastore.GetStore().GetTransaction(ctx)
tx := datastore.GetStore().GetDB().Begin()

var refs []*Result
limitCh := make(chan struct{}, 10)
wg := &sync.WaitGroup{}

err := tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
err = tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error {

for _, ref := range refs {

var count int64
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=? AND type=?", a.AllocationID, ref.PrevValidationRoot, reference.FILE).
Count(&count)
if ref.PrevValidationRoot != "" {
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot).
Count(&count)
}

limitCh <- struct{}{}
wg.Add(1)
Expand All @@ -279,13 +281,11 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {
wg.Done()
}()

if count == 0 {
if ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
if count == 0 && ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot)
Expand Down Expand Up @@ -319,20 +319,23 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {

if err != nil {
logging.Logger.Error("Error while moving to filestore", zap.Error(err))
tx.Rollback()
return err
}

err = tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error

if err != nil {
tx.Rollback()
return err
}
tx.Commit()
return deleteFromFileStore(ctx, a.AllocationID)
}

func deleteFromFileStore(ctx context.Context, allocationID string) error {

db := datastore.GetStore().GetTransaction(ctx)
db := datastore.GetStore().GetDB().Begin()
limitCh := make(chan struct{}, 10)
wg := &sync.WaitGroup{}
var results []Result
Expand Down Expand Up @@ -385,13 +388,20 @@ func deleteFromFileStore(ctx context.Context, allocationID string) error {
wg.Wait()
if err != nil && err != gorm.ErrRecordNotFound {
logging.Logger.Error("DeleteFromFileStore", zap.Error(err))
db.Rollback()
return err
}

return db.Model(&reference.Ref{}).Unscoped().
err = db.Model(&reference.Ref{}).Unscoped().
Delete(&reference.Ref{},
"allocation_id = ? AND deleted_at IS NOT NULL",
allocationID).Error
if err != nil {
db.Rollback()
return err
}
db.Commit()
return nil
}

// Note: We are also fetching refPath for srcPath in copy operation
Expand Down
12 changes: 12 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ func (a *Allocation) GetRequiredWriteBalance(blobberID string, writeSize int64,
return
}

// IsReadFree Determine if read price is 0
func (a *Allocation) IsReadFree(blobberID string) bool {
for _, d := range a.Terms {
if d.BlobberID == blobberID {
if d.ReadPrice == 0 {
return true
}
}
}
return false
}

type Pending struct {
// ID of format client_id:allocation_id
ID string `gorm:"column:id;primaryKey"`
Expand Down
21 changes: 10 additions & 11 deletions code/go/0chain.net/blobbercore/allocation/file_changer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ func (fc *BaseFileChanger) DeleteTempFile() error {

func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {

fileInputData := &filestore.FileInputData{}
fileInputData.Name = fc.Filename
fileInputData.Path = fc.Path
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
}
if fc.ThumbnailSize > 0 {
fileInputData := &filestore.FileInputData{}
fileInputData.Name = fc.ThumbnailFilename
Expand All @@ -96,6 +86,15 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {
return common.NewError("file_store_error", "Error committing thumbnail to file store. "+err.Error())
}
}

fileInputData := &filestore.FileInputData{}
fileInputData.Name = fc.Filename
fileInputData.Path = fc.Path
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func (nf *UpdateFileChanger) ApplyChange(ctx context.Context, rootRef *reference
fileRef.EncryptedKey = nf.EncryptedKey
fileRef.ChunkSize = nf.ChunkSize
fileRef.IsPrecommit = true
fileRef.ThumbnailFilename = nf.ThumbnailFilename

return rootRef, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference
UpdatedAt: ts,
HashToBeComputed: true,
IsPrecommit: true,
ThumbnailFilename: nf.ThumbnailFilename,
}

fileID, ok := fileIDMeta[newFile.Path]
Expand Down
17 changes: 4 additions & 13 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
Expand Down Expand Up @@ -103,8 +104,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
}
foundBlobber = true
a.AllocationRoot = ""
a.BlobberSize = (sa.Size + int64(len(sa.BlobberDetails)-1)) /
int64(len(sa.BlobberDetails))
a.BlobberSize = int64(math.Ceil(float64(sa.Size) / float64(sa.DataShards)))
a.BlobberSizeUsed = 0
break
}
Expand Down Expand Up @@ -154,23 +154,14 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc

logging.Logger.Info("Saving the allocation to DB")

if isExist {
err = tx.Save(a).Error
} else {
err = tx.Create(a).Error
}

err = tx.Save(a).Error
if err != nil {
return nil, err
}

// save/update related terms
for _, t := range a.Terms {
if isExist {
err = tx.Save(t).Error
} else {
err = tx.Create(t).Error
}
err = tx.Save(t).Error
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions code/go/0chain.net/blobbercore/allocation/zcn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/node"
"github.com/0chain/errors"
"gorm.io/gorm"
"math"
)

// SyncAllocation try to pull allocation from blockchain, and insert it in db.
Expand All @@ -24,8 +25,7 @@ func SyncAllocation(allocationId string) (*Allocation, error) {
belongToThisBlobber = true

alloc.AllocationRoot = ""
alloc.BlobberSize = (sa.Size + int64(len(sa.BlobberDetails)-1)) /
int64(len(sa.BlobberDetails))
alloc.BlobberSize = int64(math.Ceil(float64(sa.Size) / float64(sa.DataShards)))
alloc.BlobberSizeUsed = 0

break
Expand Down Expand Up @@ -62,12 +62,12 @@ func SyncAllocation(allocationId string) (*Allocation, error) {
}

err = datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error {
if err := tx.Table(TableNameAllocation).Create(alloc).Error; err != nil {
if err := tx.Table(TableNameAllocation).Save(alloc).Error; err != nil {
return err
}

for _, term := range terms {
if err := tx.Table(TableNameTerms).Create(term).Error; err != nil {
if err := tx.Table(TableNameTerms).Save(term).Error; err != nil {
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion code/go/0chain.net/blobbercore/challenge/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func syncOpenChallenges(ctx context.Context) {
if lastChallengeTimestamp > 0 {
params["from"] = strconv.Itoa(lastChallengeTimestamp)
}
logging.Logger.Info("[challenge]sync:pull", zap.Any("params", params))
start := time.Now()

var downloadElapsed, jsonElapsed time.Duration
Expand Down Expand Up @@ -118,6 +119,8 @@ func validateOnValidators(c *ChallengeEntity) {
logging.Logger.Error("[challengetiming]add: ",
zap.String("challenge_id", c.ChallengeID),
zap.Error(err))
deleteChallenge(int64(c.CreatedAt))
tx.Rollback()
}

createdTime := common.ToTime(c.CreatedAt)
Expand Down Expand Up @@ -246,5 +249,4 @@ func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, erro
}

return txn, nil

}
43 changes: 26 additions & 17 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/util"
sdkUtil "github.com/0chain/gosdk/core/util"
"github.com/remeh/sizedwaitgroup"
"gorm.io/gorm"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -96,35 +97,40 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
}

rootRef, err := reference.GetReference(ctx, cr.AllocationID, "/")
if err != nil {
if err != nil && err != gorm.ErrRecordNotFound {
allocMu.RUnlock()
cr.CancelChallenge(ctx, err)
return err
}

blockNum := int64(0)
if rootRef.NumBlocks > 0 {
r := rand.New(rand.NewSource(cr.RandomNumber))
blockNum = r.Int63n(rootRef.NumBlocks)
blockNum++
cr.BlockNum = blockNum
}
var objectPath *reference.ObjectPath
if rootRef != nil {
if rootRef.NumBlocks > 0 {
r := rand.New(rand.NewSource(cr.RandomNumber))
blockNum = r.Int63n(rootRef.NumBlocks)
blockNum++
cr.BlockNum = blockNum
}

logging.Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
objectPath, err := reference.GetObjectPath(ctx, cr.AllocationID, blockNum)
if err != nil {
allocMu.RUnlock()
cr.CancelChallenge(ctx, err)
return err
}
logging.Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
objectPath, err = reference.GetObjectPath(ctx, cr.AllocationID, blockNum)
if err != nil {
allocMu.RUnlock()
cr.CancelChallenge(ctx, err)
return err
}

cr.RefID = objectPath.RefID
cr.RefID = objectPath.RefID
cr.ObjectPath = objectPath
}
cr.RespondedAllocationRoot = allocationObj.AllocationRoot
cr.ObjectPath = objectPath

postData := make(map[string]interface{})
postData["challenge_id"] = cr.ChallengeID
postData["object_path"] = objectPath
if objectPath != nil {
postData["object_path"] = objectPath
}
markersArray := make([]map[string]interface{}, 0)
for _, wm := range wms {
markersMap := make(map[string]interface{})
Expand Down Expand Up @@ -188,6 +194,9 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
postData["challenge_proof"] = challengeResponse
}

if objectPath == nil {
objectPath = &reference.ObjectPath{}
}
err = UpdateChallengeTimingProofGenerationAndFileSize(
cr.ChallengeID,
proofGenTime,
Expand Down
Loading

0 comments on commit 54b3c2a

Please sign in to comment.