Skip to content

Commit

Permalink
Merge pull request #2 from datawire/thallgren/start-error
Browse files Browse the repository at this point in the history
Always return error when fuse start fails to mount.
  • Loading branch information
thallgren authored Apr 21, 2023
2 parents 7f058b2 + 5374509 commit 9ce6909
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 50 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1
github.com/datawire/go-ftpserver v0.1.2
github.com/datawire/go-ftpserver v0.1.3
github.com/datawire/go-fuseftp/rpc v0.2.0
github.com/jlaffaye/ftp v0.1.0
github.com/sirupsen/logrus v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1 h1:u/41zrBxTiE9jmRMY3nJdnJAED4oI5zl1tmwqlhW25s=
github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1/go.mod h1:NiGDmetmbkBvtznpWSx6C0vA0s0LK9aHna3LJDqjruk=
github.com/datawire/go-ftpserver v0.1.2 h1:81/o2RnjxNT7r1FgYzgwqQQ/AcvWWtezj9ii/xTaERg=
github.com/datawire/go-ftpserver v0.1.2/go.mod h1:H9RuaneDn4sT3AaZwBmgYVfMTFLtbUmNs7Lj1UTWDYc=
github.com/datawire/go-ftpserver v0.1.3 h1:3kEzcPLG9jcP5oUpXHEKF2P8v0GxcpzSjNfwxEWZG+U=
github.com/datawire/go-ftpserver v0.1.3/go.mod h1:H9RuaneDn4sT3AaZwBmgYVfMTFLtbUmNs7Lj1UTWDYc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
1 change: 1 addition & 0 deletions pkg/fs/errmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fs
// error texts that are found in reported errors using strings.Contain(err.Error(), errXXX)
const (
errBrokenPipe = "broken pipe"
errClosed = "use of closed"
errConnAborted = "connection was aborted"
errConnRefused = "connection refused"
errIO = "input/output error"
Expand Down
5 changes: 4 additions & 1 deletion pkg/fs/ftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ func (f *fuseImpl) errToFuseErr(err error) int {
switch {
case strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusCommandOK)):
return 0
case strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusClosingDataConnection)), strings.Contains(em, errConnAborted):
case
strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusClosingDataConnection)),
strings.Contains(em, errConnAborted),
strings.Contains(em, errClosed):
return -fuse.ECONNABORTED
case containsAny(em, fs.ErrNotExist.Error(), errFileNotFound, errDirNotFound):
return -fuse.ENOENT
Expand Down
120 changes: 87 additions & 33 deletions pkg/fs/ftp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ import (
fs2 "io/fs"
"log"
"math"
"net"
"net/http"
_ "net/http/pprof"
"net/netip"
"os"
"os/exec"
"path/filepath"
"runtime"
"sort"
Expand All @@ -21,14 +25,24 @@ import (
"time"

"github.com/sirupsen/logrus"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/datawire/dlib/dlog"
server "github.com/datawire/go-ftpserver"
)

func TestMain(m *testing.M) {
go func() {
port := 6060
if os.Getenv("TEST_CALLED_FROM_TEST") == "1" {
port = 6061
}
log.Println(http.ListenAndServe(fmt.Sprintf("localhost:%d", port), nil))
}()
m.Run()
}

type tbWrapper struct {
testing.TB
level dlog.LogLevel
Expand Down Expand Up @@ -153,42 +167,85 @@ func startFTPServer(t *testing.T, ctx context.Context, dir string, wg *sync.Wait
dir = filepath.Join(dir, "server")
export := filepath.Join(dir, remoteDir)
require.NoError(t, os.MkdirAll(export, 0755))
portCh := make(chan uint16)

localAddr := func() *net.TCPAddr {
l, err := net.Listen("tcp", "0.0.0.0:0")
require.NoError(t, err)
addr := l.Addr().(*net.TCPAddr)
_ = l.Close()
return addr
}

quitAddr := localAddr()
ftpAddr := localAddr()
cmd := exec.Command(os.Args[0], "-test.run=TestHelperFTPServer", "--", dir, quitAddr.String(), ftpAddr.String())
cmd.SysProcAttr = interruptableSysProcAttr
cmd.Env = []string{"TEST_CALLED_FROM_TEST=1"}
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
require.NoError(t, cmd.Start())

go func() {
<-ctx.Done()
c, err := net.DialTimeout(quitAddr.Network(), quitAddr.String(), time.Second)
require.NoError(t, err)
_ = c.Close()
}()

wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, server.Start(ctx, "127.0.0.1", dir, portCh))
err := cmd.Wait()
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
}()
time.Sleep(100 * time.Millisecond)
return export, uint16(ftpAddr.Port)
}

