Skip to content

Commit

Permalink
Merge pull request #4 from datawire/thallgren/offsets
Browse files Browse the repository at this point in the history
Handle mismatch in offset during write
  • Loading branch information
thallgren authored Apr 27, 2023
2 parents 17cf067 + ab2977b commit a7b2c3a
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 59 deletions.
135 changes: 86 additions & 49 deletions pkg/fs/ftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ package fs
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"math"
"net/netip"
"net/textproto"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -74,7 +76,12 @@ type info struct {

// rr and of is used when reading data from a remote file
rr *ftp.Response
of uint64

// Current offset for read operations.
rof uint64

// Current offset for write operations.
wof uint64

// The writer is the writer side of a io.Pipe() used when writing data to a remote file.
writer io.WriteCloser
Expand Down Expand Up @@ -133,7 +140,7 @@ func (f *fuseImpl) SetAddress(ctx context.Context, addr netip.AddrPort) error {

func (f *fuseImpl) logPanic() {
if r := recover(); r != nil {
dlog.Error(f.ctx, derror.PanicToError(r))
dlog.Errorf(f.ctx, "%+v", derror.PanicToError(r))
}
}

Expand Down Expand Up @@ -233,10 +240,7 @@ func (f *fuseImpl) Mkdir(path string, mode uint32) int {
err := f.withConn(func(conn *ftp.ServerConn) error {
return conn.MakeDir(relpath(path))
})
if err != nil {
return f.errToFuseErr(err)
}
return 0
return f.errToFuseErr(err)
}

// Open ensures checks if the file exists, and if it doesn't, ensure that
Expand Down Expand Up @@ -284,20 +288,20 @@ func (f *fuseImpl) Read(path string, buff []byte, ofst int64, fh uint64) int {

of := uint64(ofst)

if fe.of != of && fe.rr != nil {
// This should normally not happen, but if it does, let's restart the read
if fe.rof != of && fe.rr != nil {
// Restart the read with new offset
_ = fe.rr.Close()
fe.rr = nil
}

if fe.rr == nil {
// Obtain the ftp.Response. It acts as an io.Reader
rr, err := fe.conn.RetrFrom(relpath(path), of)
if err != nil {
return f.errToFuseErr(err)
if errCode = f.errToFuseErr(err); errCode < 0 {
return errCode
}
fe.rr = rr
fe.of = of
fe.rof = of
}

bytesRead := 0
Expand All @@ -310,10 +314,12 @@ func (f *fuseImpl) Read(path string, buff []byte, ofst int64, fh uint64) int {
// Retain the ftp.Response until the file handle is released
break
}
return f.errToFuseErr(err)
if errCode = f.errToFuseErr(err); errCode < 0 {
return errCode
}
}
}
fe.of += uint64(bytesRead)
fe.rof += uint64(bytesRead)

// Errors are always negative and Read expects the number of bytes read to be returned here.
return bytesRead
Expand All @@ -339,8 +345,9 @@ func (f *fuseImpl) Readdir(path string, fill func(name string, stat *fuse.Stat_t
return errCode
}
es, err := fe.conn.List(relpath(path))
if err != nil {
return f.errToFuseErr(err)
errCode = f.errToFuseErr(err)
if errCode < 0 {
return errCode
}
for _, e := range es {
s := &fuse.Stat_t{}
Expand Down Expand Up @@ -405,8 +412,8 @@ func (f *fuseImpl) Truncate(path string, size int64, fh uint64) int {
if errCode < 0 {
return errCode
}
if err := fe.conn.StorFrom(relpath(path), bytes.NewReader(nil), uint64(size)); err != nil {
return f.errToFuseErr(err)
if errCode = f.errToFuseErr(fe.conn.StorFrom(relpath(path), bytes.NewReader(nil), uint64(size))); errCode < 0 {
return errCode
}
return 0
}
Expand All @@ -427,42 +434,52 @@ func (f *fuseImpl) Unlink(path string) int {
// data connection that is established to facilitate the write will remain open
// until the handle is released by a call to Release
func (f *fuseImpl) Write(path string, buf []byte, ofst int64, fh uint64) int {
dlog.Debugf(f.ctx, "Write(%s, sz=%d, off=%d, %d)", path, len(buf), ofst, fh)
defer f.logPanic()
// dlog.Debugf(f.ctx, "Write(%s, sz=%d, off=%d, %d)", path, len(buf), ofst, fh)
fe, errCode := f.loadHandle(fh)
if errCode < 0 {
return errCode
}
of := uint64(ofst)
var err error

pipeCopy := func(conn *ftp.ServerConn, of uint64) {
fe.wof = of
var reader io.ReadCloser
reader, fe.writer = io.Pipe()
fe.wg.Add(1)
go func() {
defer func() {
fe.wg.Done()
f.pool.put(f.ctx, conn)
}()
if err := conn.StorFrom(relpath(path), reader, of); err != nil {
dlog.Errorf(f.ctx, "error storing: %v", err)
}
}()
}

// A connection dedicated to the Write function is needed because there
// might be simultaneous Read and Write operations on the same file handle.
if fe.writer == nil {
var conn *ftp.ServerConn
if conn, err = f.pool.get(f.ctx); err == nil {
var reader io.Reader
reader, fe.writer = io.Pipe()

// start the pipe pumper. It ends when the fe.writer closes. That
// happens when Release is called
fe.wg.Add(1)
go func() {
defer func() {
fe.wg.Done()
f.pool.put(f.ctx, conn)
}()
if err := conn.StorFrom(relpath(path), reader, of); err != nil {
dlog.Errorf(f.ctx, "error storing: %v", err)
}
}()
conn, err := f.pool.get(f.ctx)
if errCode = f.errToFuseErr(err); errCode < 0 {
return errCode
}
}
if err != nil {
return f.errToFuseErr(err)

// start the pipe pumper. It ends when the fe.writer closes. That
// happens when Release is called
pipeCopy(conn, of)
} else if fe.wof != of {
// Drain and restart the write operation.
fe.writer.Close()
fe.wg.Wait()
pipeCopy(fe.conn, of)
}
n, err := fe.writer.Write(buf)
if err != nil {
return f.errToFuseErr(err)
if errCode = f.errToFuseErr(err); errCode < 0 {
n = errCode
} else {
fe.wof += uint64(n)
}
return n
}
Expand Down Expand Up @@ -510,12 +527,31 @@ func (f *fuseImpl) errToFuseErr(err error) int {
return 0
}

var tpe *textproto.Error
if errors.As(err, &tpe) {
if tpe.Code == ftp.StatusCommandOK {
return 0
}
switch tpe.Code {
case ftp.StatusClosingDataConnection:
return -fuse.ECONNABORTED
case ftp.StatusNotAvailable:
return -fuse.EADDRNOTAVAIL
case ftp.StatusCanNotOpenDataConnection:
return -fuse.ECONNREFUSED
case ftp.StatusTransfertAborted:
return -fuse.ECONNABORTED
case ftp.StatusInvalidCredentials:
return -fuse.EACCES
case ftp.StatusHostUnavailable:
return -fuse.EHOSTUNREACH
case ftp.StatusBadFileName:
return -fuse.EINVAL
}
}
em := err.Error()
switch {
case strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusCommandOK)):
return 0
case
strings.HasPrefix(em, fmt.Sprintf("%d ", ftp.StatusClosingDataConnection)),
strings.Contains(em, errConnAborted),
strings.Contains(em, errClosed):
return -fuse.ECONNABORTED
Expand Down Expand Up @@ -583,8 +619,9 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f
return nil, nil, -fuse.ECANCELED
}
conn, err := f.pool.get(f.ctx)
if err != nil {
return nil, nil, f.errToFuseErr(err)
ec := f.errToFuseErr(err)
if ec < 0 {
return nil, nil, ec
}

defer func() {
Expand All @@ -600,8 +637,8 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f
}

// Create an empty file to ensure that it can be created
if err = conn.Stor(relpath(path), bytes.NewReader(nil)); err != nil {
return nil, nil, f.errToFuseErr(err)
if ec = f.errToFuseErr(conn.Stor(relpath(path), bytes.NewReader(nil))); ec < 0 {
return nil, nil, ec
}
e = &ftp.Entry{
Name: filepath.Base(path),
Expand All @@ -617,7 +654,7 @@ func (f *fuseImpl) openHandle(path string, create, append bool) (nfe *info, e *f
conn: conn,
}
if append {
nfe.of = e.Size
nfe.wof = e.Size
}
f.Lock()
f.current[f.nextHandle] = nfe
Expand Down
24 changes: 15 additions & 9 deletions pkg/fs/ftp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,6 @@ func TestManyLargeFiles(t *testing.T) {
root, port := startFTPServer(t, ctx, tmp, &wg)
require.NotEqual(t, uint16(0), port)

_, host, mountPoint := startFUSEHost(t, ctx, port, tmp)
t.Cleanup(func() {
host.Stop()
cancel()
wg.Wait()
})

const fileSize = 100 * 1024 * 1024
names := make([]string, manyLargeFilesCount)

Expand All @@ -629,6 +622,18 @@ func TestManyLargeFiles(t *testing.T) {
t.Fatal("failed attempting to create large files")
}

_, host, mountPoint := startFUSEHost(t, ctx, port, tmp)
stopped := false
stopFuse := func() {
if !stopped {
stopped = true
host.Stop()
cancel()
wg.Wait()
}
}
t.Cleanup(stopFuse)

// Using the local filesystem, read the remote files while writing new ones. All in parallel.
readWriteWg := &sync.WaitGroup{}
readWriteWg.Add(manyLargeFilesCount * 2)
Expand All @@ -651,6 +656,7 @@ func TestManyLargeFiles(t *testing.T) {
}(i)
}
readWriteWg.Wait()
stopFuse()

// Read files "on the remote server" and validate them.
readRemoteWg := &sync.WaitGroup{}
Expand Down Expand Up @@ -709,7 +715,7 @@ func validateLargeFile(name string, sz int) error {
return err
}
if st.Size() != int64(sz) {
return fmt.Errorf("file size differ. Expected %d, got %d", sz, st.Size())
return fmt.Errorf("file size of %s differ. Expected %d, got %d", name, sz, st.Size())
}
bf := bufio.NewReader(f)
qz := uint32(sz / 4)
Expand All @@ -724,7 +730,7 @@ func validateLargeFile(name string, sz int) error {
}
x := binary.BigEndian.Uint32(buf)
if i != x {
return fmt.Errorf("content differ at position %d: expected %d, got %d", i*4, i, x)
return fmt.Errorf("content of %s differ at position %d: expected %d, got %d", name, i*4, i, x)
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/fs/ftp_test_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (
"syscall"
)

const manyLargeFilesCount = 20
const manyLargeFilesCount = 12

var interruptableSysProcAttr *syscall.SysProcAttr = nil //nolint:gochecknoglobals // OS-specific constant

0 comments on commit a7b2c3a

Please sign in to comment.