Skip to content
This repository was archived by the owner on Dec 20, 2024. It is now read-only.

Commit 8cac49e

Browse files
committed
bugfix: notify p2p downloader to pull next piece after reporting
Signed-off-by: lowzj <[email protected]>
1 parent 5ff4530 commit 8cac49e

File tree

4 files changed

+51
-16
lines changed

4 files changed

+51
-16
lines changed

dfget/core/downloader/p2p_downloader/client_stream_writer.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ type ClientStreamWriter struct {
3838
// The downloader will put the piece into this queue after it downloaded a piece successfully.
3939
// And clientWriter will poll values from this queue constantly and write to disk.
4040
clientQueue queue.Queue
41+
42+
// notifyQueue sends a notification when all operation about a piece have
43+
// been completed successfully.
44+
notifyQueue queue.Queue
45+
4146
// finish indicates whether the task written is completed.
4247
finish chan struct{}
4348

@@ -68,11 +73,12 @@ type ClientStreamWriter struct {
6873
}
6974

7075
// NewClientStreamWriter creates and initialize a ClientStreamWriter instance.
71-
func NewClientStreamWriter(clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
76+
func NewClientStreamWriter(clientQueue, notifyQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config) *ClientStreamWriter {
7277
pr, pw := io.Pipe()
7378
limitReader := limitreader.NewLimitReader(pr, int64(cfg.LocalLimit), cfg.Md5 != "")
7479
clientWriter := &ClientStreamWriter{
7580
clientQueue: clientQueue,
81+
notifyQueue: notifyQueue,
7682
pipeReader: pr,
7783
pipeWriter: pw,
7884
limitReader: limitReader,
@@ -139,7 +145,7 @@ func (csw *ClientStreamWriter) write(piece *Piece) error {
139145

140146
err := csw.writePieceToPipe(piece)
141147
if err == nil {
142-
go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime))
148+
go sendSuccessPiece(csw.api, csw.cfg.RV.Cid, piece, time.Since(startTime), csw.notifyQueue)
143149
}
144150
return err
145151
}

dfget/core/downloader/p2p_downloader/client_stream_writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
100100
copy(cases2, cases)
101101

102102
cfg := &config.Config{}
103-
csw := NewClientStreamWriter(nil, nil, cfg)
103+
csw := NewClientStreamWriter(nil, nil, nil, cfg)
104104
go func() {
105105
for _, v := range cases2 {
106106
err := csw.writePieceToPipe(v.piece)

dfget/core/downloader/p2p_downloader/client_writer.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ type ClientWriter struct {
5858
// The downloader will put the piece into this queue after it downloaded a piece successfully.
5959
// And clientWriter will poll values from this queue constantly and write to disk.
6060
clientQueue queue.Queue
61+
62+
// notifyQueue sends a notification when all operation about a piece have
63+
// been completed successfully.
64+
notifyQueue queue.Queue
65+
6166
// finish indicates whether the task written is completed.
6267
finish chan struct{}
6368

@@ -95,9 +100,11 @@ type ClientWriter struct {
95100

96101
// NewClientWriter creates and initialize a ClientWriter instance.
97102
func NewClientWriter(clientFilePath, serviceFilePath string,
98-
clientQueue queue.Queue, api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter {
103+
clientQueue, notifyQueue queue.Queue,
104+
api api.SupernodeAPI, cfg *config.Config, cdnSource apiTypes.CdnSource) PieceWriter {
99105
clientWriter := &ClientWriter{
100106
clientQueue: clientQueue,
107+
notifyQueue: notifyQueue,
101108
clientFilePath: clientFilePath,
102109
serviceFilePath: serviceFilePath,
103110
api: api,
@@ -219,7 +226,7 @@ func (cw *ClientWriter) write(piece *Piece) error {
219226
cw.pieceIndex++
220227
err := writePieceToFile(piece, cw.serviceFile, cw.cdnSource)
221228
if err == nil {
222-
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime))
229+
go sendSuccessPiece(cw.api, cw.cfg.RV.Cid, piece, time.Since(startTime), cw.notifyQueue)
223230
}
224231
return err
225232
}
@@ -247,7 +254,7 @@ func startSyncWriter(q queue.Queue) queue.Queue {
247254
return nil
248255
}
249256

250-
func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration) {
257+
func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.Duration, notifyQueue queue.Queue) {
251258
reportPieceRequest := &types.ReportPieceRequest{
252259
TaskID: piece.TaskID,
253260
Cid: cid,
@@ -265,6 +272,9 @@ func sendSuccessPiece(api api.SupernodeAPI, cid string, piece *Piece, cost time.
265272

266273
_, err := api.ReportPiece(piece.SuperNode, reportPieceRequest)
267274
if err == nil {
275+
if notifyQueue != nil {
276+
notifyQueue.Put("success")
277+
}
268278
if retry > 0 {
269279
logrus.Warnf("success to report piece with request(%+v) after retrying (%d) times", reportPieceRequest, retry)
270280
}

dfget/core/downloader/p2p_downloader/p2p_downloader.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ type P2PDownloader struct {
8484
// And clientWriter will poll values from this queue constantly and write to disk.
8585
clientQueue queue.Queue
8686

87+
// notifyQueue maintains a queue for notifying p2p downloader to pull next download tasks.
88+
notifyQueue queue.Queue
89+
8790
// clientFilePath is the full path of the temp file.
8891
clientFilePath string
8992
// serviceFilePath is the full path of the temp service file which
@@ -150,6 +153,7 @@ func (p2p *P2PDownloader) init() {
150153
p2p.queue.Put(NewPieceSimple(p2p.taskID, p2p.node, constants.TaskStatusStart, p2p.RegisterResult.CDNSource))
151154

152155
p2p.clientQueue = queue.NewQueue(p2p.cfg.ClientQueueSize)
156+
p2p.notifyQueue = queue.NewQueue(p2p.cfg.ClientQueueSize)
153157

154158
p2p.clientFilePath = helper.GetTaskFile(p2p.taskFileName, p2p.cfg.RV.DataDir)
155159
p2p.serviceFilePath = helper.GetServiceFile(p2p.taskFileName, p2p.cfg.RV.DataDir)
@@ -165,7 +169,9 @@ func (p2p *P2PDownloader) Run(ctx context.Context) error {
165169
if p2p.streamMode {
166170
return fmt.Errorf("streamMode enabled, should be disable")
167171
}
168-
clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath, p2p.clientQueue, p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource)
172+
clientWriter := NewClientWriter(p2p.clientFilePath, p2p.serviceFilePath,
173+
p2p.clientQueue, p2p.notifyQueue,
174+
p2p.API, p2p.cfg, p2p.RegisterResult.CDNSource)
169175
return p2p.run(ctx, clientWriter)
170176
}
171177

@@ -174,7 +180,7 @@ func (p2p *P2PDownloader) RunStream(ctx context.Context) (io.Reader, error) {
174180
if !p2p.streamMode {
175181
return nil, fmt.Errorf("streamMode disable, should be enabled")
176182
}
177-
clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.API, p2p.cfg)
183+
clientStreamWriter := NewClientStreamWriter(p2p.clientQueue, p2p.notifyQueue, p2p.API, p2p.cfg)
178184
go func() {
179185
err := p2p.run(ctx, clientStreamWriter)
180186
if err != nil {
@@ -280,14 +286,10 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
280286
break
281287
}
282288

283-
sleepTime := time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond
284-
logrus.Infof("pull piece task(%+v) result:%s and sleep %.3fs", item, res, sleepTime.Seconds())
285-
time.Sleep(sleepTime)
286-
287-
// gradually increase the sleep time, up to [800-1600]
288-
if p2p.minTimeout < 800 {
289-
p2p.minTimeout *= 2
290-
p2p.maxTimeout *= 2
289+
actual, expected := p2p.sleepInterval()
290+
if expected > actual || logrus.IsLevelEnabled(logrus.DebugLevel) {
291+
logrus.Infof("pull piece task(%+v) result:%s and sleep actual:%.3fs expected:%.3fs",
292+
item, res, actual.Seconds(), expected.Seconds())
291293
}
292294
}
293295

@@ -314,6 +316,23 @@ func (p2p *P2PDownloader) pullPieceTask(item *Piece) (
314316
return p2p.pullPieceTask(item)
315317
}
316318

319+
// sleepInterval sleep for a while to wait for next pulling piece task until
320+
// receiving a notification which indicating that all the previous works have
321+
// been completed.
322+
func (p2p *P2PDownloader) sleepInterval() (actual, expected time.Duration) {
323+
expected = time.Duration(rand.Intn(p2p.maxTimeout-p2p.minTimeout)+p2p.minTimeout) * time.Millisecond
324+
start := time.Now()
325+
p2p.notifyQueue.PollTimeout(expected)
326+
actual = time.Now().Sub(start)
327+
328+
// gradually increase the sleep time, up to [800-1600]
329+
if p2p.minTimeout < 800 {
330+
p2p.minTimeout *= 2
331+
p2p.maxTimeout *= 2
332+
}
333+
return actual, expected
334+
}
335+
317336
// getPullRate gets download rate limit dynamically.
318337
func (p2p *P2PDownloader) getPullRate(data *types.PullPieceTaskResponseContinueData) {
319338
if time.Since(p2p.pullRateTime).Seconds() < 3 {

0 commit comments

Comments
 (0)