Skip to content

Commit c506cf6

Browse files
authored
Use WithNewTransaction (#1267)
* use withNew transaction * fix blobberstats
1 parent c9df376 commit c506cf6

File tree

8 files changed

+75
-100
lines changed

8 files changed

+75
-100
lines changed

code/go/0chain.net/blobbercore/challenge/challenge.go

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"time"
1010

1111
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
12-
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
1312
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
1413
"github.com/0chain/blobber/code/go/0chain.net/core/common"
1514
"github.com/0chain/blobber/code/go/0chain.net/core/node"
@@ -166,12 +165,7 @@ func validateOnValidators(ctx context.Context, c *ChallengeEntity) error {
166165
return nil
167166
}
168167

169-
func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, error) {
170-
ctx := datastore.GetStore().CreateTransaction(context.TODO())
171-
defer ctx.Done()
172-
173-
tx := datastore.GetStore().GetTransaction(ctx)
174-
168+
func (c *ChallengeEntity) getCommitTransaction(ctx context.Context) (*transaction.Transaction, error) {
175169
createdTime := common.ToTime(c.CreatedAt)
176170
logging.Logger.Info("[challenge]commit",
177171
zap.Any("challenge_id", c.ChallengeID),
@@ -180,24 +174,14 @@ func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, erro
180174

181175
if time.Since(common.ToTime(c.CreatedAt)) > config.StorageSCConfig.ChallengeCompletionTime {
182176
c.CancelChallenge(ctx, ErrExpiredCCT)
183-
if err := tx.Commit().Error; err != nil {
184-
logging.Logger.Error("[challenge]verify(Commit): ",
185-
zap.Any("challenge_id", c.ChallengeID),
186-
zap.Error(err))
187-
}
188-
return nil, ErrExpiredCCT
177+
return nil, nil
189178
}
190179

191180
txn, err := transaction.NewTransactionEntity()
192181
if err != nil {
193182
logging.Logger.Error("[challenge]createTxn", zap.Error(err))
194183
c.CancelChallenge(ctx, err)
195-
if err := tx.Commit().Error; err != nil {
196-
logging.Logger.Error("[challenge]verify(Commit): ",
197-
zap.Any("challenge_id", c.ChallengeID),
198-
zap.Error(err))
199-
}
200-
return nil, err
184+
return nil, nil
201185
}
202186

203187
sn := &ChallengeResponse{}
@@ -212,12 +196,7 @@ func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, erro
212196
if err != nil {
213197
logging.Logger.Info("Failed submitting challenge to the mining network", zap.String("err:", err.Error()))
214198
c.CancelChallenge(ctx, err)
215-
if err := tx.Commit().Error; err != nil {
216-
logging.Logger.Error("[challenge]verify(Commit): ",
217-
zap.Any("challenge_id", c.ChallengeID),
218-
zap.Error(err))
219-
}
220-
return nil, err
199+
return nil, nil
221200
}
222201

223202
err = UpdateChallengeTimingTxnSubmission(c.ChallengeID, txn.CreationDate)
@@ -229,11 +208,5 @@ func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, erro
229208
zap.Error(err))
230209
}
231210

232-
if err := tx.Commit().Error; err != nil {
233-
logging.Logger.Error("[challenge]verify(Commit): ",
234-
zap.Any("challenge_id", c.ChallengeID),
235-
zap.Error(err))
236-
}
237-
238211
return txn, nil
239212
}

code/go/0chain.net/blobbercore/challenge/protocol.go

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -327,12 +327,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
327327
return nil
328328
}
329329

330-
func (cr *ChallengeEntity) VerifyChallengeTransaction(txn *transaction.Transaction) error {
331-
ctx := datastore.GetStore().CreateTransaction(context.TODO())
332-
defer ctx.Done()
333-
334-
tx := datastore.GetStore().GetTransaction(ctx)
335-
330+
func (cr *ChallengeEntity) VerifyChallengeTransaction(ctx context.Context, txn *transaction.Transaction) error {
336331
if len(cr.LastCommitTxnIDs) > 0 {
337332
for _, lastTxn := range cr.LastCommitTxnIDs {
338333
logging.Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn)
@@ -373,11 +368,6 @@ func (cr *ChallengeEntity) VerifyChallengeTransaction(txn *transaction.Transacti
373368
err = ErrEntityNotFound
374369
}
375370
_ = cr.Save(ctx)
376-
if commitErr := tx.Commit().Error; commitErr != nil {
377-
logging.Logger.Error("[challenge]verify(Commit): ",
378-
zap.Any("challenge_id", cr.ChallengeID),
379-
zap.Error(commitErr))
380-
}
381371
return err
382372
}
383373
logging.Logger.Info("Success response from BC for challenge response transaction", zap.String("txn", txn.TransactionOutput), zap.String("challenge_id", cr.ChallengeID))
@@ -394,7 +384,6 @@ func IsEntityNotFoundError(err error) bool {
394384
}
395385

396386
func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transaction.Transaction, toAdd bool) {
397-
tx := datastore.GetStore().GetTransaction(ctx)
398387
cr.Status = Committed
399388
cr.StatusMessage = t.TransactionOutput
400389
cr.CommitTxnID = t.Hash
@@ -417,9 +406,4 @@ func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *transacti
417406
zap.Time("txn_verified", txnVerification),
418407
zap.Error(err))
419408
}
420-
if err := tx.Commit().Error; err != nil {
421-
logging.Logger.Error("[challenge]verify(Commit): ",
422-
zap.Any("challenge_id", cr.ChallengeID),
423-
zap.Error(err))
424-
}
425409
}

