@@ -14,13 +14,18 @@ use thiserror::Error;
14
14
use tokio:: io:: AsyncWriteExt ;
15
15
16
16
/// Generic receive function
17
+ ///
18
+ /// # Returns
19
+ /// * `Ok(true)` if the transfer should continue
20
+ /// * `Ok(false)` if the transfer should stop
17
21
pub async fn receive_file < R , W > (
18
22
recv : & mut R ,
19
23
file : & mut W ,
20
24
skip : u64 ,
21
25
size : u64 ,
22
26
read_callback : & mut impl FnMut ( u64 ) ,
23
- ) -> std:: io:: Result < ( ) >
27
+ should_continue : & mut impl FnMut ( ) -> bool ,
28
+ ) -> std:: io:: Result < bool >
24
29
where
25
30
R : tokio:: io:: AsyncReadExt + Unpin ,
26
31
W : tokio:: io:: AsyncWriteExt + tokio:: io:: AsyncSeekExt + Unpin ,
31
36
let mut written = skip;
32
37
33
38
while written < size {
39
+ if !should_continue ( ) {
40
+ return Ok ( false ) ;
41
+ }
42
+
34
43
let to_write = std:: cmp:: min ( BUF_SIZE as u64 , size - written) ;
35
44
let n = recv. read_exact ( & mut buf[ ..to_write as usize ] ) . await ?;
36
45
@@ -47,52 +56,70 @@ where
47
56
read_callback ( n as u64 ) ;
48
57
}
49
58
50
- Ok ( ( ) )
59
+ Ok ( true )
51
60
}
52
61
53
- /// Receive a directory
54
- // #[async_recursion]
62
+ /// # Returns
63
+ /// * `Ok(true)` if the transfer should continue
64
+ /// * `Ok(false)` if the transfer should stop
55
65
pub fn receive_directory < S > (
56
66
send : & mut S ,
57
67
root_path : & std:: path:: Path ,
58
68
files : & [ FileSendRecvTree ] ,
59
69
read_callback : & mut impl FnMut ( u64 ) ,
60
- ) -> std:: io:: Result < ( ) >
70
+ should_continue : & mut impl FnMut ( ) -> bool ,
71
+ ) -> std:: io:: Result < bool >
61
72
where
62
73
S : tokio:: io:: AsyncReadExt + Unpin + Send ,
63
74
{
64
75
for file in files {
65
76
match file {
66
77
FileSendRecvTree :: File { name, skip, size } => {
67
78
let path = root_path. join ( name) ;
68
- tokio:: task:: block_in_place ( || {
79
+
80
+ let continues = tokio:: task:: block_in_place ( || {
69
81
let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
70
82
rt. block_on ( async {
71
83
let mut file = tokio:: fs:: OpenOptions :: new ( )
72
84
. write ( true )
73
85
. create ( true )
74
86
. open ( & path)
75
87
. await ?;
76
- receive_file ( send, & mut file, * skip, * size, read_callback) . await ?;
88
+ let continues = receive_file (
89
+ send,
90
+ & mut file,
91
+ * skip,
92
+ * size,
93
+ read_callback,
94
+ should_continue,
95
+ )
96
+ . await ?;
77
97
78
98
file. sync_all ( ) . await ?;
79
99
file. shutdown ( ) . await ?;
80
- Ok :: < ( ) , std:: io:: Error > ( ( ) )
100
+ Ok :: < bool , std:: io:: Error > ( continues )
81
101
} )
82
102
} ) ?;
103
+
104
+ if !continues {
105
+ return Ok ( false ) ;
106
+ }
83
107
}
84
108
FileSendRecvTree :: Dir { name, files } => {
85
109
let root_path = root_path. join ( name) ;
86
110
87
111
if !root_path. exists ( ) {
88
112
std:: fs:: create_dir ( & root_path) ?;
89
113
}
90
- receive_directory ( send, & root_path, files, read_callback) ?;
114
+
115
+ if !receive_directory ( send, & root_path, files, read_callback, should_continue) ? {
116
+ return Ok ( false ) ;
117
+ }
91
118
}
92
119
}
93
120
}
94
121
95
- Ok ( ( ) )
122
+ Ok ( true )
96
123
}
97
124
98
125
#[ derive( Debug , Error ) ]
@@ -181,12 +208,18 @@ impl Receiver {
181
208
/// * `initial_progress_callback` - Callback with the initial progress of each file to send (name, current, total)
182
209
/// * `accept_files_callback` - Callback to accept or reject the files (Some(path) to accept, None to reject)
183
210
/// * `read_callback` - Callback every time data is written to disk
211
+ /// * `should_continue` - Callback to check if the transfer should continue
212
+ ///
213
+ /// # Returns
214
+ /// * `Ok(true)` if the transfer was finished successfully
215
+ /// * `Ok(false)` if the transfer was stopped
184
216
pub async fn receive_files (
185
217
& mut self ,
186
218
mut initial_progress_callback : impl FnMut ( & [ ( String , u64 , u64 ) ] ) ,
187
219
mut accept_files_callback : impl FnMut ( & [ FilesAvailable ] ) -> Option < PathBuf > ,
188
220
read_callback : & mut impl FnMut ( u64 ) ,
189
- ) -> Result < ( ) , ReceiveError > {
221
+ should_continue : & mut impl FnMut ( ) -> bool ,
222
+ ) -> Result < bool , ReceiveError > {
190
223
match receive_packet :: < SenderToReceiver > ( & self . conn ) . await ? {
191
224
SenderToReceiver :: ConnRequest { version_num } => {
192
225
if version_num != QS_VERSION {
@@ -282,6 +315,8 @@ impl Receiver {
282
315
let recv = self . conn . accept_uni ( ) . await ?;
283
316
let mut recv = GzipDecoder :: new ( tokio:: io:: BufReader :: with_capacity ( BUF_SIZE , recv) ) ;
284
317
318
+ let mut interrupted = false ;
319
+
285
320
for file in to_receive. into_iter ( ) . flatten ( ) {
286
321
match file {
287
322
FileSendRecvTree :: File { name, skip, size } => {
@@ -292,9 +327,21 @@ impl Receiver {
292
327
. open ( & path)
293
328
. await ?;
294
329
295
- receive_file ( & mut recv, & mut file, skip, size, read_callback) . await ?;
330
+ interrupted = !receive_file (
331
+ & mut recv,
332
+ & mut file,
333
+ skip,
334
+ size,
335
+ read_callback,
336
+ should_continue,
337
+ )
338
+ . await ?;
296
339
file. sync_all ( ) . await ?;
297
340
file. shutdown ( ) . await ?;
341
+
342
+ if interrupted {
343
+ break ;
344
+ }
298
345
}
299
346
FileSendRecvTree :: Dir { name, files } => {
300
347
let path = output_path. join ( name) ;
@@ -303,12 +350,21 @@ impl Receiver {
303
350
std:: fs:: create_dir ( & path) ?;
304
351
}
305
352
306
- receive_directory ( & mut recv, & path, & files, read_callback) ?;
353
+ if !receive_directory ( & mut recv, & path, & files, read_callback, should_continue) ?
354
+ {
355
+ interrupted = true ;
356
+ break ;
357
+ }
307
358
}
308
359
}
309
360
}
310
361
311
362
self . close ( ) . await ;
312
- Ok ( ( ) )
363
+
364
+ if interrupted {
365
+ tracing:: info!( "transfer interrupted" ) ;
366
+ }
367
+
368
+ Ok ( !interrupted)
313
369
}
314
370
}
0 commit comments