diff --git a/pkg/fs/ftp.go b/pkg/fs/ftp.go index 7a75486..0084c64 100644 --- a/pkg/fs/ftp.go +++ b/pkg/fs/ftp.go @@ -4,11 +4,13 @@ package fs import ( "bytes" "context" + "errors" "fmt" "io" "io/fs" "math" "net/netip" + "net/textproto" "os" "path/filepath" "runtime" @@ -74,7 +76,12 @@ type info struct { // rr and of is used when reading data from a remote file rr *ftp.Response - of uint64 + + // Current offset for read operations. + rof uint64 + + // Current offset for write operations. + wof uint64 // The writer is the writer side of a io.Pipe() used when writing data to a remote file. writer io.WriteCloser @@ -133,7 +140,7 @@ func (f *fuseImpl) SetAddress(ctx context.Context, addr netip.AddrPort) error { func (f *fuseImpl) logPanic() { if r := recover(); r != nil { - dlog.Error(f.ctx, derror.PanicToError(r)) + dlog.Errorf(f.ctx, "%+v", derror.PanicToError(r)) } } @@ -233,10 +240,7 @@ func (f *fuseImpl) Mkdir(path string, mode uint32) int { err := f.withConn(func(conn *ftp.ServerConn) error { return conn.MakeDir(relpath(path)) }) - if err != nil { - return f.errToFuseErr(err) - } - return 0 + return f.errToFuseErr(err) } // Open ensures checks if the file exists, and if it doesn't, ensure that @@ -284,8 +288,8 @@ func (f *fuseImpl) Read(path string, buff []byte, ofst int64, fh uint64) int { of := uint64(ofst) - if fe.of != of && fe.rr != nil { - // This should normally not happen, but if it does, let's restart the read + if fe.rof != of && fe.rr != nil { + // Restart the read with new offset _ = fe.rr.Close() fe.rr = nil } @@ -293,11 +297,11 @@ func (f *fuseImpl) Read(path string, buff []byte, ofst int64, fh uint64) int { if fe.rr == nil { // Obtain the ftp.Response. It acts as an io.Reader rr, err := fe.conn.RetrFrom(relpath(path), of) - if err != nil { - return f.errToFuseErr(err) + if errCode = f.errToFuseErr(err); errCode < 0 { + return errCode } fe.rr = rr - fe.of = of + fe.rof = of } bytesRead := 0 @@ -310,10 +314,12 @@ func (f *fuseImpl) Read(path string, buff []byte, ofst int64, fh uint64) int { // Retain the ftp.Response until the file handle is released break } - return f.errToFuseErr(err) + if errCode = f.errToFuseErr(err); errCode < 0 { + return errCode + } } } - fe.of += uint64(bytesRead) + fe.rof += uint64(bytesRead) // Errors are always negative and Read expects the number of bytes read to be returned here. return bytesRead @@ -339,8 +345,9 @@ func (f *fuseImpl) Readdir(path string, fill func(name string, stat *fuse.Stat_t return errCode } es, err := fe.conn.List(relpath(path)) - if err != nil { - return f.errToFuseErr(err) + errCode = f.errToFuseErr(err) + if errCode < 0 { + return errCode } for _, e := range es { s := &fuse.Stat_t{} @@ -405,8 +412,8 @@ func (f *fuseImpl) Truncate(path string, size int64, fh uint64) int { if errCode < 0 { return errCode } - if err := fe.conn.StorFrom(relpath(path), bytes.NewReader(nil), uint64(size)); err != nil { - return f.errToFuseErr(err) + if errCode = f.errToFuseErr(fe.conn.StorFrom(relpath(path), bytes.NewReader(nil), uint64(size))); errCode < 0 { + return errCode } return 0 } @@ -427,42 +434,52 @@ func (f *fuseImpl) Unlink(path string) int { // data connection that is established to facilitate the write will remain open // until the handle is released by a call to Release func (f *fuseImpl) Write(path string, buf []byte, ofst int64, fh uint64) int { - dlog.Debugf(f.ctx, "Write(%s, sz=%d, off=%d, %d)", path, len(buf), ofst, fh) + defer f.logPanic() + // dlog.Debugf(f.ctx, "Write(%s, sz=%d, off=%d, %d)", path, len(buf), ofst, fh) fe, errCode := f.loadHandle(fh) if errCode < 0 { return errCode } of := uint64(ofst) - var err error + + pipeCopy := func(conn *ftp.ServerConn, of uint64) { + fe.wof = of + var reader io.ReadCloser + reader, fe.writer = io.Pipe() + fe.wg.Add(1) + go func() { + defer func() { + fe.wg.Done() + f.pool.put(f.ctx, conn) + }() + if err := conn.StorFrom(relpath(path), reader, of); err != nil { + dlog.Errorf(f.ctx, "error storing: %v", err) + } + }() + } // A connection dedicated to the Write function is needed because there // might be simultaneous Read and Write operations on the same file handle. if fe.writer == nil { - var conn *ftp.ServerConn - if conn, err = f.pool.get(f.ctx); err == nil { - var reader io.Reader - reader, fe.writer = io.Pipe() - - // start the pipe pumper. It ends when the fe.writer closes. That - // happens when Release is called - fe.wg.Add(1) - go func() { - defer func() { - fe.wg.Done() - f.pool.put(f.ctx, conn) - }() - if err := conn.StorFrom(relpath(path), reader, of); err != nil { - dlog.Errorf(f.ctx, "error storing: %v", err) - } - }() + conn, err := f.pool.get(f.ctx) + if errCode = f.errToFuseErr(err); errCode < 0 { + return errCode } - } - if err != nil { - return f.errToFuseErr(err) + + // start the pipe pumper. It ends when the fe.writer closes. That + // happens when Release is called + pipeCopy(conn, of) + } else if fe.wof != of { + // Drain and restart the write operation. + fe.writer.Close() + fe.wg.Wait() + pipeCopy(fe.conn, of) } n, err := fe.writer.Write(buf) - if err != nil { - return f.errToFuseErr(err) + if errCode = f.errToFuseErr(err); errCode < 0 { + n = errCode + } else { + fe.wof += uint64(n) } return n } @@ -510,12 +527,31 @@ func (f *fuseImpl) errToFuseErr(err error) int { return 0 } + var tpe *textproto.Error + if errors.As(err, &tpe) { + if tpe.Code == ftp.StatusCommandOK { + return 0 + } + switch tpe.Code { + case ftp.StatusClosingDataConnection: + return -fuse.ECONNABORTED + case ftp.StatusNotAvailable: + return -fuse.EADDRNOTAVAIL + case ftp.StatusCanNotOpenDataConnection: + return -fuse.ECONNREFUSED + case ftp.StatusTransfertAborted: + return -fuse.ECONNABORTED + case ftp.StatusInvalidCredentials: + return -fuse.EACCES + case ftp.StatusHostUnavailable: + return -fuse.EHOSTUNREACH + case ftp.StatusBadFileName: + return -fuse.EINVAL + } + } em := err.Error() switch { - case strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusCommandOK)): - return 0 case - strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusClosingDataConnection)), strings.Contains(em, errConnAborted), strings.Contains(em, errClosed): return -fuse.ECONNABORTED @@ -583,8 +619,9 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f return nil, nil, -fuse.ECANCELED } conn, err := f.pool.get(f.ctx) - if err != nil { - return nil, nil, f.errToFuseErr(err) + ec := f.errToFuseErr(err) + if ec < 0 { + return nil, nil, ec } defer func() { @@ -600,8 +637,8 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f } // Create an empty file to ensure that it can be created - if err = conn.Stor(relpath(path), bytes.NewReader(nil)); err != nil { - return nil, nil, f.errToFuseErr(err) + if ec = f.errToFuseErr(conn.Stor(relpath(path), bytes.NewReader(nil))); ec < 0 { + return nil, nil, ec } e = &ftp.Entry{ Name: filepath.Base(path), @@ -617,7 +654,7 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f conn: conn, } if append { - nfe.of = e.Size + nfe.wof = e.Size } f.Lock() f.current[f.nextHandle] = nfe diff --git a/pkg/fs/ftp_test.go b/pkg/fs/ftp_test.go index 4e5687d..ec1a3c9 100644 --- a/pkg/fs/ftp_test.go +++ b/pkg/fs/ftp_test.go @@ -602,13 +602,6 @@ func TestManyLargeFiles(t *testing.T) { root, port := startFTPServer(t, ctx, tmp, &wg) require.NotEqual(t, uint16(0), port) - _, host, mountPoint := startFUSEHost(t, ctx, port, tmp) - t.Cleanup(func() { - host.Stop() - cancel() - wg.Wait() - }) - const fileSize = 100 * 1024 * 1024 names := make([]string, manyLargeFilesCount) @@ -629,6 +622,18 @@ func TestManyLargeFiles(t *testing.T) { t.Fatal("failed attempting to create large files") } + _, host, mountPoint := startFUSEHost(t, ctx, port, tmp) + stopped := false + stopFuse := func() { + if !stopped { + stopped = true + host.Stop() + cancel() + wg.Wait() + } + } + t.Cleanup(stopFuse) + // Using the local filesystem, read the remote files while writing new ones. All in parallel. readWriteWg := &sync.WaitGroup{} readWriteWg.Add(manyLargeFilesCount * 2) @@ -651,6 +656,7 @@ func TestManyLargeFiles(t *testing.T) { }(i) } readWriteWg.Wait() + stopFuse() // Read files "on the remote server" and validate them. readRemoteWg := &sync.WaitGroup{} @@ -709,7 +715,7 @@ func validateLargeFile(name string, sz int) error { return err } if st.Size() != int64(sz) { - return fmt.Errorf("file size differ. Expected %d, got %d", sz, st.Size()) + return fmt.Errorf("file size of %s differ. Expected %d, got %d", name, sz, st.Size()) } bf := bufio.NewReader(f) qz := uint32(sz / 4) @@ -724,7 +730,7 @@ func validateLargeFile(name string, sz int) error { } x := binary.BigEndian.Uint32(buf) if i != x { - return fmt.Errorf("content differ at position %d: expected %d, got %d", i*4, i, x) + return fmt.Errorf("content of %s differ at position %d: expected %d, got %d", name, i*4, i, x) } } return nil diff --git a/pkg/fs/ftp_test_unix.go b/pkg/fs/ftp_test_unix.go index 8d57eb4..4f5129d 100644 --- a/pkg/fs/ftp_test_unix.go +++ b/pkg/fs/ftp_test_unix.go @@ -6,6 +6,6 @@ import ( "syscall" ) -const manyLargeFilesCount = 20 +const manyLargeFilesCount = 12 var interruptableSysProcAttr *syscall.SysProcAttr = nil //nolint:gochecknoglobals // OS-specific constant