diff --git a/go.mod b/go.mod index deee686..e203a42 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.20 require ( github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1 - github.com/datawire/go-ftpserver v0.1.2 + github.com/datawire/go-ftpserver v0.1.3 github.com/datawire/go-fuseftp/rpc v0.2.0 github.com/jlaffaye/ftp v0.1.0 github.com/sirupsen/logrus v1.9.0 diff --git a/go.sum b/go.sum index 9b9133d..9dd68b8 100644 --- a/go.sum +++ b/go.sum @@ -48,8 +48,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1 h1:u/41zrBxTiE9jmRMY3nJdnJAED4oI5zl1tmwqlhW25s= github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1/go.mod h1:NiGDmetmbkBvtznpWSx6C0vA0s0LK9aHna3LJDqjruk= -github.com/datawire/go-ftpserver v0.1.2 h1:81/o2RnjxNT7r1FgYzgwqQQ/AcvWWtezj9ii/xTaERg= -github.com/datawire/go-ftpserver v0.1.2/go.mod h1:H9RuaneDn4sT3AaZwBmgYVfMTFLtbUmNs7Lj1UTWDYc= +github.com/datawire/go-ftpserver v0.1.3 h1:3kEzcPLG9jcP5oUpXHEKF2P8v0GxcpzSjNfwxEWZG+U= +github.com/datawire/go-ftpserver v0.1.3/go.mod h1:H9RuaneDn4sT3AaZwBmgYVfMTFLtbUmNs7Lj1UTWDYc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/fs/errmsg.go b/pkg/fs/errmsg.go index acbc93d..a5832a2 100644 --- a/pkg/fs/errmsg.go +++ b/pkg/fs/errmsg.go @@ -3,6 +3,7 @@ package fs // error texts that are found in reported errors using strings.Contain(err.Error(), errXXX) const ( errBrokenPipe = "broken pipe" + errClosed = "use of closed" errConnAborted = "connection was aborted" errConnRefused = "connection refused" errIO = "input/output error" diff --git a/pkg/fs/ftp.go b/pkg/fs/ftp.go index e9c75ca..2e0db56 100644 --- a/pkg/fs/ftp.go +++ b/pkg/fs/ftp.go @@ -488,7 +488,10 @@ func (f *fuseImpl) errToFuseErr(err error) int { switch { case strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusCommandOK)): return 0 - case strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusClosingDataConnection)), strings.Contains(em, errConnAborted): + case + strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusClosingDataConnection)), + strings.Contains(em, errConnAborted), + strings.Contains(em, errClosed): return -fuse.ECONNABORTED case containsAny(em, fs.ErrNotExist.Error(), errFileNotFound, errDirNotFound): return -fuse.ENOENT diff --git a/pkg/fs/ftp_test.go b/pkg/fs/ftp_test.go index ca91e12..4e5687d 100644 --- a/pkg/fs/ftp_test.go +++ b/pkg/fs/ftp_test.go @@ -10,8 +10,12 @@ import ( fs2 "io/fs" "log" "math" + "net" + "net/http" + _ "net/http/pprof" "net/netip" "os" + "os/exec" "path/filepath" "runtime" "sort" @@ -21,7 +25,6 @@ import ( "time" "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -29,6 +32,17 @@ import ( server "github.com/datawire/go-ftpserver" ) +func TestMain(m *testing.M) { + go func() { + port := 6060 + if os.Getenv("TEST_CALLED_FROM_TEST") == "1" { + port = 6061 + } + log.Println(http.ListenAndServe(fmt.Sprintf("localhost:%d", port), nil)) + }() + m.Run() +} + type tbWrapper struct { testing.TB level dlog.LogLevel @@ -153,28 +167,77 @@ func startFTPServer(t *testing.T, ctx context.Context, dir string, wg *sync.Wait dir = filepath.Join(dir, "server") export := filepath.Join(dir, remoteDir) require.NoError(t, os.MkdirAll(export, 0755)) - portCh := make(chan uint16) + + localAddr := func() *net.TCPAddr { + l, err := net.Listen("tcp", "0.0.0.0:0") + require.NoError(t, err) + addr := l.Addr().(*net.TCPAddr) + _ = l.Close() + return addr + } + + quitAddr := localAddr() + ftpAddr := localAddr() + cmd := exec.Command(os.Args[0], "-test.run=TestHelperFTPServer", "--", dir, quitAddr.String(), ftpAddr.String()) + cmd.SysProcAttr = interruptableSysProcAttr + cmd.Env = []string{"TEST_CALLED_FROM_TEST=1"} + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + require.NoError(t, cmd.Start()) + + go func() { + <-ctx.Done() + c, err := net.DialTimeout(quitAddr.Network(), quitAddr.String(), time.Second) + require.NoError(t, err) + _ = c.Close() + }() wg.Add(1) go func() { defer wg.Done() - assert.NoError(t, server.Start(ctx, "127.0.0.1", dir, portCh)) + err := cmd.Wait() + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + } }() + time.Sleep(100 * time.Millisecond) + return export, uint16(ftpAddr.Port) +} - select { - case <-ctx.Done(): - return "", 0 - case port := <-portCh: - return export, port +func TestHelperFTPServer(t *testing.T) { + if os.Getenv("TEST_CALLED_FROM_TEST") != "1" { + return } + args := os.Args + require.Lenf(t, os.Args, 6, "usage %s -test.run=TestHelperFTPServer ", args[1]) + + ql, err := net.Listen("tcp", args[4]) + require.NoError(t, err, "unable to listen to %s", args[4]) + defer ql.Close() + + addr, err := netip.ParseAddrPort(args[5]) + require.NoError(t, err, "unable to parse to %s", args[5]) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + c, err := ql.Accept() + if err != nil { + fmt.Fprintf(os.Stderr, "quit acceptor: %v", err) + } + c.Close() + cancel() + }() + require.NoError(t, err, "unable to parse port") + require.NoError(t, server.StartOnPort(ctx, "127.0.0.1", args[3], addr.Port())) + <-ctx.Done() + logrus.Info("over and out") } func startFUSEHost(t *testing.T, ctx context.Context, port uint16, dir string) (FTPClient, *FuseHost, string) { // Start the client dir = filepath.Join(dir, "mount") require.NoError(t, os.Mkdir(dir, 0755)) - started := make(chan error, 1) - fsh, err := NewFTPClient(ctx, netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", port)), remoteDir, 30*time.Second) + fsh, err := NewFTPClient(ctx, netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", port)), remoteDir, 60*time.Second) require.NoError(t, err) mp := dir if runtime.GOOS == "windows" { @@ -182,13 +245,7 @@ func startFUSEHost(t *testing.T, ctx context.Context, port uint16, dir string) ( dir = mp + `\` } host := NewHost(fsh, mp) - host.Start(ctx, started) - select { - case err := <-started: - require.NoError(t, err) - dlog.Info(ctx, "FUSE started") - case <-ctx.Done(): - } + require.NoError(t, host.Start(ctx, 5*time.Second)) return fsh, host, dir } @@ -227,17 +284,15 @@ func TestBrokenConnection(t *testing.T) { wg.Wait() broken := func(err error) bool { - pe := &fs2.PathError{} - if errors.As(err, &pe) { - ee := pe.Error() - return containsAny(ee, errIO, errBrokenPipe, errConnRefused, errConnAborted, errUnexpectedNetworkError) + if err == nil { + return false } - return false + return containsAny(err.Error(), errIO, errBrokenPipe, errClosed, errConnRefused, errConnAborted, errUnexpectedNetworkError) } t.Run("Stat", func(t *testing.T) { _, err := os.Stat(filepath.Join(mountPoint, "somefile.txt")) - require.True(t, broken(err)) + require.True(t, broken(err), err.Error()) }) t.Run("Create", func(t *testing.T) { @@ -554,14 +609,13 @@ func TestManyLargeFiles(t *testing.T) { wg.Wait() }) - const fileCount = 20 const fileSize = 100 * 1024 * 1024 - names := make([]string, fileCount) + names := make([]string, manyLargeFilesCount) // Create files "on the remote server". createRemoteWg := &sync.WaitGroup{} - createRemoteWg.Add(fileCount) - for i := 0; i < fileCount; i++ { + createRemoteWg.Add(manyLargeFilesCount) + for i := 0; i < manyLargeFilesCount; i++ { go func(i int) { defer createRemoteWg.Done() name, err := createLargeFile(root, fileSize) @@ -577,8 +631,8 @@ func TestManyLargeFiles(t *testing.T) { // Using the local filesystem, read the remote files while writing new ones. All in parallel. readWriteWg := &sync.WaitGroup{} - readWriteWg.Add(fileCount * 2) - for i := 0; i < fileCount; i++ { + readWriteWg.Add(manyLargeFilesCount * 2) + for i := 0; i < manyLargeFilesCount; i++ { go func(name string) { defer readWriteWg.Done() t.Logf("validating %s", name) @@ -586,8 +640,8 @@ func TestManyLargeFiles(t *testing.T) { }(filepath.Join(mountPoint, filepath.Base(names[i]))) } - localNames := make([]string, fileCount) - for i := 0; i < fileCount; i++ { + localNames := make([]string, manyLargeFilesCount) + for i := 0; i < manyLargeFilesCount; i++ { go func(i int) { defer readWriteWg.Done() name, err := createLargeFile(mountPoint, fileSize) @@ -600,8 +654,8 @@ func TestManyLargeFiles(t *testing.T) { // Read files "on the remote server" and validate them. readRemoteWg := &sync.WaitGroup{} - readRemoteWg.Add(fileCount) - for i := 0; i < fileCount; i++ { + readRemoteWg.Add(manyLargeFilesCount) + for i := 0; i < manyLargeFilesCount; i++ { go func(name string) { defer readRemoteWg.Done() t.Logf("validating %s", name) diff --git a/pkg/fs/ftp_test_unix.go b/pkg/fs/ftp_test_unix.go new file mode 100644 index 0000000..8d57eb4 --- /dev/null +++ b/pkg/fs/ftp_test_unix.go @@ -0,0 +1,11 @@ +//go:build !windows + +package fs + +import ( + "syscall" +) + +const manyLargeFilesCount = 20 + +var interruptableSysProcAttr *syscall.SysProcAttr = nil //nolint:gochecknoglobals // OS-specific constant diff --git a/pkg/fs/ftp_test_windows.go b/pkg/fs/ftp_test_windows.go new file mode 100644 index 0000000..9f011da --- /dev/null +++ b/pkg/fs/ftp_test_windows.go @@ -0,0 +1,9 @@ +package fs + +import ( + "syscall" +) + +const manyLargeFilesCount = 5 // Keep reasonably low or Windoze thinks it's under attach + +var interruptableSysProcAttr = &syscall.SysProcAttr{CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP} //nolint:gochecknoglobals // OS-specific constant diff --git a/pkg/fs/fuse.go b/pkg/fs/fuse.go index 1c0081a..f7f9f81 100644 --- a/pkg/fs/fuse.go +++ b/pkg/fs/fuse.go @@ -29,7 +29,7 @@ func NewHost(fsh fuse.FileSystemInterface, mountPoint string) *FuseHost { } // Start will mount the filesystem on the mountPoint passed to NewHost. -func (fh *FuseHost) Start(ctx context.Context, started chan error) { +func (fh *FuseHost) Start(ctx context.Context, startTimeout time.Duration) error { ctx, cancel := context.WithCancel(ctx) fh.cancel = cancel @@ -44,7 +44,10 @@ func (fh *FuseHost) Start(ctx context.Context, started chan error) { // user as the one that starts the FUSE mount opts = append(opts, "-o", "uid=-1", "-o", "gid=-1") } - go fh.detectFuseStarted(ctx, started) + started := make(chan error, 1) + startCtx, startCancel := context.WithTimeout(ctx, startTimeout) + defer startCancel() + go fh.detectFuseStarted(startCtx, started) if dlog.MaxLogLevel(ctx) >= dlog.LogLevelDebug { opts = append(opts, "-o", "debug") } @@ -72,6 +75,7 @@ func (fh *FuseHost) Start(ctx context.Context, started chan error) { } } }() + return <-started } // Stop will unmount the file system and terminate the FTP client, wait for all clean-up to diff --git a/pkg/fs/fuse_started_unix.go b/pkg/fs/fuse_started_unix.go index 3786b0e..5eb7fe0 100644 --- a/pkg/fs/fuse_started_unix.go +++ b/pkg/fs/fuse_started_unix.go @@ -4,6 +4,7 @@ package fs import ( "context" + "errors" "fmt" "time" @@ -31,11 +32,17 @@ func (fh *FuseHost) detectFuseStarted(ctx context.Context, started chan error) { for { select { case <-ctx.Done(): + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + started <- fmt.Errorf("timeout trying to stat mount point %q", fh.mountPoint) + } else { + started <- ctx.Err() + } return case <-ticker.C: var mountSt unix.Stat_t if err := unix.Stat(fh.mountPoint, &mountSt); err != nil { - dlog.Errorf(ctx, "unable to stat mount point %q: %v", fh.mountPoint, err) + // we don't consider a failure to stat an error here, just a cause for a retry. + dlog.Debugf(ctx, "unable to stat mount point %q: %v", fh.mountPoint, err) } else { if st.Ino != mountSt.Ino || st.Dev != mountSt.Dev { return diff --git a/pkg/fs/fuse_started_windows.go b/pkg/fs/fuse_started_windows.go index 3060ff6..707b748 100644 --- a/pkg/fs/fuse_started_windows.go +++ b/pkg/fs/fuse_started_windows.go @@ -2,6 +2,8 @@ package fs import ( "context" + "errors" + "fmt" "time" "golang.org/x/sys/windows" @@ -20,6 +22,11 @@ func (fh *FuseHost) detectFuseStarted(ctx context.Context, started chan error) { for { select { case <-ctx.Done(): + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + started <- fmt.Errorf("timeout trying to read mount point %q", fh.mountPoint) + } else { + started <- ctx.Err() + } return case <-ticker.C: fh, err := windows.CreateFile(devPath, diff --git a/pkg/main/main.go b/pkg/main/main.go index 6e224f7..7b5bb10 100644 --- a/pkg/main/main.go +++ b/pkg/main/main.go @@ -10,6 +10,7 @@ import ( "net/netip" "os" "sync" + "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -65,14 +66,16 @@ func (s *service) Mount(_ context.Context, rq *rpc.MountRequest) (*rpc.MountIden return nil, err } ctx, cancel := context.WithCancel(s.ctx) - started := make(chan error, 1) fi, err := fs.NewFTPClient(ctx, ap, rq.Directory, rq.ReadTimeout.AsDuration()) if err != nil { cancel() return nil, status.Errorf(codes.Internal, err.Error()) } host := fs.NewHost(fi, rq.MountPoint) - host.Start(ctx, started) + if err := host.Start(ctx, 5*time.Second); err != nil { + cancel() + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } id := s.nextID s.mounts[id] = &mount{ @@ -87,14 +90,6 @@ func (s *service) Mount(_ context.Context, rq *rpc.MountRequest) (*rpc.MountIden }, } s.nextID++ - select { - case err := <-started: - if err != nil { - // Failed to open mountPoint - return nil, status.Errorf(codes.InvalidArgument, err.Error()) - } - case <-ctx.Done(): - } return &rpc.MountIdentifier{Id: id}, nil }