code/go/0chain.net/blobbercore/challenge/worker.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
99
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
1010
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
11+
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
1112
"github.com/emirpasic/gods/maps/treemap"
1213
"go.uber.org/zap"
1314
"golang.org/x/sync/semaphore"
@@ -141,7 +142,15 @@ func commitOnChainWorker(ctx context.Context) {
141142

142143
for _, challenge := range challenges {
143144
chall := challenge
144-
txn, _ := chall.getCommitTransaction()
145+
var (
146+
txn *transaction.Transaction
147+
err error
148+
)
149+
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
150+
txn, err = chall.getCommitTransaction(ctx)
151+
return err
152+
})
153+
145154
if txn != nil {
146155
wg.Add(1)
147156
go func(challenge *ChallengeEntity) {
@@ -151,10 +160,13 @@ func commitOnChainWorker(ctx context.Context) {
151160
logging.Logger.Error("verifyChallengeTransaction", zap.Any("err", r))
152161
}
153162
}()
154-
err := challenge.VerifyChallengeTransaction(txn)
155-
if err == nil || err != ErrEntityNotFound {
156-
deleteChallenge(int64(challenge.RoundCreatedAt))
157-
}
163+
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
164+
err := challenge.VerifyChallengeTransaction(ctx, txn)
165+
if err == nil || err != ErrEntityNotFound {
166+
deleteChallenge(int64(challenge.RoundCreatedAt))
167+
}
168+
return nil
169+
})
158170
}(&chall)
159171
}
160172
}

code/go/0chain.net/blobbercore/filestore/state.go

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -51,49 +51,48 @@ func (fs *FileStore) removeAllocation(ID string) {
5151
}
5252

5353
func (fs *FileStore) initMap() error {
54-
ctx, cnCl := context.WithCancel(context.Background())
55-
defer cnCl()
56-
57-
ctx = datastore.GetStore().CreateTransaction(ctx)
58-
db := datastore.GetStore().GetTransaction(ctx)
59-
if db == nil {
60-
return errors.New("could not get db client")
61-
}
54+
err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
55+
db := datastore.GetStore().GetTransaction(ctx)
56+
if db == nil {
57+
return errors.New("could not get db client")
58+
}
6259

63-
limitCh := make(chan struct{}, 50)
64-
wg := &sync.WaitGroup{}
65-
var dbAllocations []*dbAllocation
60+
limitCh := make(chan struct{}, 50)
61+
wg := &sync.WaitGroup{}
62+
var dbAllocations []*dbAllocation
6663

67-
err := db.Model(&dbAllocation{}).FindInBatches(&dbAllocations, 1000, func(tx *gorm.DB, batch int) error {
68-
allocsMap := make(map[string]*allocation)
64+
err := db.Model(&dbAllocation{}).FindInBatches(&dbAllocations, 1000, func(tx *gorm.DB, batch int) error {
65+
allocsMap := make(map[string]*allocation)
6966

70-
for _, dbAlloc := range dbAllocations {
71-
a := allocation{
72-
allocatedSize: uint64(dbAlloc.BlobberSize),
73-
mu: &sync.Mutex{},
74-
tmpMU: &sync.Mutex{},
75-
}
67+
for _, dbAlloc := range dbAllocations {
68+
a := allocation{
69+
allocatedSize: uint64(dbAlloc.BlobberSize),
70+
mu: &sync.Mutex{},
71+
tmpMU: &sync.Mutex{},
72+
}
7673

77-
allocsMap[dbAlloc.ID] = &a
74+
allocsMap[dbAlloc.ID] = &a
7875

79-
err := getStorageDetails(ctx, &a, dbAlloc.ID)
76+
err := getStorageDetails(ctx, &a, dbAlloc.ID)
8077

81-
if err != nil {
82-
return err
83-
}
78+
if err != nil {
79+
return err
80+
}
8481

85-
limitCh <- struct{}{}
86-
wg.Add(1)
87-
go fs.getTemporaryStorageDetails(ctx, &a, dbAlloc.ID, limitCh, wg)
82+
limitCh <- struct{}{}
83+
wg.Add(1)
84+
go fs.getTemporaryStorageDetails(ctx, &a, dbAlloc.ID, limitCh, wg)
8885

89-
}
86+
}
9087

91-
fs.setAllocations(allocsMap)
92-
return nil
93-
}).Error
88+
fs.setAllocations(allocsMap)
89+
return nil
90+
}).Error
9491

95-
wg.Wait()
96-
db.Commit()
92+
wg.Wait()
93+
db.Commit()
94+
return err
95+
})
9796
return err
9897
}
9998

