diff --git a/cmd/tar-split/checksize.go b/cmd/tar-split/checksize.go index 0343682..49c2924 100644 --- a/cmd/tar-split/checksize.go +++ b/cmd/tar-split/checksize.go @@ -43,10 +43,11 @@ func CommandChecksize(c *cli.Context) { sp := storage.NewJSONPacker(packFh) fp := storage.NewDiscardFilePutter() - dissam, err := asm.NewInputTarStream(fh, sp, fp) + dissam, done, err := asm.NewInputTarStreamWithDone(fh, sp, fp) if err != nil { log.Fatal(err) } + defer dissam.Close() var num int tr := tar.NewReader(dissam) @@ -64,6 +65,18 @@ func CommandChecksize(c *cli.Context) { } } fmt.Printf(" -- number of files: %d\n", num) + // Drain whatever follows the tar EOF marker (extra padding). Until the + // stream is read to EOF the disassembly goroutine stays blocked on its pipe + // write and done is never signaled. Any read failure is reported via done. + var scratch [32 * 1024]byte + for { + if _, rerr := dissam.Read(scratch[:]); rerr != nil { + break + } + } + if derr := <-done; derr != nil { + log.Fatal(derr) + } if err := packFh.Sync(); err != nil { log.Fatal(err) diff --git a/cmd/tar-split/disasm.go b/cmd/tar-split/disasm.go index 3c8b5b8..7bb49f7 100644 --- a/cmd/tar-split/disasm.go +++ b/cmd/tar-split/disasm.go @@ -44,10 +44,11 @@ func CommandDisasm(c *cli.Context) { // we're passing nil here for the file putter, because the ApplyDiff will // handle the extraction of the archive - its, err := asm.NewInputTarStream(inputStream, metaPacker, nil) + its, done, err := asm.NewInputTarStreamWithDone(inputStream, metaPacker, nil) if err != nil { logrus.Fatal(err) } + defer its.Close() var out io.Writer if c.Bool("no-stdout") { out = io.Discard @@ -58,5 +59,8 @@ func CommandDisasm(c *cli.Context) { if err != nil { logrus.Fatal(err) } + if derr := <-done; derr != nil { + logrus.Fatal(derr) + } logrus.Infof("created %s from %s (read %d bytes)", c.String("output"), c.Args()[0], i) } diff --git a/tar/asm/assemble_test.go b/tar/asm/assemble_test.go index cfbcca6..caf1f4e 100644 --- a/tar/asm/assemble_test.go +++ b/tar/asm/assemble_test.go @@ -163,10 +163,11 @@ func TestTarStream(t *testing.T) { fgp := storage.NewBufferFileGetPutter() // wrap the disassembly stream - tarStream, err := NewInputTarStream(gzRdr, sp, fgp) + tarStream, done, err := NewInputTarStreamWithDone(gzRdr, sp, fgp) if err != nil { t.Fatal(err) } + defer 
tarStream.Close() // get a sum of the stream after it has passed through to ensure it's the same. h0 := sha1.New() @@ -174,6 +175,9 @@ func TestTarStream(t *testing.T) { if err != nil { t.Fatal(err) } + if derr := <-done; derr != nil { + t.Fatal(derr) + } if i != tc.expectedSize { t.Errorf("size of tar: expected %d; got %d", tc.expectedSize, i) @@ -227,15 +231,19 @@ func BenchmarkAsm(b *testing.B) { fgp := storage.NewBufferFileGetPutter() // wrap the disassembly stream - tarStream, err := NewInputTarStream(gzRdr, sp, fgp) + tarStream, done, err := NewInputTarStreamWithDone(gzRdr, sp, fgp) if err != nil { b.Fatal(err) } + defer tarStream.Close() // read it all to the bit bucket i1, err := io.Copy(io.Discard, tarStream) if err != nil { b.Fatal(err) } + if derr := <-done; derr != nil { + b.Fatal(derr) + } r := bytes.NewBuffer(w.Bytes()) sup := storage.NewJSONUnpacker(r) diff --git a/tar/asm/disassemble.go b/tar/asm/disassemble.go index 80c2522..a17b6ea 100644 --- a/tar/asm/disassemble.go +++ b/tar/asm/disassemble.go @@ -1,156 +1,237 @@ package asm import ( + "errors" "io" "github.com/vbatts/tar-split/archive/tar" "github.com/vbatts/tar-split/tar/storage" ) -// NewInputTarStream wraps the Reader stream of a tar archive and provides a -// Reader stream of the same. +// runInputTarStreamGoroutine is the goroutine entrypoint. // -// In the middle it will pack the segments and file metadata to storage.Packer -// `p`. +// It centralizes the goroutine protocol so the core parsing logic can be +// written as ordinary Go code that just "returns an error". // -// The the storage.FilePutter is where payload of files in the stream are -// stashed. If this stashing is not needed, you can provide a nil -// storage.FilePutter. Since the checksumming is still needed, then a default -// of NewDiscardFilePutter will be used internally -func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) { - // What to do here... 
folks will want their own access to the Reader that is - // their tar archive stream, but we'll need that same stream to use our - // forked 'archive/tar'. - // Perhaps do an io.TeeReader that hands back an io.Reader for them to read - // from, and we'll MITM the stream to store metadata. - // We'll need a storage.FilePutter too ... +// Protocol guarantees: +// - pW is always closed exactly once (CloseWithError(nil) == Close()). +// - if done != nil, exactly one value is sent (nil on success, non-nil on failure). +// - panics are converted into a non-nil error (and the panic is rethrown). +func runInputTarStreamGoroutine(outputRdr io.Reader, pW *io.PipeWriter, p storage.Packer, fp storage.FilePutter, done chan<- error) { + // Default to a non-nil error so a panic can't accidentally look like success. + err := errors.New("panic in runInputTarStream") + defer func() { + // CloseWithError(nil) is equivalent to Close(). + pW.CloseWithError(err) - // Another concern, whether to do any storage.FilePutter operations, such that we - // don't extract any amount of the archive. But then again, we're not making - // files/directories, hardlinks, etc. Just writing the io to the storage.FilePutter. - // Perhaps we have a DiscardFilePutter that is a bit bucket. + if done != nil { + done <- err + } - // we'll return the pipe reader, since TeeReader does not buffer and will - // only read what the outputRdr Read's. Since Tar archives have padding on - // the end, we want to be the one reading the padding, even if the user's - // `archive/tar` doesn't care. - pR, pW := io.Pipe() - outputRdr := io.TeeReader(r, pW) + // Preserve panic semantics while still ensuring the protocol above runs. 
+ if r := recover(); r != nil { + panic(r) + } + }() - // we need a putter that will generate the crc64 sums of file payloads - if fp == nil { - fp = storage.NewDiscardFilePutter() - } + err = runInputTarStream(outputRdr, p, fp) +} - go func() { - tr := tar.NewReader(outputRdr) - tr.RawAccounting = true - for { - hdr, err := tr.Next() - if err != nil { - if err != io.EOF { - pW.CloseWithError(err) - return - } - // even when an EOF is reached, there is often 1024 null bytes on - // the end of an archive. Collect them too. - if b := tr.RawBytes(); len(b) > 0 { - _, err := p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - break // not return. We need the end of the reader. - } - if hdr == nil { - break // not return. We need the end of the reader. - } +// runInputTarStream drives tar-split parsing. +// +// It reads a tar stream from outputRdr and records tar-split metadata into the +// provided storage.Packer. +// +// Abort behavior: if the consumer closes the read end early, the tee reader will +// stop producing bytes (due to pipe write failure) and tar parsing will return +// an error. We propagate that error so the goroutine terminates promptly rather +// than draining the input stream for no benefit. +func runInputTarStream(outputRdr io.Reader, p storage.Packer, fp storage.FilePutter) error { + tr := tar.NewReader(outputRdr) + tr.RawAccounting = true + for { + hdr, err := tr.Next() + if err != nil { + if err != io.EOF { + return err + } + // Even when EOF is reached, there is often 1024 null bytes at the end + // of an archive. Collect them too. 
if b := tr.RawBytes(); len(b) > 0 { - _, err := p.AddEntry(storage.Entry{ + if _, err := p.AddEntry(storage.Entry{ Type: storage.SegmentType, Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - - var csum []byte - if hdr.Size > 0 { - var err error - _, csum, err = fp.Put(hdr.Name, tr) - if err != nil { - pW.CloseWithError(err) - return + }); err != nil { + return err } } + break // Not return: we still need to drain any additional padding. + } + if hdr == nil { + break // Not return: we still need to drain any additional padding. + } - entry := storage.Entry{ - Type: storage.FileType, - Size: hdr.Size, - Payload: csum, + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err } - // For proper marshalling of non-utf8 characters - entry.SetName(hdr.Name) + } - // File entries added, regardless of size - _, err = p.AddEntry(entry) + var csum []byte + if hdr.Size > 0 { + _, csum, err = fp.Put(hdr.Name, tr) if err != nil { - pW.CloseWithError(err) - return + return err } + } - if b := tr.RawBytes(); len(b) > 0 { - _, err = p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } + entry := storage.Entry{ + Type: storage.FileType, + Size: hdr.Size, + Payload: csum, } + // For proper marshalling of non-utf8 characters + entry.SetName(hdr.Name) - // It is allowable, and not uncommon that there is further padding on - // the end of an archive, apart from the expected 1024 null bytes. We - // do this in chunks rather than in one go to avoid cases where a - // maliciously crafted tar file tries to trick us into reading many GBs - // into memory. 
- const paddingChunkSize = 1024 * 1024 - var paddingChunk [paddingChunkSize]byte - for { - var isEOF bool - n, err := outputRdr.Read(paddingChunk[:]) - if err != nil { - if err != io.EOF { - pW.CloseWithError(err) - return - } - isEOF = true + // File entries added, regardless of size + if _, err := p.AddEntry(entry); err != nil { + return err + } + + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err } - if n != 0 { - _, err = p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: paddingChunk[:n], - }) - if err != nil { - pW.CloseWithError(err) - return - } + } + } + + // It is allowable, and not uncommon that there is further padding on + // the end of an archive, apart from the expected 1024 null bytes. We + // do this in chunks rather than in one go to avoid cases where a + // maliciously crafted tar file tries to trick us into reading many GBs + // into memory. + const paddingChunkSize = 1024 * 1024 + var paddingChunk [paddingChunkSize]byte + for { + n, err := outputRdr.Read(paddingChunk[:]) + if n != 0 { + if _, aerr := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: paddingChunk[:n], + }); aerr != nil { + return aerr } - if isEOF { + } + if err != nil { + if err == io.EOF { break } + return err } - pW.Close() - }() + } + + return nil +} + +// newInputTarStreamCommon sets up the shared plumbing for NewInputTarStream and +// NewInputTarStreamWithDone. +// +// It constructs an io.Pipe and an io.TeeReader such that: +// +// - The caller reads tar bytes from the returned *io.PipeReader. +// - The background goroutine simultaneously reads the same stream from the +// TeeReader to perform tar-split parsing and metadata packing. +// +// Abort and synchronization semantics: +// +// - Closing the returned PipeReader causes the TeeReader to fail its write to +// the pipe, which in turn causes the background goroutine to exit promptly. 
+// - If done is non-nil, exactly one error value (nil on success) is sent +// on it once the background goroutine has finished its work. This allows +// callers to safely wait until the input reader `r` is no longer in use; +// a nil done disables the notification. +func newInputTarStreamCommon( + r io.Reader, + p storage.Packer, + fp storage.FilePutter, + done chan<- error, +) (pr *io.PipeReader) { + // What to do here... folks will want their own access to the Reader that is + // their tar archive stream, but we'll need that same stream to use our + // forked 'archive/tar'. + // Perhaps do an io.TeeReader that hands back an io.Reader for them to read + // from, and we'll MITM the stream to store metadata. + // We'll need a storage.FilePutter too ... + + // Another concern, whether to do any storage.FilePutter operations, such that we + // don't extract any amount of the archive. But then again, we're not making + // files/directories, hardlinks, etc. Just writing the io to the storage.FilePutter. + // Perhaps we have a DiscardFilePutter that is a bit bucket. - return pR, nil + // we'll return the pipe reader, since TeeReader does not buffer and will + // only read what the outputRdr Read's. Since Tar archives have padding on + // the end, we want to be the one reading the padding, even if the user's + // `archive/tar` doesn't care. + pr, pw := io.Pipe() + + if fp == nil { + fp = storage.NewDiscardFilePutter() + } + + outputRdr := io.TeeReader(r, pw) + go runInputTarStreamGoroutine(outputRdr, pw, p, fp, done) + + return pr +} + +// NewInputTarStream wraps the Reader stream of a tar archive and provides a +// Reader stream of the same. +// +// In the middle it will pack the segments and file metadata to storage.Packer +// `p`. +// +// The storage.FilePutter is where payload of files in the stream are +// stashed. If this stashing is not needed, you can provide a nil +// storage.FilePutter. 
Since the checksumming is still needed, then a default +// of NewDiscardFilePutter will be used internally +// +// If callers need to be able to abort early and/or wait for goroutine termination, +// prefer NewInputTarStreamWithDone. +// +// Deprecated: This leaves a goroutine around if the consumer aborts without consuming +// the whole stream, and does not allow the caller to know when r is safe to deallocate +// or when p has written everything. Use NewInputTarStreamWithDone instead. +func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) { + pr := newInputTarStreamCommon(r, p, fp, nil) + return pr, nil +} + +// NewInputTarStreamWithDone wraps the Reader stream of a tar archive and provides a +// Reader stream of the same. +// +// In the middle it will pack the segments and file metadata to storage.Packer `p`. +// +// It also returns a done channel that will receive exactly one error value +// (nil on success) when the internal goroutine has fully completed parsing +// the tar stream (including the final paddingChunk draining loop) and has +// finished writing all entries to `p`. +// +// The returned reader is an io.ReadCloser so callers can stop early; closing it +// aborts the pipe so the internal goroutine can terminate promptly (rather than +// hanging on a blocked pipe write). +// +// The caller is expected to consume the returned reader fully until EOF +// (not just the tar EOF marker); closing the returned reader earlier will +// cause the done channel to return a failure. 
+func NewInputTarStreamWithDone(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.ReadCloser, <-chan error, error) { + done := make(chan error, 1) + pr := newInputTarStreamCommon(r, p, fp, done) + return pr, done, nil } diff --git a/tar/asm/disassemble_test.go b/tar/asm/disassemble_test.go index 84a3e77..51f7cfc 100644 --- a/tar/asm/disassemble_test.go +++ b/tar/asm/disassemble_test.go @@ -2,10 +2,15 @@ package asm import ( "archive/tar" + "bytes" + "errors" "fmt" "io" "os" + "sync" + "sync/atomic" "testing" + "time" "github.com/vbatts/tar-split/tar/storage" ) @@ -66,3 +71,359 @@ func TestLargeJunkPadding(t *testing.T) { // At this point, if we haven't crashed then we are not vulnerable to // CVE-2017-14992. } + +// Mocked Packer storing entries and returning an error on demand. +type recordingPacker struct { + mu sync.Mutex + entries []storage.Entry + errAt int + err error + callNum int +} + +func (p *recordingPacker) AddEntry(e storage.Entry) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + // Return an artificial error if we are instructed to do so. + p.callNum++ + if p.errAt > 0 && p.callNum == p.errAt { + if p.err == nil { + p.err = errors.New("packer error") + } + return 0, p.err + } + + // Copy payload because callers may reuse buffers. 
+ cp := e + if e.Payload != nil { + cp.Payload = bytes.Clone(e.Payload) + } + p.entries = append(p.entries, cp) + return len(p.entries), nil +} + +func (p *recordingPacker) snapshot() []storage.Entry { + p.mu.Lock() + defer p.mu.Unlock() + out := make([]storage.Entry, len(p.entries)) + copy(out, p.entries) + return out +} + +// Mocked FilePutter +type recordingFilePutter struct { + mu sync.Mutex + puts []string +} + +func (fp *recordingFilePutter) Put(name string, r io.Reader) (int64, []byte, error) { + dataLen, err := io.Copy(io.Discard, r) + if err != nil { + return 0, nil, err + } + + fp.mu.Lock() + fp.puts = append(fp.puts, name) + fp.mu.Unlock() + + // Return a deterministic "checksum" based on content length. + csum := []byte(fmt.Sprintf("len=%d", dataLen)) + return dataLen, csum, nil +} + +// Helper function to generate the tar with optional extra padding. +func makeTarWithExtraPadding(t *testing.T, name string, content []byte, extraPadding int) []byte { + t.Helper() + + var buf bytes.Buffer + tw := tar.NewWriter(&buf) + + hdr := &tar.Header{ + Name: name, + Mode: 0o644, + Size: int64(len(content)), + } + if err := tw.WriteHeader(hdr); err != nil { + t.Fatalf("WriteHeader: %v", err) + } + if _, err := tw.Write(content); err != nil { + t.Fatalf("Write: %v", err) + } + if err := tw.Close(); err != nil { + t.Fatalf("Close tar writer: %v", err) + } + + out := buf.Bytes() + if extraPadding > 0 { + out = append(append([]byte(nil), out...), make([]byte, extraPadding)...) + } + return out +} + +// Helper function to wait until "done" for specific time. +func waitDone(t *testing.T, done <-chan error) error { + t.Helper() + select { + case err := <-done: + return err + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for done") + return errors.New("timeout") + } +} + +// closableBlockingReader simulates an io.Reader that can be "closed" while a Read is +// blocked. +// +// Behavior: +// - It serves bytes from data. 
+// - After it has served at least blockAfter bytes, the next Read blocks until either: +// - Unblock() is called, or +// - Close() is called (which also unblocks) and subsequent reads fail with errUnderlyingClosed. +type closableBlockingReader struct { + data []byte + pos int + blockAfter int + + closed atomic.Bool + + blockOnce sync.Once + blockCh chan struct{} // used to block/unblock reads + blockedCh chan struct{} // closed when we start blocking +} + +var errUnderlyingClosed = errors.New("underlying reader closed") + +func newClosableBlockingReader(data []byte, blockAfter int) *closableBlockingReader { + return &closableBlockingReader{ + data: data, + blockAfter: blockAfter, + blockCh: make(chan struct{}), + blockedCh: make(chan struct{}), + } +} + +func (r *closableBlockingReader) Read(p []byte) (int, error) { + if r.closed.Load() { + return 0, errUnderlyingClosed + } + if r.pos >= len(r.data) { + return 0, io.EOF + } + + // If we've reached the point where we should block, block before producing + // more data (simulates "reader got closed while goroutine is still running"). + if r.pos >= r.blockAfter { + r.blockOnce.Do(func() { close(r.blockedCh) }) + <-r.blockCh + if r.closed.Load() { + return 0, errUnderlyingClosed + } + } + + n := copy(p, r.data[r.pos:]) + r.pos += n + return n, nil +} + +func (r *closableBlockingReader) Close() error { + r.closed.Store(true) + // ensure blocked goroutine wakes up + select { + case <-r.blockCh: + // already closed/unblocked + default: + close(r.blockCh) + } + // signal blocked state even if we closed early + r.blockOnce.Do(func() { close(r.blockedCh) }) + return nil +} + +func (r *closableBlockingReader) Unblock() { + select { + case <-r.blockCh: + default: + close(r.blockCh) + } +} + +// Test that NewInputTarStreamWithDone signals done when we read everything. 
+func TestNewInputTarStreamWithDone(t *testing.T) { + input := makeTarWithExtraPadding(t, "file.txt", []byte("hello"), 4096) + + p := &recordingPacker{} + fp := &recordingFilePutter{} + + payload, done, err := NewInputTarStreamWithDone(bytes.NewReader(input), p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + defer payload.Close() + + got, rerr := io.ReadAll(payload) + if rerr != nil { + t.Fatalf("ReadAll(payload): %v", rerr) + } + if !bytes.Equal(got, input) { + t.Fatalf("payload bytes differ: got=%d bytes, want=%d bytes", len(got), len(input)) + } + + if derr := waitDone(t, done); derr != nil { + t.Fatalf("done returned error: %v", derr) + } + + entries := p.snapshot() + if len(entries) == 0 { + t.Fatalf("expected entries to be recorded") + } + + var ( + foundFile bool + foundSegment bool + ) + for _, e := range entries { + switch e.Type { + case storage.FileType: + foundFile = true + // We set size to len("hello") + if e.Size != int64(len("hello")) { + t.Fatalf("file entry size=%d, want=%d", e.Size, len("hello")) + } + case storage.SegmentType: + if len(e.Payload) > 0 { + foundSegment = true + } + } + } + if !foundFile { + t.Fatalf("expected at least one FileType entry") + } + if !foundSegment { + t.Fatalf("expected at least one SegmentType entry with payload") + } +} + +// Test that NewInputTarStreamWithDone works when underlying reader is closed while +// the NewInputTarStreamWithDone go-routine still runs. +func TestNewInputTarStreamWithDonUnderlyingClosed(t *testing.T) { + // Make a tar stream that is large enough that parsing won't finish in one tiny read. + input := makeTarWithExtraPadding(t, "file.txt", bytes.Repeat([]byte("A"), 64*1024), 0) + + // Block the underlying reader after it has produced some bytes. + // This ensures the tar-split goroutine will be mid-flight and will need more data. 
+ under := newClosableBlockingReader(input, 4096) + + p := &recordingPacker{} + fp := storage.NewDiscardFilePutter() + + payload, done, err := NewInputTarStreamWithDone(under, p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + defer payload.Close() + + // Start draining payload in a separate goroutine so the internal goroutine is forced to read. + readErrCh := make(chan error, 1) + go func() { + _, rerr := io.ReadAll(payload) + readErrCh <- rerr + }() + + // Wait until the underlying reader starts blocking (i.e., internal goroutine progressed + // far enough to need more bytes). + select { + case <-under.blockedCh: + // good + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for underlying reader to enter blocked state") + } + + // Now trigger an error from the underlying closableBlockingReader while the tar-split + // goroutine is still running. + under.Close() + + // The tar-split goroutine should treat this as a non-EOF error, return it, + // close the pipe via CloseWithError, and signal done with the same error. + derr := waitDone(t, done) + if derr == nil { + t.Fatalf("expected done error, got nil") + } + if !errors.Is(derr, errUnderlyingClosed) { + t.Fatalf("done error=%v, want errors.Is(..., errUnderlyingClosed)=true", derr) + } + + // The consumer side should also observe an error (from the pipe). + select { + case rerr := <-readErrCh: + if rerr == nil { + t.Fatalf("expected reader error, got nil") + } + if !errors.Is(rerr, errUnderlyingClosed) { + t.Fatalf("reader error=%v, want errors.Is(..., errUnderlyingClosed)=true", rerr) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for payload read to finish") + } +} + +// Test that if the caller closes the returned reader without draining it, +// the background goroutine terminates promptly and the done channel is signaled. 
+func TestNewInputTarStreamWithDoneEarlyClose(t *testing.T) { + input := makeTarWithExtraPadding(t, "file.txt", []byte("hello"), 2048) + + p := &recordingPacker{} + fp := &recordingFilePutter{} + + payload, done, err := NewInputTarStreamWithDone(bytes.NewReader(input), p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + + // Close immediately without draining: this should abort the pipe and allow + // the goroutine to exit (rather than hanging). + if err := payload.Close(); err != nil { + t.Fatalf("payload.Close(): %v", err) + } + + // The goroutine should terminate and signal done (likely with a non-nil error + // due to the induced abort). + derr := waitDone(t, done) + if !errors.Is(derr, io.ErrClosedPipe) { + t.Fatalf("done error=%v, want io.ErrClosedPipe", derr) + } +} + +// Test that Packer error propagates to waitDone(). +func TestNewInputTarStreamWithDonePackerError(t *testing.T) { + input := makeTarWithExtraPadding(t, "file.txt", []byte("hello"), 0) + + packerErr := errors.New("boom") + p := &recordingPacker{errAt: 2, err: packerErr} // fail early during AddEntry + fp := &recordingFilePutter{} + + payload, done, err := NewInputTarStreamWithDone(bytes.NewReader(input), p, fp) + if err != nil { + t.Fatalf("NewInputTarStreamWithDone: %v", err) + } + defer payload.Close() + + // Reading should eventually return the packer error via CloseWithError. + _, rerr := io.ReadAll(payload) + if rerr == nil { + t.Fatalf("expected reader error, got nil") + } + // The error returned from an io.PipeReader may wrap; check with errors.Is. 
+ if !errors.Is(rerr, packerErr) { + t.Fatalf("reader error=%v, want errors.Is(...,%v)=true", rerr, packerErr) + } + + derr := waitDone(t, done) + if derr == nil { + t.Fatalf("expected done error, got nil") + } + if !errors.Is(derr, packerErr) { + t.Fatalf("done error=%v, want errors.Is(...,%v)=true", derr, packerErr) + } +} diff --git a/tar/asm/iterate.go b/tar/asm/iterate.go index 8a65887..9db3ab5 100644 --- a/tar/asm/iterate.go +++ b/tar/asm/iterate.go @@ -11,7 +11,7 @@ import ( // IterateHeaders calls handler for each tar header provided by Unpacker func IterateHeaders(unpacker storage.Unpacker, handler func(hdr *tar.Header) error) error { - // We assume about NewInputTarStream: + // We assume about NewInputTarStreamWithDone: // - There is a separate SegmentType entry for every tar header, but only one SegmentType entry for the full header incl. any extensions // - (There is a FileType entry for every tar header, we ignore it) // - Trailing padding of a file, if any, is included in the next SegmentType entry diff --git a/tar/asm/iterate_test.go b/tar/asm/iterate_test.go index 884c019..1d10f07 100644 --- a/tar/asm/iterate_test.go +++ b/tar/asm/iterate_test.go @@ -75,10 +75,12 @@ func TestIterateHeaders(t *testing.T) { require.NoError(t, err) var tarSplit bytes.Buffer - tsReader, err := NewInputTarStream(&tarball, storage.NewJSONPacker(&tarSplit), storage.NewDiscardFilePutter()) + tsReader, done, err := NewInputTarStreamWithDone(&tarball, storage.NewJSONPacker(&tarSplit), storage.NewDiscardFilePutter()) require.NoError(t, err) + defer tsReader.Close() _, err = io.Copy(io.Discard, tsReader) require.NoError(t, err) + require.NoError(t, <-done) unpacker := storage.NewJSONUnpacker(&tarSplit) var actual []tar.Header