diff --git a/go.mod b/go.mod index 3a5fd05..e2e2439 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/datawire/go-fuseftp go 1.20 require ( - github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1 github.com/datawire/go-ftpserver v0.1.3 github.com/datawire/go-fuseftp/rpc v0.3.1 github.com/jlaffaye/ftp v0.1.0 @@ -16,11 +15,12 @@ require ( ) require ( + github.com/datawire/dlib v1.3.1-0.20220715022530-b09ab2e017e1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fclairamb/ftpserverlib v0.21.0 // indirect github.com/fclairamb/go-log v0.4.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index 9dd68b8..42b09fe 100644 --- a/go.sum +++ b/go.sum @@ -127,9 +127,8 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= diff --git a/pkg/fs/connpool.go b/pkg/fs/connpool.go index e19949a..07b23f3 100644 --- a/pkg/fs/connpool.go +++ b/pkg/fs/connpool.go @@ -1,7 +1,6 @@ package fs import ( - "context" "net" "net/netip" "strings" @@ -9,8 +8,7 @@ import ( "time" "github.com/jlaffaye/ftp" - - "github.com/datawire/dlib/dlog" + log "github.com/sirupsen/logrus" ) type connList struct { @@ -69,10 +67,8 @@ type connPool struct { } // connect returns a new connection without using the pool. Use get instead of connect. -func (p *connPool) connect(ctx context.Context) (*ftp.ServerConn, error) { - opts := []ftp.DialOption{ - ftp.DialWithContext(ctx), - } +func (p *connPool) connect() (*ftp.ServerConn, error) { + var opts []ftp.DialOption if p.timeout > 0 { opts = append(opts, ftp.DialWithTimeout(p.timeout), @@ -109,7 +105,7 @@ func (p *connPool) connect(ctx context.Context) (*ftp.ServerConn, error) { } // get returns a connection from the pool, or creates a new connection if needed -func (p *connPool) get(ctx context.Context) (conn *ftp.ServerConn, err error) { +func (p *connPool) get() (conn *ftp.ServerConn, err error) { p.Lock() if idle := p.idleList; idle != nil { p.idleList = idle.next @@ -119,14 +115,14 @@ func (p *connPool) get(ctx context.Context) (conn *ftp.ServerConn, err error) { } p.Unlock() if conn == nil { - conn, err = p.connect(ctx) + conn, err = p.connect() } return } // setAddr will call Quit on all open connections, both busy and idle, change // the address, reconnect one connection, and put it in the idle list. -func (p *connPool) setAddr(ctx context.Context, addr netip.AddrPort) error { +func (p *connPool) setAddr(addr netip.AddrPort) error { var idle []*ftp.ServerConn var busy []*ftp.ServerConn p.Lock() @@ -142,21 +138,21 @@ func (p *connPool) setAddr(ctx context.Context, addr netip.AddrPort) error { if eq { return nil } - closeList(ctx, idle, true) - closeList(ctx, busy, true) + closeList(idle, true) + closeList(busy, true) // Create the first connection up front, so that a failure to connect to the server is caught early - conn, err := p.connect(ctx) + conn, err := p.connect() if err != nil { return err } // return the connection to the pool - p.put(ctx, conn) + p.put(conn) return nil } // put returns a connection to the pool -func (p *connPool) put(ctx context.Context, conn *ftp.ServerConn) { +func (p *connPool) put(conn *ftp.ServerConn) { // remove from busyList p.Lock() removed := false @@ -188,32 +184,32 @@ func (p *connPool) put(ctx context.Context, conn *ftp.ServerConn) { p.Unlock() } -func closeList(ctx context.Context, conns []*ftp.ServerConn, silent bool) { +func closeList(conns []*ftp.ServerConn, silent bool) { for _, c := range conns { if err := c.Quit(); err != nil && !silent { if !strings.Contains(err.Error(), "use of closed") { - dlog.Errorf(ctx, "quit failed: %v", err) + log.Errorf("quit failed: %v", err) } } } } // quit calls the Quit method on all connections and empties the pool -func (p *connPool) quit(ctx context.Context) { +func (p *connPool) quit() { p.Lock() idle := p.idleList.conns() busy := p.idleList.conns() p.idleList = nil p.busyList = nil p.Unlock() - closeList(ctx, idle, false) - closeList(ctx, busy, false) + closeList(idle, false) + closeList(busy, false) } // tidy will attempt to shrink the number of open connections to two, but since it // only closes connections that are idle at the time the call is made, there // might still be more than 2 connections after the call returns. -func (p *connPool) tidy(ctx context.Context) { +func (p *connPool) tidy() { p.Lock() idle := p.idleList.conns() idleCount := 64 - p.busyList.size() @@ -226,5 +222,5 @@ func (p *connPool) tidy(ctx context.Context) { } } p.Unlock() - closeList(ctx, cl, false) + closeList(cl, false) } diff --git a/pkg/fs/ftp.go b/pkg/fs/ftp.go index 0084c64..4ca285e 100644 --- a/pkg/fs/ftp.go +++ b/pkg/fs/ftp.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "errors" - "fmt" "io" "io/fs" "math" @@ -19,10 +18,8 @@ import ( "time" "github.com/jlaffaye/ftp" + log "github.com/sirupsen/logrus" "github.com/winfsp/cgofuse/fuse" - - "github.com/datawire/dlib/derror" - "github.com/datawire/dlib/dlog" ) // fuseImpl implements the fuse.FileSystemInterface. The official documentation for the API @@ -33,11 +30,7 @@ type fuseImpl struct { // connPool is the pool of control connections to the remote FTP server. pool connPool - // ctx is only used for logging purposes and final termination, because neither the ftp - // nor the fuse implementation is context aware at this point. - ctx context.Context - - // cancel the ctx, and hence the GC loop + // cancel the GC loop cancel context.CancelFunc // Mutex protects nextHandle, current, and shuttingDown @@ -83,7 +76,10 @@ type info struct { // Current offset for write operations. wof uint64 - // The writer is the writer side of a io.Pipe() used when writing data to a remote file. + // The Entry from the ftp server with estimated size based on initial size and write/truncate operations + entry ftp.Entry + + // The writer is the writer side of an io.Pipe() used when writing data to a remote file. writer io.WriteCloser // 1 is added to this wg when the reader/writer pipe is created. Wait for it when closing the writer. @@ -93,10 +89,10 @@ type info struct { // close this handle and free up any resources that it holds. func (i *info) close() { if i.rr != nil { - i.rr.Close() + _ = i.rr.Close() } if i.writer != nil { - i.writer.Close() + _ = i.writer.Close() } i.wg.Wait() } @@ -109,46 +105,49 @@ type FTPClient interface { // SetAddress will quit open connections, change the address, and reconnect // The method is intended to be used when a FUSE mount must survive a change of // FTP server address. - SetAddress(ctx context.Context, addr netip.AddrPort) error + SetAddress(addr netip.AddrPort) error } // NewFTPClient returns an implementation of the fuse.FileSystemInterface that is backed by // an FTP server connection tp the address. The dir parameter is the directory that the // FTP server changes to when connecting. func NewFTPClient(ctx context.Context, addr netip.AddrPort, dir string, readTimeout time.Duration) (FTPClient, error) { + ctx, cancel := context.WithCancel(ctx) f := &fuseImpl{ + cancel: cancel, current: make(map[uint64]*info), pool: connPool{ dir: dir, timeout: readTimeout, }, } + go func() { + ticker := time.NewTicker(stalePeriod) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + f.pool.tidy() + } + } + }() - ctx, cancel := context.WithCancel(ctx) - if err := f.pool.setAddr(ctx, addr); err != nil { + if err := f.pool.setAddr(addr); err != nil { cancel() return nil, err } - f.ctx = ctx - f.cancel = cancel return f, nil } -func (f *fuseImpl) SetAddress(ctx context.Context, addr netip.AddrPort) error { - return f.pool.setAddr(ctx, addr) -} - -func (f *fuseImpl) logPanic() { - if r := recover(); r != nil { - dlog.Errorf(f.ctx, "%+v", derror.PanicToError(r)) - } +func (f *fuseImpl) SetAddress(addr netip.AddrPort) error { + return f.pool.setAddr(addr) } // Create will create a file of size zero unless the file already exists // The third argument, the mode bits, are currently ignored func (f *fuseImpl) Create(path string, flags int, _ uint32) (int, uint64) { - defer f.logPanic() - dlog.Debugf(f.ctx, "Create(%s, %#x)", path, flags) + log.Debugf("Create(%s, %#x)", path, flags) fe, _, errCode := f.openHandle(path, true, flags&os.O_APPEND == os.O_APPEND) if errCode < 0 { return errCode, 0 @@ -158,8 +157,7 @@ func (f *fuseImpl) Create(path string, flags int, _ uint32) (int, uint64) { // Destroy will drain all ongoing writes, and for each active connection, send the QUIT message to the FTP server and disconnect func (f *fuseImpl) Destroy() { - defer f.logPanic() - dlog.Debug(f.ctx, "Destroy") + log.Debug("Destroy") f.Lock() // Prevent new entries from being added @@ -185,14 +183,13 @@ func (f *fuseImpl) Destroy() { }(fe) } wg.Wait() - f.pool.quit(f.ctx) + f.pool.quit() f.cancel() } // Flush is a noop in this implementation func (f *fuseImpl) Flush(path string, fh uint64) int { - defer f.logPanic() - dlog.Debugf(f.ctx, "Flush(%s, %d)", path, fh) + log.Debugf("Flush(%s, %d)", path, fh) return 0 } @@ -200,8 +197,13 @@ func (f *fuseImpl) Flush(path string, fh uint64) int { // UID and GID of the caller. File mode is always 0644 and Directory // mode is always 0755. func (f *fuseImpl) Getattr(path string, s *fuse.Stat_t, fh uint64) int { - defer f.logPanic() - dlog.Debugf(f.ctx, "Getattr(%s, %d)", path, fh) + if runtime.GOOS == "darwin" { + fn := filepath.Base(path) + if fn == ".DS_Store" || strings.HasPrefix("._", fn) { + return -fuse.ENOENT + } + } + log.Debugf("Getattr(%s, %d)", path, fh) var e *ftp.Entry var errCode int if fh != math.MaxUint64 { @@ -219,24 +221,11 @@ func (f *fuseImpl) Getattr(path string, s *fuse.Stat_t, fh uint64) int { // Init starts the garbage collector that removes cached items when they // haven't been used for a period of time. func (f *fuseImpl) Init() { - defer f.logPanic() - dlog.Debug(f.ctx, "Init") - go func() { - ticker := time.NewTicker(stalePeriod) - for { - select { - case <-f.ctx.Done(): - return - case <-ticker.C: - f.pool.tidy(f.ctx) - } - } - }() + log.Debug("Init") } func (f *fuseImpl) Mkdir(path string, mode uint32) int { - defer f.logPanic() - dlog.Debugf(f.ctx, "Mkdir(%s, %O)", path, mode) + log.Debugf("Mkdir(%s, %O)", path, mode) err := f.withConn(func(conn *ftp.ServerConn) error { return conn.MakeDir(relpath(path)) }) @@ -246,20 +235,18 @@ func (f *fuseImpl) Mkdir(path string, mode uint32) int { // Open ensures checks if the file exists, and if it doesn't, ensure that // a file of size zero can be created in the server. func (f *fuseImpl) Open(path string, flags int) (int, uint64) { - defer f.logPanic() - dlog.Debugf(f.ctx, "Open(%s, %#x)", path, flags) + log.Debugf("Open(%s, %#x)", path, flags) fe, _, errCode := f.openHandle(path, flags&os.O_CREATE == os.O_CREATE, flags&os.O_APPEND == os.O_APPEND) if errCode < 0 { return errCode, 0 } - dlog.Debugf(f.ctx, "Open(%s, %#x) -> %d", path, flags, fe.fh) + log.Debugf("Open(%s, %#x) -> %d", path, flags, fe.fh) return 0, fe.fh } // Opendir is like Open but will fail unless the path represents a directory func (f *fuseImpl) Opendir(path string) (int, uint64) { - defer f.logPanic() - dlog.Debugf(f.ctx, "Opendir(%s)", path) + log.Debugf("Opendir(%s)", path) fe, e, errCode := f.openHandle(path, false, false) if errCode < 0 { return errCode, 0 @@ -268,7 +255,7 @@ func (f *fuseImpl) Opendir(path string) (int, uint64) { f.delete(fe.fh) return -fuse.ENOTDIR, 0 } - dlog.Debugf(f.ctx, "Opendir(%s) -> %d", path, fe.fh) + log.Debugf("Opendir(%s) -> %d", path, fe.fh) return 0, fe.fh } @@ -279,8 +266,7 @@ func (f *fuseImpl) Opendir(path string) (int, uint64) { // Read requires that fuse is started with -o sync_read to ensure that the // read calls arrive in sequence. func (f *fuseImpl) Read(path string, buff []byte, ofst int64, fh uint64) int { - defer f.logPanic() - dlog.Debugf(f.ctx, "Read(%s, sz=%d, off=%d, %d)", path, len(buff), ofst, fh) + log.Debugf("Read(%s, sz=%d, off=%d, %d)", path, len(buff), ofst, fh) fe, errCode := f.loadHandle(fh) if errCode < 0 { return errCode @@ -331,8 +317,8 @@ func relpath(path string) string { // Readdir will read the remote directory using an MLSD command and call the given fill function // for each entry that was found. The ofst parameter is ignored. -func (f *fuseImpl) Readdir(path string, fill func(name string, stat *fuse.Stat_t, ofst int64) bool, ofst int64, fh uint64) int { - dlog.Debugf(f.ctx, "ReadDir(%s, %d)", path, fh) +func (f *fuseImpl) Readdir(path string, fill func(name string, stat *fuse.Stat_t, ofst int64) bool, _ int64, fh uint64) int { + log.Debugf("ReadDir(%s, %d)", path, fh) var fe *info var errCode int if fh == math.MaxUint64 { @@ -361,21 +347,21 @@ func (f *fuseImpl) Readdir(path string, fill func(name string, stat *fuse.Stat_t // Release will release the resources associated with the given file handle func (f *fuseImpl) Release(path string, fh uint64) int { - dlog.Debugf(f.ctx, "Release(%s, %d)", path, fh) + log.Debugf("Release(%s, %d)", path, fh) f.delete(fh) return 0 } // Releasedir will release the resources associated with the given file handle func (f *fuseImpl) Releasedir(path string, fh uint64) int { - dlog.Debugf(f.ctx, "Releasedir(%s, %d)", path, fh) + log.Debugf("Releasedir(%s, %d)", path, fh) f.delete(fh) return 0 } // Rename will rename or move oldpath to newpath func (f *fuseImpl) Rename(oldpath string, newpath string) int { - dlog.Debugf(f.ctx, "Rename(%s, %s)", oldpath, newpath) + log.Debugf("Rename(%s, %s)", oldpath, newpath) if oldpath == newpath { return 0 } @@ -387,7 +373,7 @@ func (f *fuseImpl) Rename(oldpath string, newpath string) int { // Rmdir removes the directory at path. The directory must be empty func (f *fuseImpl) Rmdir(path string) int { - dlog.Debugf(f.ctx, "Rmdir(%s)", path) + log.Debugf("Rmdir(%s)", path) return f.errToFuseErr(f.withConn(func(conn *ftp.ServerConn) error { if err := conn.RemoveDir(relpath(path)); err != nil { return err @@ -400,7 +386,7 @@ func (f *fuseImpl) Rmdir(path string) int { // Truncate will truncate the given file to a certain size using a STOR command // with zero bytes and an offset. This behavior will only work with some servers. func (f *fuseImpl) Truncate(path string, size int64, fh uint64) int { - dlog.Debugf(f.ctx, "Truncate(%s, sz=%d, %d)", path, size, fh) + log.Debugf("Truncate(%s, sz=%d, %d)", path, size, fh) var fe *info var errCode int if fh == math.MaxUint64 { @@ -412,15 +398,19 @@ func (f *fuseImpl) Truncate(path string, size int64, fh uint64) int { if errCode < 0 { return errCode } - if errCode = f.errToFuseErr(fe.conn.StorFrom(relpath(path), bytes.NewReader(nil), uint64(size))); errCode < 0 { + sz := uint64(size) + if errCode = f.errToFuseErr(fe.conn.StorFrom(relpath(path), bytes.NewReader(nil), sz)); errCode < 0 { return errCode } + if sz < fe.entry.Size { + fe.entry.Size = sz + } return 0 } // Unlink will remove the path from the file system. func (f *fuseImpl) Unlink(path string) int { - dlog.Debugf(f.ctx, "Unlink(%s)", path) + log.Debugf("Unlink(%s)", path) return f.errToFuseErr(f.withConn(func(conn *ftp.ServerConn) error { if err := conn.Delete(relpath(path)); err != nil { return err @@ -430,56 +420,62 @@ func (f *fuseImpl) Unlink(path string) int { })) } -// Write writes the gven data to a file at the given offset in that file. The -// data connection that is established to facilitate the write will remain open +func (i *info) pipeCopy(of uint64) int { + // A connection dedicated to the Write function is needed because there + // might be simultaneous Read and Write operations on the same file handle. + conn, err := i.pool.get() + if errCode := i.errToFuseErr(err); errCode < 0 { + return errCode + } + i.wof = of + var reader io.ReadCloser + reader, i.writer = io.Pipe() + i.wg.Add(1) + go func() { + defer func() { + i.wg.Done() + i.pool.put(conn) + }() + if err := conn.StorFrom(relpath(i.path), reader, of); err != nil { + log.Errorf("error storing: %v", err) + } + }() + return 0 +} + +// Write writes the given data to a file at the given offset in that file. The data +// connection that is established to facilitate the data transfer will remain open // until the handle is released by a call to Release func (f *fuseImpl) Write(path string, buf []byte, ofst int64, fh uint64) int { - defer f.logPanic() - // dlog.Debugf(f.ctx, "Write(%s, sz=%d, off=%d, %d)", path, len(buf), ofst, fh) + log.Debugf("Write(%s, sz=%d, off=%d, %d)", path, len(buf), ofst, fh) fe, errCode := f.loadHandle(fh) if errCode < 0 { return errCode } of := uint64(ofst) - pipeCopy := func(conn *ftp.ServerConn, of uint64) { - fe.wof = of - var reader io.ReadCloser - reader, fe.writer = io.Pipe() - fe.wg.Add(1) - go func() { - defer func() { - fe.wg.Done() - f.pool.put(f.ctx, conn) - }() - if err := conn.StorFrom(relpath(path), reader, of); err != nil { - dlog.Errorf(f.ctx, "error storing: %v", err) - } - }() - } - - // A connection dedicated to the Write function is needed because there - // might be simultaneous Read and Write operations on the same file handle. + var ec int if fe.writer == nil { - conn, err := f.pool.get(f.ctx) - if errCode = f.errToFuseErr(err); errCode < 0 { - return errCode - } - // start the pipe pumper. It ends when the fe.writer closes. That // happens when Release is called - pipeCopy(conn, of) + ec = fe.pipeCopy(of) } else if fe.wof != of { // Drain and restart the write operation. - fe.writer.Close() + _ = fe.writer.Close() fe.wg.Wait() - pipeCopy(fe.conn, of) + ec = fe.pipeCopy(of) + } + if ec != 0 { + return ec } n, err := fe.writer.Write(buf) if errCode = f.errToFuseErr(err); errCode < 0 { n = errCode } else { fe.wof += uint64(n) + if fe.wof > fe.entry.Size { + fe.entry.Size = fe.wof + } } return n } @@ -505,7 +501,7 @@ func (f *fuseImpl) clearPath(p string) { f.Lock() delete(f.current, fe.fh) f.Unlock() - f.pool.put(f.ctx, fe.conn) + f.pool.put(fe.conn) } } @@ -518,7 +514,7 @@ func (f *fuseImpl) delete(fh uint64) { f.Lock() delete(f.current, fe.fh) f.Unlock() - f.pool.put(f.ctx, fe.conn) + f.pool.put(fe.conn) } } @@ -566,9 +562,6 @@ func (f *fuseImpl) errToFuseErr(err error) int { case strings.Contains(em, errConnRefused): return -fuse.ECONNREFUSED case strings.Contains(em, errIOTimeout): - buf := make([]byte, 0x10000) - n := runtime.Stack(buf, false) - fmt.Fprintf(dlog.StdLogger(f.ctx, dlog.MaxLogLevel(f.ctx)).Writer(), "%T %v\n%s", err, err, string(buf[:n])) return -fuse.ETIMEDOUT case strings.Contains(em, errFileExists): return -fuse.EEXIST @@ -576,12 +569,20 @@ func (f *fuseImpl) errToFuseErr(err error) int { // TODO buf := make([]byte, 0x10000) n := runtime.Stack(buf, false) - fmt.Fprintf(dlog.StdLogger(f.ctx, dlog.MaxLogLevel(f.ctx)).Writer(), "%T %v\n%s", err, err, string(buf[:n])) + log.Printf("%T %v\n%s", err, err, string(buf[:n])) return -fuse.EIO } } func (f *fuseImpl) getEntry(path string) (e *ftp.Entry, fuseErr int) { + f.RLock() + for _, fe := range f.current { + if fe.path == path { + f.RUnlock() + return &fe.entry, 0 + } + } + f.RUnlock() err := f.withConn(func(conn *ftp.ServerConn) error { var err error e, err = conn.GetEntry(relpath(path)) @@ -597,8 +598,7 @@ func (f *fuseImpl) loadEntry(fh uint64) (*ftp.Entry, int) { if !ok { return nil, -fuse.ENOENT } - e, err := fe.conn.GetEntry(relpath(fe.path)) - return e, f.errToFuseErr(err) + return &fe.entry, 0 } func (f *fuseImpl) loadHandle(fh uint64) (*info, int) { @@ -618,7 +618,7 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f if shuttingDown { return nil, nil, -fuse.ECANCELED } - conn, err := f.pool.get(f.ctx) + conn, err := f.pool.get() ec := f.errToFuseErr(err) if ec < 0 { return nil, nil, ec @@ -626,7 +626,7 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f defer func() { if errCode != 0 { - f.pool.put(f.ctx, conn) + f.pool.put(conn) } }() @@ -647,29 +647,31 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f } } + f.Lock() + fh := f.nextHandle + f.nextHandle++ nfe = &info{ fuseImpl: f, path: path, - fh: f.nextHandle, + fh: fh, conn: conn, + entry: *e, } if append { nfe.wof = e.Size } - f.Lock() - f.current[f.nextHandle] = nfe - f.nextHandle++ + f.current[fh] = nfe f.Unlock() return nfe, e, 0 } func (f *fuseImpl) withConn(fn func(conn *ftp.ServerConn) error) error { - conn, err := f.pool.get(f.ctx) + conn, err := f.pool.get() if err != nil { return err } err = fn(conn) - f.pool.put(f.ctx, conn) + f.pool.put(conn) return err } diff --git a/pkg/fs/ftp_test.go b/pkg/fs/ftp_test.go index ec1a3c9..81701f6 100644 --- a/pkg/fs/ftp_test.go +++ b/pkg/fs/ftp_test.go @@ -18,8 +18,6 @@ import ( "os/exec" "path/filepath" "runtime" - "sort" - "strings" "sync" "testing" "time" @@ -28,7 +26,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/datawire/dlib/dlog" server "github.com/datawire/go-ftpserver" ) @@ -43,104 +40,9 @@ func TestMain(m *testing.M) { m.Run() } -type tbWrapper struct { - testing.TB - level dlog.LogLevel - fields map[string]any -} - -type tbWriter struct { - *tbWrapper - l dlog.LogLevel -} - -func (w *tbWriter) Write(data []byte) (n int, err error) { - w.Helper() - w.Log(w.l, strings.TrimSuffix(string(data), "\n")) // strip trailing newline if present, since the Log() call appends a newline - return len(data), nil -} - -func NewTestLogger(t testing.TB, level dlog.LogLevel) dlog.Logger { - return &tbWrapper{TB: t, level: level} -} - -func (w *tbWrapper) StdLogger(l dlog.LogLevel) *log.Logger { - return log.New(&tbWriter{tbWrapper: w, l: l}, "", 0) -} - -func (w *tbWrapper) WithField(key string, value any) dlog.Logger { - ret := tbWrapper{ - TB: w.TB, - fields: make(map[string]any, len(w.fields)+1), - } - for k, v := range w.fields { - ret.fields[k] = v - } - ret.fields[key] = value - return &ret -} - -func (w *tbWrapper) Log(level dlog.LogLevel, msg string) { - if level > w.level { - return - } - w.Helper() - w.UnformattedLog(level, msg) -} - -func (w *tbWrapper) MaxLevel() dlog.LogLevel { - return w.level -} - -func (w *tbWrapper) UnformattedLog(level dlog.LogLevel, args ...any) { - if level > w.level { - return - } - w.Helper() - sb := strings.Builder{} - sb.WriteString(time.Now().Format("15:04:05.0000")) - for _, arg := range args { - sb.WriteString(" ") - fmt.Fprint(&sb, arg) - } - - if len(w.fields) > 0 { - parts := make([]string, 0, len(w.fields)) - for k := range w.fields { - parts = append(parts, k) - } - sort.Strings(parts) - - for i, k := range parts { - if i > 0 { - sb.WriteString(" ") - } - fmt.Fprintf(&sb, "%s=%#v", k, w.fields[k]) - } - } - w.TB.Log(sb.String()) -} - -func (w *tbWrapper) UnformattedLogf(level dlog.LogLevel, format string, args ...any) { - if level > w.level { - return - } - w.Helper() - w.UnformattedLog(level, fmt.Sprintf(format, args...)) -} - -func (w *tbWrapper) UnformattedLogln(level dlog.LogLevel, args ...any) { - if level > w.level { - return - } - w.Helper() - w.UnformattedLog(level, fmt.Sprintln(args...)) -} - func testContext(t *testing.T) context.Context { - lr := logrus.New() - lr.Level = logrus.InfoLevel - lr.SetFormatter(&logrus.TextFormatter{ + logrus.SetLevel(logrus.InfoLevel) + logrus.SetFormatter(&logrus.TextFormatter{ ForceColors: false, DisableColors: true, ForceQuote: false, @@ -157,7 +59,7 @@ func testContext(t *testing.T) context.Context { FieldMap: nil, CallerPrettyfier: nil, }) - return dlog.WithLogger(context.Background(), dlog.WrapLogrus(lr)) + return context.Background() } const remoteDir = "exported" @@ -332,7 +234,7 @@ func TestBrokenConnection(t *testing.T) { require.NoError(t, os.WriteFile(filepath.Join(root, "test1.txt"), contents, 0644)) // Assign the new address to the FTP client (it should now quit all connections and reconnect) - require.NoError(t, fsh.SetAddress(ctx, netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", port)))) + require.NoError(t, fsh.SetAddress(netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", port)))) // Ensure that the connection is restored test1Mounted, err = os.ReadFile(filepath.Join(mountPoint, "test1.txt")) diff --git a/pkg/fs/fuse.go b/pkg/fs/fuse.go index f7f9f81..a0cecad 100644 --- a/pkg/fs/fuse.go +++ b/pkg/fs/fuse.go @@ -7,9 +7,8 @@ import ( "sync" "time" + "github.com/sirupsen/logrus" "github.com/winfsp/cgofuse/fuse" - - "github.com/datawire/dlib/dlog" ) // FuseHost wraps a fuse.FileSystemHost and adds Start/Stop semantics @@ -48,7 +47,7 @@ func (fh *FuseHost) Start(ctx context.Context, startTimeout time.Duration) error startCtx, startCancel := context.WithTimeout(ctx, startTimeout) defer startCancel() go fh.detectFuseStarted(startCtx, started) - if dlog.MaxLogLevel(ctx) >= dlog.LogLevelDebug { + if logrus.GetLevel() >= logrus.DebugLevel { opts = append(opts, "-o", "debug") } @@ -71,7 +70,7 @@ func (fh *FuseHost) Start(ctx context.Context, startTimeout time.Duration) error fh.host.Unmount() case mountResult := <-mCh: if !mountResult { - dlog.Errorf(ctx, "fuse mount of %s failed", fh.mountPoint) + logrus.Errorf("fuse mount of %s failed", fh.mountPoint) } } }() diff --git a/pkg/fs/fuse_started_unix.go b/pkg/fs/fuse_started_unix.go index 5eb7fe0..e37166c 100644 --- a/pkg/fs/fuse_started_unix.go +++ b/pkg/fs/fuse_started_unix.go @@ -8,9 +8,8 @@ import ( "fmt" "time" + "github.com/sirupsen/logrus" "golang.org/x/sys/unix" - - "github.com/datawire/dlib/dlog" ) func (fh *FuseHost) detectFuseStarted(ctx context.Context, started chan error) { @@ -42,7 +41,7 @@ func (fh *FuseHost) detectFuseStarted(ctx context.Context, started chan error) { var mountSt unix.Stat_t if err := unix.Stat(fh.mountPoint, &mountSt); err != nil { // we don't consider a failure to stat an error here, just a cause for a retry. - dlog.Debugf(ctx, "unable to stat mount point %q: %v", fh.mountPoint, err) + logrus.Debugf("unable to stat mount point %q: %v", fh.mountPoint, err) } else { if st.Ino != mountSt.Ino || st.Dev != mountSt.Dev { return diff --git a/pkg/main/main.go b/pkg/main/main.go index 732059c..8924160 100644 --- a/pkg/main/main.go +++ b/pkg/main/main.go @@ -18,7 +18,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - "github.com/datawire/dlib/dlog" "github.com/datawire/go-fuseftp/pkg/fs" "github.com/datawire/go-fuseftp/rpc" ) @@ -54,15 +53,13 @@ func addrPort(ap *rpc.AddressAndPort) (netip.AddrPort, error) { } func (s *service) Mount(_ context.Context, rq *rpc.MountRequest) (*rpc.MountIdentifier, error) { - logger := logrus.New() if rq.LogLevel != "" { lvl, err := logrus.ParseLevel(rq.LogLevel) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } - logger.SetLevel(lvl) + logrus.SetLevel(lvl) } - ctx := dlog.WithLogger(s.ctx, dlog.WrapLogrus(logger)) s.Lock() defer s.Unlock() @@ -77,7 +74,7 @@ func (s *service) Mount(_ context.Context, rq *rpc.MountRequest) (*rpc.MountIden if err != nil { return nil, err } - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(s.ctx) fi, err := fs.NewFTPClient(ctx, ap, rq.Directory, rq.ReadTimeout.AsDuration()) if err != nil { cancel() @@ -127,7 +124,7 @@ func (s *service) SetFtpServer(_ context.Context, rq *rpc.SetFtpServerRequest) ( if err != nil { return nil, err } - if err = m.ftpClient.SetAddress(s.ctx, ap); err != nil { + if err = m.ftpClient.SetAddress(ap); err != nil { return nil, err } return &emptypb.Empty{}, err