code/go/0chain.net/blobbercore/handler/handler.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -710,12 +710,16 @@ func writeResponse(w http.ResponseWriter, resp []byte) {
710710
// todo wrap with connection
711711
func StatsHandler(w http.ResponseWriter, r *http.Request) {
712712
isJSON := r.Header.Get("Accept") == "application/json"
713-
714713
if isJSON {
714+
var (
715+
blobberStats any
716+
err error
717+
)
715718
blobberInfo := GetBlobberInfoJson()
716-
717-
ctx := datastore.GetStore().CreateTransaction(r.Context())
718-
blobberStats, err := stats.StatsJSONHandler(ctx, r)
719+
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
720+
blobberStats, err = stats.StatsJSONHandler(ctx, r)
721+
return err
722+
})
719723

720724
if err != nil {
721725
Logger.Error("Error getting blobber JSON stats", zap.Error(err))

code/go/0chain.net/blobbercore/handler/handler_common.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,11 @@ func WithStatusConnection(handler common.StatusCodeResponderF) common.StatusCode
142142
func WithStatusReadOnlyConnection(handler common.StatusCodeResponderF) common.StatusCodeResponderF {
143143
return func(ctx context.Context, r *http.Request) (interface{}, int, error) {
144144
ctx = GetMetaDataStore().CreateTransaction(ctx)
145-
resp, statusCode, err := handler(ctx, r)
145+
tx := GetMetaDataStore().GetTransaction(ctx)
146146
defer func() {
147-
GetMetaDataStore().GetTransaction(ctx).Rollback()
147+
tx.Rollback()
148148
}()
149+
resp, statusCode, err := handler(ctx, r)
149150
return resp, statusCode, err
150151
}
151152
}

code/go/0chain.net/blobbercore/stats/challengestats.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package stats
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
87
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
8+
"github.com/0chain/blobber/code/go/0chain.net/core/common"
99
"gorm.io/datatypes"
1010
)
1111

@@ -28,8 +28,8 @@ type ChallengeEntity struct {
2828
LastCommitTxnIDs []string `json:"last_commit_txn_ids" gorm:"-"`
2929
ObjectPathString datatypes.JSON `json:"-" gorm:"column:object_path"`
3030
ObjectPath *reference.ObjectPath `json:"object_path" gorm:"-"`
31-
CreatedAt time.Time `gorm:"created_at"`
32-
UpdatedAt time.Time `gorm:"updated_at"`
31+
CreatedAt common.Timestamp `gorm:"created_at"`
32+
UpdatedAt common.Timestamp `gorm:"updated_at"`
3333
}
3434

3535
func (ChallengeEntity) TableName() string {

code/go/0chain.net/blobbercore/stats/handler.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -433,11 +433,13 @@ const tpl = `
433433

434434
func StatsHandler(w http.ResponseWriter, r *http.Request) {
435435
t := template.Must(template.New("diagnostics").Funcs(funcMap).Parse(tpl))
436-
ctx := datastore.GetStore().CreateTransaction(r.Context())
437-
ctx = setStatsRequestDataInContext(ctx, r)
438-
db := datastore.GetStore().GetTransaction(ctx)
439-
defer db.Rollback()
440-
bs := LoadBlobberStats(ctx)
436+
ctx := setStatsRequestDataInContext(context.TODO(), r)
437+
ctx = datastore.GetStore().CreateTransaction(ctx)
438+
var bs *BlobberStats
439+
_ = datastore.GetStore().WithTransaction(ctx, func(ctx context.Context) error {
440+
bs = LoadBlobberStats(ctx)
441+
return common.NewError("rollback", "read only")
442+
})
441443

442444
err := t.Execute(w, bs)
443445
if err != nil {

0 commit comments

Comments
 (0)