@@ -920,61 +920,51 @@ impl FileXferTask {
920
920
logger : Logger ,
921
921
guard : AliveGuard ,
922
922
) {
923
- let filename_len = self . file . subpath ( ) . name ( ) . len ( ) ;
924
-
925
- if filename_len + MAX_FILE_SUFFIX_LEN > MAX_FILENAME_LENGTH {
926
- events. failed ( Error :: FilenameTooLong ) . await ;
927
- return ;
928
- }
929
-
930
- let emit_checksum_events = {
931
- if let Some ( threshold) = state. config . checksum_events_size_threshold {
932
- self . file . size ( ) >= threshold as u64
933
- } else {
934
- false
923
+ let task = async {
924
+ // Check file and dir names are shorter then MAX
925
+ for name in self . file . subpath ( ) . iter ( ) {
926
+ if name. len ( ) + MAX_FILE_SUFFIX_LEN > MAX_FILENAME_LENGTH {
927
+ return Err ( Error :: FilenameTooLong ) ;
928
+ }
935
929
}
936
- } ;
937
-
938
- events. preflight ( ) . await ;
939
930
940
- let tmp_location: Hidden < PathBuf > = Hidden (
941
- self . base_dir
942
- . join ( temp_file_name ( self . xfer . id ( ) , self . file . id ( ) ) ) ,
943
- ) ;
931
+ let emit_checksum_events = {
932
+ if let Some ( threshold) = state. config . checksum_events_size_threshold {
933
+ self . file . size ( ) >= threshold as u64
934
+ } else {
935
+ false
936
+ }
937
+ } ;
944
938
945
- let tmp_file_state = self
946
- . handle_tmp_file ( & logger, & events, & tmp_location, emit_checksum_events)
947
- . await ;
939
+ events. preflight ( ) . await ;
948
940
949
- let init_res = match downloader. init ( & self , tmp_file_state) . await {
950
- Ok ( init) => init,
951
- Err ( crate :: Error :: Canceled ) => {
952
- warn ! ( logger, "File cancelled on download init stage" ) ;
953
- return ;
954
- }
955
- Err ( err) => {
956
- events. failed ( err) . await ;
957
- return ;
958
- }
959
- } ;
941
+ let tmp_location: Hidden < PathBuf > = Hidden (
942
+ self . base_dir
943
+ . join ( temp_file_name ( self . xfer . id ( ) , self . file . id ( ) ) ) ,
944
+ ) ;
960
945
961
- match init_res {
962
- handler:: DownloadInit :: Stream { offset } => {
963
- if req_send
964
- . send ( ServerReq :: Start {
965
- file : self . file . id ( ) . clone ( ) ,
966
- offset,
967
- } )
968
- . is_err ( )
969
- {
970
- debug ! ( logger, "Client is disconnected. Stopping file stream" ) ;
971
- return ;
972
- }
946
+ let tmp_file_state = self
947
+ . handle_tmp_file ( & logger, & events, & tmp_location, emit_checksum_events)
948
+ . await ;
949
+
950
+ let init_res = downloader. init ( & self , tmp_file_state) . await ?;
951
+
952
+ match init_res {
953
+ handler:: DownloadInit :: Stream { offset } => {
954
+ if req_send
955
+ . send ( ServerReq :: Start {
956
+ file : self . file . id ( ) . clone ( ) ,
957
+ offset,
958
+ } )
959
+ . is_err ( )
960
+ {
961
+ debug ! ( logger, "Client is disconnected. Stopping file stream" ) ;
962
+ return Err ( Error :: Canceled ) ;
963
+ }
973
964
974
- events. start ( self . base_dir . to_string_lossy ( ) , offset) . await ;
965
+ events. start ( self . base_dir . to_string_lossy ( ) , offset) . await ;
975
966
976
- let result = self
977
- . stream_file (
967
+ self . stream_file (
978
968
StreamCtx {
979
969
logger : & logger,
980
970
state : & state,
@@ -986,70 +976,73 @@ impl FileXferTask {
986
976
offset,
987
977
emit_checksum_events,
988
978
)
989
- . await ;
979
+ . await
980
+ }
981
+ }
982
+ } ;
990
983
991
- // This is a critical part that we need to execute atomically.
992
- // Since the outter task can be aborted, let's move it to a separate task
993
- // so that it's never interrupted.
994
- let error_logger = logger. clone ( ) ;
995
- if let Err ( e) = tokio:: spawn ( async move {
996
- let _guard = guard;
984
+ let result = task. await ;
997
985
998
- match result {
999
- Err ( crate :: Error :: Canceled ) => {
1000
- info ! ( logger, "File {} stopped" , self . file. id( ) )
1001
- }
1002
- Ok ( dst_location) => {
1003
- info ! ( logger, "File {} downloaded succesfully" , self . file. id( ) ) ;
1004
-
1005
- if let Err ( err) = state
1006
- . transfer_manager
1007
- . incoming_finish_post ( self . xfer . id ( ) , self . file . id ( ) , true )
1008
- . await
1009
- {
1010
- warn ! ( logger, "Failed to post finish: {err}" ) ;
1011
- }
1012
-
1013
- if let Err ( e) = req_send. send ( ServerReq :: Done {
1014
- file : self . file . id ( ) . clone ( ) ,
1015
- } ) {
1016
- warn ! ( logger, "Failed to send DONE message: {}" , e) ;
1017
- } ;
1018
-
1019
- events. success ( dst_location) . await ;
1020
- }
1021
- Err ( err) => {
1022
- info ! (
1023
- logger,
1024
- "File {} finished with error: {err:?}" ,
1025
- self . file. id( )
1026
- ) ;
1027
-
1028
- if let Err ( err) = state
1029
- . transfer_manager
1030
- . incoming_finish_post ( self . xfer . id ( ) , self . file . id ( ) , false )
1031
- . await
1032
- {
1033
- warn ! ( logger, "Failed to post finish: {err}" ) ;
1034
- }
1035
-
1036
- if let Err ( e) = req_send. send ( ServerReq :: Fail {
1037
- file : self . file . id ( ) . clone ( ) ,
1038
- msg : err. to_string ( ) ,
1039
- } ) {
1040
- warn ! ( logger, "Failed to send FAIL message: {}" , e) ;
1041
- } ;
1042
-
1043
- events. failed ( err) . await ;
1044
- }
986
+ // This is a critical part that we need to execute atomically.
987
+ // Since the outter task can be aborted, let's move it to a separate task
988
+ // so that it's never interrupted.
989
+ let error_logger = logger. clone ( ) ;
990
+ if let Err ( e) = tokio:: spawn ( async move {
991
+ let _guard = guard;
992
+
993
+ match result {
994
+ Err ( crate :: Error :: Canceled ) => {
995
+ info ! ( logger, "File {} stopped" , self . file. id( ) )
996
+ }
997
+ Ok ( dst_location) => {
998
+ info ! ( logger, "File {} downloaded succesfully" , self . file. id( ) ) ;
999
+
1000
+ if let Err ( err) = state
1001
+ . transfer_manager
1002
+ . incoming_finish_post ( self . xfer . id ( ) , self . file . id ( ) , true )
1003
+ . await
1004
+ {
1005
+ warn ! ( logger, "Failed to post finish: {err}" ) ;
1045
1006
}
1046
- } )
1047
- . await
1048
- {
1049
- error ! ( error_logger, "Failed to spawn file xfer task: {:?}" , e) ;
1050
- } ;
1007
+
1008
+ if let Err ( e) = req_send. send ( ServerReq :: Done {
1009
+ file : self . file . id ( ) . clone ( ) ,
1010
+ } ) {
1011
+ warn ! ( logger, "Failed to send DONE message: {}" , e) ;
1012
+ } ;
1013
+
1014
+ events. success ( dst_location) . await ;
1015
+ }
1016
+ Err ( err) => {
1017
+ info ! (
1018
+ logger,
1019
+ "File {} finished with error: {err:?}" ,
1020
+ self . file. id( )
1021
+ ) ;
1022
+
1023
+ if let Err ( err) = state
1024
+ . transfer_manager
1025
+ . incoming_finish_post ( self . xfer . id ( ) , self . file . id ( ) , false )
1026
+ . await
1027
+ {
1028
+ warn ! ( logger, "Failed to post finish: {err}" ) ;
1029
+ }
1030
+
1031
+ if let Err ( e) = req_send. send ( ServerReq :: Fail {
1032
+ file : self . file . id ( ) . clone ( ) ,
1033
+ msg : err. to_string ( ) ,
1034
+ } ) {
1035
+ warn ! ( logger, "Failed to send FAIL message: {}" , e) ;
1036
+ } ;
1037
+
1038
+ events. failed ( err) . await ;
1039
+ }
1051
1040
}
1052
- }
1041
+ } )
1042
+ . await
1043
+ {
1044
+ error ! ( error_logger, "Failed to spawn file xfer task: {:?}" , e) ;
1045
+ } ;
1053
1046
}
1054
1047
}
1055
1048
0 commit comments