select {
case <-ctx.Done():
return "", 0
case port := <-portCh:
return export, port
func TestHelperFTPServer(t *testing.T) {
if os.Getenv("TEST_CALLED_FROM_TEST") != "1" {
return
}
args := os.Args
require.Lenf(t, os.Args, 6, "usage %s -test.run=TestHelperFTPServer <dir> <quitAddr> <listenAddr>", args[1])

ql, err := net.Listen("tcp", args[4])
require.NoError(t, err, "unable to listen to %s", args[4])
defer ql.Close()

addr, err := netip.ParseAddrPort(args[5])
require.NoError(t, err, "unable to parse to %s", args[5])

ctx, cancel := context.WithCancel(context.Background())
go func() {
c, err := ql.Accept()
if err != nil {
fmt.Fprintf(os.Stderr, "quit acceptor: %v", err)
}
c.Close()
cancel()
}()
require.NoError(t, err, "unable to parse port")
require.NoError(t, server.StartOnPort(ctx, "127.0.0.1", args[3], addr.Port()))
<-ctx.Done()
logrus.Info("over and out")
}

func startFUSEHost(t *testing.T, ctx context.Context, port uint16, dir string) (FTPClient, *FuseHost, string) {
// Start the client
dir = filepath.Join(dir, "mount")
require.NoError(t, os.Mkdir(dir, 0755))
started := make(chan error, 1)
fsh, err := NewFTPClient(ctx, netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", port)), remoteDir, 30*time.Second)
fsh, err := NewFTPClient(ctx, netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", port)), remoteDir, 60*time.Second)
require.NoError(t, err)
mp := dir
if runtime.GOOS == "windows" {
mp = "T:"
dir = mp + `\`
}
host := NewHost(fsh, mp)
host.Start(ctx, started)
select {
case err := <-started:
require.NoError(t, err)
dlog.Info(ctx, "FUSE started")
case <-ctx.Done():
}
require.NoError(t, host.Start(ctx, 5*time.Second))
return fsh, host, dir
}

Expand Down Expand Up @@ -227,17 +284,15 @@ func TestBrokenConnection(t *testing.T) {
wg.Wait()

broken := func(err error) bool {
pe := &fs2.PathError{}
if errors.As(err, &pe) {
ee := pe.Error()
return containsAny(ee, errIO, errBrokenPipe, errConnRefused, errConnAborted, errUnexpectedNetworkError)
if err == nil {
return false
}
return false
return containsAny(err.Error(), errIO, errBrokenPipe, errClosed, errConnRefused, errConnAborted, errUnexpectedNetworkError)
}

t.Run("Stat", func(t *testing.T) {
_, err := os.Stat(filepath.Join(mountPoint, "somefile.txt"))
require.True(t, broken(err))
require.True(t, broken(err), err.Error())
})

t.Run("Create", func(t *testing.T) {
Expand Down Expand Up @@ -554,14 +609,13 @@ func TestManyLargeFiles(t *testing.T) {
wg.Wait()
})

const fileCount = 20
const fileSize = 100 * 1024 * 1024
names := make([]string, fileCount)
names := make([]string, manyLargeFilesCount)

// Create files "on the remote server".
createRemoteWg := &sync.WaitGroup{}
createRemoteWg.Add(fileCount)
for i := 0; i < fileCount; i++ {
createRemoteWg.Add(manyLargeFilesCount)
for i := 0; i < manyLargeFilesCount; i++ {
go func(i int) {
defer createRemoteWg.Done()
name, err := createLargeFile(root, fileSize)
Expand All @@ -577,17 +631,17 @@ func TestManyLargeFiles(t *testing.T) {

// Using the local filesystem, read the remote files while writing new ones. All in parallel.
readWriteWg := &sync.WaitGroup{}
readWriteWg.Add(fileCount * 2)
for i := 0; i < fileCount; i++ {
readWriteWg.Add(manyLargeFilesCount * 2)
for i := 0; i < manyLargeFilesCount; i++ {
go func(name string) {
defer readWriteWg.Done()
t.Logf("validating %s", name)
require.NoError(t, validateLargeFile(name, fileSize))
}(filepath.Join(mountPoint, filepath.Base(names[i])))
}

localNames := make([]string, fileCount)
for i := 0; i < fileCount; i++ {
localNames := make([]string, manyLargeFilesCount)
for i := 0; i < manyLargeFilesCount; i++ {
go func(i int) {
defer readWriteWg.Done()
name, err := createLargeFile(mountPoint, fileSize)
Expand All @@ -600,8 +654,8 @@ func TestManyLargeFiles(t *testing.T) {

// Read files "on the remote server" and validate them.
readRemoteWg := &sync.WaitGroup{}
readRemoteWg.Add(fileCount)
for i := 0; i < fileCount; i++ {
readRemoteWg.Add(manyLargeFilesCount)
for i := 0; i < manyLargeFilesCount; i++ {
go func(name string) {
defer readRemoteWg.Done()
t.Logf("validating %s", name)
Expand Down
11 changes: 11 additions & 0 deletions pkg/fs/ftp_test_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//go:build !windows

package fs

import (
"syscall"
)

const manyLargeFilesCount = 20

var interruptableSysProcAttr *syscall.SysProcAttr = nil //nolint:gochecknoglobals // OS-specific constant
9 changes: 9 additions & 0 deletions pkg/fs/ftp_test_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package fs

import (
"syscall"
)

const manyLargeFilesCount = 5 // Keep reasonably low or Windoze thinks it's under attach

var interruptableSysProcAttr = &syscall.SysProcAttr{CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP} //nolint:gochecknoglobals // OS-specific constant
8 changes: 6 additions & 2 deletions pkg/fs/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewHost(fsh fuse.FileSystemInterface, mountPoint string) *FuseHost {
}

// Start will mount the filesystem on the mountPoint passed to NewHost.
func (fh *FuseHost) Start(ctx context.Context, started chan error) {
func (fh *FuseHost) Start(ctx context.Context, startTimeout time.Duration) error {
ctx, cancel := context.WithCancel(ctx)
fh.cancel = cancel

Expand All @@ -44,7 +44,10 @@ func (fh *FuseHost) Start(ctx context.Context, started chan error) {
// user as the one that starts the FUSE mount
opts = append(opts, "-o", "uid=-1", "-o", "gid=-1")
}
go fh.detectFuseStarted(ctx, started)
started := make(chan error, 1)
startCtx, startCancel := context.WithTimeout(ctx, startTimeout)
defer startCancel()
go fh.detectFuseStarted(startCtx, started)
if dlog.MaxLogLevel(ctx) >= dlog.LogLevelDebug {
opts = append(opts, "-o", "debug")
}
Expand Down Expand Up @@ -72,6 +75,7 @@ func (fh *FuseHost) Start(ctx context.Context, started chan error) {
}
}
}()
return <-started
}

// Stop will unmount the file system and terminate the FTP client, wait for all clean-up to
Expand Down
9 changes: 8 additions & 1 deletion pkg/fs/fuse_started_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package fs

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -31,11 +32,17 @@ func (fh *FuseHost) detectFuseStarted(ctx context.Context, started chan error) {
for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
started <- fmt.Errorf("timeout trying to stat mount point %q", fh.mountPoint)
} else {
started <- ctx.Err()
}
return
case <-ticker.C:
var mountSt unix.Stat_t
if err := unix.Stat(fh.mountPoint, &mountSt); err != nil {
dlog.Errorf(ctx, "unable to stat mount point %q: %v", fh.mountPoint, err)
// we don't consider a failure to stat an error here, just a cause for a retry.
dlog.Debugf(ctx, "unable to stat mount point %q: %v", fh.mountPoint, err)
} else {
if st.Ino != mountSt.Ino || st.Dev != mountSt.Dev {
return
Expand Down
7 changes: 7 additions & 0 deletions pkg/fs/fuse_started_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fs

import (
"context"
"errors"
"fmt"
"time"

"golang.org/x/sys/windows"
Expand All @@ -20,6 +22,11 @@ func (fh *FuseHost) detectFuseStarted(ctx context.Context, started chan error) {
for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
started <- fmt.Errorf("timeout trying to read mount point %q", fh.mountPoint)
} else {
started <- ctx.Err()
}
return
case <-ticker.C:
fh, err := windows.CreateFile(devPath,
Expand Down
15 changes: 5 additions & 10 deletions pkg/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/netip"
"os"
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -65,14 +66,16 @@ func (s *service) Mount(_ context.Context, rq *rpc.MountRequest) (*rpc.MountIden
return nil, err
}
ctx, cancel := context.WithCancel(s.ctx)
started := make(chan error, 1)
fi, err := fs.NewFTPClient(ctx, ap, rq.Directory, rq.ReadTimeout.AsDuration())
if err != nil {
cancel()
return nil, status.Errorf(codes.Internal, err.Error())
}
host := fs.NewHost(fi, rq.MountPoint)
host.Start(ctx, started)
if err := host.Start(ctx, 5*time.Second); err != nil {
cancel()
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}

id := s.nextID
s.mounts[id] = &mount{
Expand All @@ -87,14 +90,6 @@ func (s *service) Mount(_ context.Context, rq *rpc.MountRequest) (*rpc.MountIden
},
}
s.nextID++
select {
case err := <-started:
if err != nil {
// Failed to open mountPoint
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}
case <-ctx.Done():
}
return &rpc.MountIdentifier{Id: id}, nil
}

Expand Down

0 comments on commit 9ce6909

Please sign in to comment.