File transfer example #10

Open
rukai opened this issue Nov 6, 2023 · 7 comments · Fixed by #14 or #30

Comments

@rukai

rukai commented Nov 6, 2023

It would be nice to have an example for how to best transfer a file to/from the remote machine.
e.g. a function like push_file(source_path: &str, dest_path: &str) that pushes from the local machine to the remote machine.

Doing this efficiently would be complicated since I imagine we would want to read new bytes from the source while sending to the destination?
Additionally this is a very common use case for sftp, so I think making an example of this would bring a lot of value.

@rukai
Author

rukai commented Nov 6, 2023

I'm trying to replace a system where I pipe stdout to/from dd over ssh to transfer files.
It takes about 2-3s to transfer a 1MB file that way.

Trying a naive implementation of

    pub async fn pull_file(&self, source: &Path, dest: &Path) {
        let task = format!("pulling file from {}:{source:?} to {dest:?}", self.address);
        tracing::info!("{task}");

        let mut channel = self.session.channel_open_session().await.unwrap();
        channel.request_subsystem(true, "sftp").await.unwrap();
        let sftp = SftpSession::new(channel.into_stream()).await.unwrap();
        let mut file = sftp.open(source.to_str().unwrap()).await.unwrap();

        let mut dest = File::create(dest)
            .await
            .map_err(|e| anyhow!(e).context(format!("Failed to create {dest:?}")))
            .unwrap();
        let mut read = vec![];
        file.read_to_end(&mut read)
            .await
            .unwrap_or_else(|e| panic!("{task} failed to read from remote server with {e:?}"));
        tokio::io::AsyncWriteExt::write_all(&mut dest, &read)
            .await
            .unwrap_or_else(|e| panic!("{task} failed to write to local disk with {e:?}"));
    }

or

    pub async fn pull_file(&self, source: &Path, dest: &Path) {
        let task = format!("pulling file from {}:{source:?} to {dest:?}", self.address);
        tracing::info!("{task}");

        let mut channel = self.session.channel_open_session().await.unwrap();
        channel.request_subsystem(true, "sftp").await.unwrap();
        let sftp = SftpSession::new(channel.into_stream()).await.unwrap();
        let mut file = sftp.open(source.to_str().unwrap()).await.unwrap();

        let mut dest = File::create(dest)
            .await
            .map_err(|e| anyhow!(e).context(format!("Failed to create {dest:?}")))
            .unwrap();

        let mut bytes = vec![0u8; 1024 * 1024];
        loop {
            let read_count = file
                .read(&mut bytes[..])
                .await
                .unwrap_or_else(|e| panic!("{task} failed to read from remote server with {e:?}"));
            if read_count == 0 {
                break;
            }
            tokio::io::AsyncWriteExt::write_all(&mut dest, &bytes[0..read_count])
                .await
                .unwrap_or_else(|e| panic!("{task} failed to write to local disk with {e:?}"));
        }
    }

is slower than my original dd approach, taking about 5-10 seconds. I tried a 4KB buffer instead of a 1MB buffer and that took 60s.
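One plausible reason the buffer size matters so much: if every `read` turns into its own request/response round trip, a 4KB buffer needs 256 round trips per MiB where a 1MB buffer needs one. The sketch below is a synchronous, std-only stand-in that only illustrates the chunked copy loop and counts the reads. `copy_in_chunks` is a hypothetical helper, not part of the SFTP crate, and it does not touch the network.

```rust
use std::io::{self, Read, Write};

/// Copies everything from `reader` to `writer` through a fixed-size buffer.
/// Returns (bytes copied, number of non-empty reads).
/// Hypothetical helper for illustration only.
pub fn copy_in_chunks<R: Read, W: Write>(
    reader: &mut R,
    writer: &mut W,
    chunk_size: usize,
) -> io::Result<(u64, usize)> {
    let mut buffer = vec![0u8; chunk_size];
    let mut total = 0u64;
    let mut reads = 0usize;
    loop {
        let n = match reader.read(&mut buffer) {
            Ok(0) => break, // end of stream (or a zero-sized buffer)
            Ok(n) => n,
            // Retry if the read was interrupted before any data arrived.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        // Only the first `n` bytes of the buffer are valid for this read.
        writer.write_all(&buffer[..n])?;
        total += n as u64;
        reads += 1;
    }
    writer.flush()?;
    Ok((total, reads))
}
```

Copying a 1 MiB in-memory source with a 4096-byte buffer takes 256 reads with this loop; over a link where each read waits for a reply, that count is what multiplies the latency.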

@UVUUUVUU

Hi! Have you tried transferring multiple files? The struct SftpSession doesn't implement the Clone trait. How can we do this?

let sftp = sftp.clone();
let handle = task::spawn(async move {
    download_file(sftp, remote_path, local_path).await;
});

@AspectUnk
Owner

AspectUnk commented Nov 13, 2023

@rukai I will try to test the speed as soon as possible and provide an example. In addition, I think we need to add a native implementation of this to the high-level API.

@UVUUUVUU At the moment this is a bottleneck, because a mutex is used internally to process requests, which prevents multitasking from working well. As a temporary solution, you can use Arc with Mutex. This will be fixed soon.

let session = SftpSession::new(channel.into_stream()).await?;
let arc_session = Arc::new(Mutex::new(session));
let sftp = arc_session.clone();
let handle = task::spawn(async move {
    download_file(sftp, remote_path, local_path).await;
});
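For readers who want to see the sharing pattern in isolation, here is a minimal std-only sketch using threads instead of async tasks. `FakeSession` and `download_all` are hypothetical stand-ins, not the crate's types. In async code an async-aware mutex such as `tokio::sync::Mutex` would typically take the place of `std::sync::Mutex`. Note that the lock serializes the downloads, which is the bottleneck described above.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a session type that does not implement `Clone`.
pub struct FakeSession {
    requests_served: u32,
}

impl FakeSession {
    pub fn new() -> Self {
        Self { requests_served: 0 }
    }

    /// Pretends to fetch a file. Takes `&mut self`, so callers must hold the lock.
    pub fn download(&mut self, remote_path: &str) -> String {
        self.requests_served += 1;
        format!("contents of {}", remote_path)
    }

    pub fn requests_served(&self) -> u32 {
        self.requests_served
    }
}

/// Spawns one worker per path; each worker clones the `Arc`, not the session.
/// Results come back in the same order as `paths` because handles are joined in order.
pub fn download_all(session: Arc<Mutex<FakeSession>>, paths: Vec<String>) -> Vec<String> {
    let handles: Vec<_> = paths
        .into_iter()
        .map(|path| {
            let session = Arc::clone(&session);
            thread::spawn(move || {
                // Only one worker at a time gets past this line.
                let mut guard = session.lock().expect("session mutex poisoned");
                guard.download(&path)
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

Every worker ends up talking to the same underlying session, so this gives correctness across tasks but no real parallelism.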

@UVUUUVUU

OK, thanks for your answer! Looking forward to your future work.

@UVUUUVUU

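@UVUUUVUU At the moment this is a bottleneck, because internally a mutex is used to process requests, which prevents multitasking from working well. As a temporary solution, you can use Arc with Mutex. This will be fixed soon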

let session = SftpSession::new(channel.into_stream()).await?;
let arc_session = Arc::new(Mutex::new(session));
let sftp = arc_session.clone();
let handle = task::spawn(async move {
    download_file(sftp, remote_path, local_path).await;
});

How do you plan to solve this? Could you give some suggestions?

@AspectUnk
Owner

AspectUnk commented Nov 15, 2023

how do you want to solve this? could you give some suggestion?

I think this problem can be solved using a concurrent hashmap, because that avoids blocking. The main task is to create a non-blocking list of requests/responses. flurry may solve this problem; I like their implementation.

Oh, by the way, we can just use broadcast; each consumer will only receive its own request.
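To make the request/response routing idea concrete, here is a rough std-only sketch. It swaps in a plain `Mutex<HashMap>` with one `std::sync::mpsc` channel per request in place of flurry or a broadcast channel, so it shows the routing shape only, not the non-blocking behaviour. `PendingRequests`, `register` and `complete` are hypothetical names and not the crate's API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

struct Inner {
    next_id: u32,
    waiting: HashMap<u32, Sender<Vec<u8>>>,
}

/// Hypothetical table of in-flight requests, keyed by request id.
pub struct PendingRequests {
    inner: Mutex<Inner>,
}

impl PendingRequests {
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(Inner { next_id: 0, waiting: HashMap::new() }),
        }
    }

    /// Registers a new request; returns its id and the receiver for its response.
    pub fn register(&self) -> (u32, Receiver<Vec<u8>>) {
        let (tx, rx) = channel();
        let mut inner = self.inner.lock().expect("pending table poisoned");
        let id = inner.next_id;
        inner.next_id = inner.next_id.wrapping_add(1);
        inner.waiting.insert(id, tx);
        (id, rx)
    }

    /// Routes a response to whoever registered `id`.
    /// Returns false if nobody is waiting for that id any more.
    pub fn complete(&self, id: u32, payload: Vec<u8>) -> bool {
        // The lock is held only long enough to remove the entry.
        let sender = self
            .inner
            .lock()
            .expect("pending table poisoned")
            .waiting
            .remove(&id);
        match sender {
            Some(tx) => tx.send(payload).is_ok(),
            None => false,
        }
    }
}
```

The lock is held only while the map is updated, never while waiting for a response, so responses can arrive in any order and each caller still sees only its own.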

@AspectUnk
Owner

AspectUnk commented May 13, 2024

The optimization problem should now be resolved with PR #30. Published as 2.0.0 🚀
We now have a benchmark and an example.

I'm trying to replace a system where I pipe stdout to/from dd over ssh to transfer files. It takes about 2-3s to transfer a 1MB file that way.

@rukai It would be nice to get feedback from you.
