Skip to content
Open
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.26.2

require (
github.com/go-git/go-billy/v6 v6.0.0-alpha.1
github.com/go-git/go-git/v6 v6.0.0-alpha.3
github.com/go-git/go-git/v6 v6.0.0-alpha.4.0.20260521151600-590487407c38
github.com/spf13/cobra v1.10.2
github.com/stretchr/testify v1.11.1
github.com/zalando/go-keyring v0.2.8
Expand All @@ -26,8 +26,8 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sergi/go-diff v1.4.0 // indirect
github.com/spf13/pflag v1.0.9 // indirect
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/crypto v0.51.0 // indirect
golang.org/x/net v0.54.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.44.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ github.com/go-git/gcfg/v2 v2.0.2 h1:MY5SIIfTGGEMhdA7d7JePuVVxtKL7Hp+ApGDJAJ7dpo=
github.com/go-git/gcfg/v2 v2.0.2/go.mod h1:/lv2NsxvhepuMrldsFilrgct6pxzpGdSRC13ydTLSLs=
github.com/go-git/go-billy/v6 v6.0.0-alpha.1 h1:xVjAR4oUvrKy7/Xuw/lLlV3gkxR3KO2H8W+MamuVVsQ=
github.com/go-git/go-billy/v6 v6.0.0-alpha.1/go.mod h1:eaCUpHbedW7//EwcYmUDfJe2N6sJC9O12AT0OTqJR1E=
github.com/go-git/go-git-fixtures/v6 v6.0.0-20260422085740-0c07409f52ec h1:FpCNUs50xfQyJJs31uO3mDnqU855OhzAzfkkTgE6/DI=
github.com/go-git/go-git-fixtures/v6 v6.0.0-20260422085740-0c07409f52ec/go.mod h1:F1SpxOny2UYXu62DzjEH4UqBjk4AoGs27cA8I9buK+o=
github.com/go-git/go-git/v6 v6.0.0-alpha.3 h1:lJGritJ5AcC0X7buV0lReZ4cEHqcKB3Ab2ZjD3Ku+Ss=
github.com/go-git/go-git/v6 v6.0.0-alpha.3/go.mod h1:DGnqu+twdAgtDx/4tQTWFrVE1an+2ACph3W9yOfSJZM=
github.com/go-git/go-git-fixtures/v6 v6.0.0-alpha.1 h1:gmqi2jvsreu0s8JMLylYDFq4sbjHwwlhktMw0DUg3mA=
github.com/go-git/go-git-fixtures/v6 v6.0.0-alpha.1/go.mod h1:ECf1MqJlBdYpKggBrOXjo/0EnvRZx6D++I86UYjPgAQ=
github.com/go-git/go-git/v6 v6.0.0-alpha.4.0.20260521151600-590487407c38 h1:uA2L2RZQTkmvHjzBqMNMFR+UWdjicJBc0UqhCrgodZs=
github.com/go-git/go-git/v6 v6.0.0-alpha.4.0.20260521151600-590487407c38/go.mod h1:4ODa/G7hPWrh4Y+7lmt59Ij3zW38IEfvRoAZxLYYBhc=
github.com/godbus/dbus/v5 v5.2.2 h1:TUR3TgtSVDmjiXOgAAyaZbYmIeP3DPkld3jgKGV8mXQ=
github.com/godbus/dbus/v5 v5.2.2/go.mod h1:3AAv2+hPq5rdnr5txxxRwiGjPXamgoIHgz9FPBfOp3c=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down Expand Up @@ -59,18 +59,18 @@ github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD
github.com/zalando/go-keyring v0.2.8 h1:6sD/Ucpl7jNq10rM2pgqTs0sZ9V3qMrqfIIy5YPccHs=
github.com/zalando/go-keyring v0.2.8/go.mod h1:tsMo+VpRq5NGyKfxoBVjCuMrG47yj8cmakZDO5QGii0=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w=
golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY=
golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/term v0.43.0 h1:S4RLU2sB31O/NCl+zFN9Aru9A/Cq2aqKpTZJ6B+DwT4=
golang.org/x/term v0.43.0/go.mod h1:lrhlHNdQJHO+1qVYiHfFKVuVioJIheAc3fBSMFYEIsk=
golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
162 changes: 160 additions & 2 deletions internal/gitproto/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"os"
"strings"
"sync/atomic"
"time"

"github.com/go-git/go-git/v6/plumbing"
"github.com/go-git/go-git/v6/plumbing/format/packfile"
Expand Down Expand Up @@ -199,6 +201,15 @@ func sendReceivePack(
}

// PushObjects pushes locally-materialized objects to the target.
//
// Delta selection runs synchronously up front via
// packfile.DeltaSelector. The selected objects are then handed back to
// a packfile.Encoder behind a passthrough ObjectSelector, so the
// encoder's write phase (Encode → encode(objects)) streams pack bytes
// continuously into an io.Pipe to the HTTP request body. This avoids
// the mid-stream stall that occurs when Encode runs selection itself —
// CDN edges treat the resulting idle gap as a stalled upload and close
// the connection. See go-git PR #2142 for the API hook.
func PushObjects(
ctx context.Context,
conn Conn,
Expand All @@ -217,12 +228,24 @@ func PushObjects(
return sendReceivePack(ctx, conn, req, nil, verbose, onRejection)
}

progressDest := progressSink(verbose, "target: ", conn.ProgressWriter())

stopSelect := startSelectionProgress(progressDest)
objects, err := packfile.NewDeltaSelector(store).ObjectsToPack(hashes, 10)
stopSelect(len(objects), err)
if err != nil {
return fmt.Errorf("select objects to pack: %w", err)
Comment thread
Soph marked this conversation as resolved.
}

useRefDeltas := !adv.Capabilities.Supports(capability.OFSDelta)
pr, pw := io.Pipe()
done := make(chan error, 1)

go func() {
enc := packfile.NewEncoder(pw, store, useRefDeltas)
cw := &countingWriter{w: pw}
stopWrite := startPackWriteProgress(cw, progressDest)
defer stopWrite()
enc := packfile.NewEncoder(cw, store, useRefDeltas,
packfile.WithObjectSelector(precomputedSelector{objects: objects}))
Comment thread
Soph marked this conversation as resolved.
if _, err := enc.Encode(hashes, 10); err != nil {
done <- pw.CloseWithError(fmt.Errorf("encode packfile: %w", err))
return
Expand All @@ -239,6 +262,141 @@ func PushObjects(
return encodeErr
}

// precomputedSelector is a packfile.ObjectSelector that returns a
// fixed []*packfile.ObjectToPack, ignoring its arguments. It is the
// passthrough used by PushObjects to feed pre-selected objects back
// into packfile.Encoder via WithObjectSelector. Used exactly once per
// PushObjects call and not exposed outside this package.
type precomputedSelector struct {
objects []*packfile.ObjectToPack
}

func (p precomputedSelector) ObjectsToPack(_ []plumbing.Hash, _ uint) ([]*packfile.ObjectToPack, error) {
return p.objects, nil
}

// countingWriter wraps an io.Writer and tracks total bytes written.
// The count is read by the progress ticker concurrently with the
// encoder's writes, so the counter is atomic.
type countingWriter struct {
w io.Writer
n atomic.Int64
}

func (cw *countingWriter) Write(p []byte) (int, error) {
n, err := cw.w.Write(p)
cw.n.Add(int64(n))
if err != nil {
return n, fmt.Errorf("counting writer: %w", err)
}
return n, nil
}

func (cw *countingWriter) Count() int64 { return cw.n.Load() }

// startSelectionProgress emits in-place "selecting deltas, elapsed X"
// updates every 500ms during the synchronous delta-selection phase of
// PushObjects. The returned stop function takes the number of selected
// objects and the selection error (nil on success); on success it
// finalizes the line with a permanent "selected N objects in Y"
// summary, on error it just stops the ticker without claiming success.
// When dest is nil (non-verbose mode) returns a no-op stop, so
// callers don't need to special-case verbosity.
//
// Selection has no observable byte progress — go-git's DeltaSelector
// is opaque to the caller — so elapsed time is the only signal we can
// surface to keep long selections from looking like a hang.
func startSelectionProgress(dest io.Writer) func(objectCount int, err error) {
if dest == nil {
return func(int, error) {}
}
start := time.Now()
ticker := time.NewTicker(500 * time.Millisecond)
stop := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(done)
for {
select {
case <-stop:
return
case <-ticker.C:
fmt.Fprintf(dest, "selecting deltas, elapsed %s\r",
time.Since(start).Round(time.Second))
}
}
}()
return func(objectCount int, err error) {
ticker.Stop()
close(stop)
<-done
if err != nil {
return
}
fmt.Fprintf(dest, "selected %d objects in %s\n",
objectCount, time.Since(start).Round(time.Second))
}
}

// startPackWriteProgress emits in-place "encoding pack: N MB, elapsed
// X" updates every 500ms while the encoder writes pack bytes through
// cw. The returned stop function finalizes the line with a permanent
// "encoded pack" summary. Single-use, typically via defer. When dest
// is nil returns a no-op stop.
//
// This is the second of two phases visible to a materialized push:
// startSelectionProgress runs synchronously first, then
// startPackWriteProgress takes over once selection has completed and
// the encoder begins streaming bytes to the request body.
func startPackWriteProgress(cw *countingWriter, dest io.Writer) func() {
if dest == nil {
return func() {}
}
start := time.Now()
ticker := time.NewTicker(500 * time.Millisecond)
stop := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(done)
for {
select {
case <-stop:
return
case <-ticker.C:
fmt.Fprintf(dest, "encoding pack: %s, elapsed %s\r",
humanizeBytes(cw.Count()), time.Since(start).Round(time.Second))
}
}
}()
return func() {
ticker.Stop()
close(stop)
<-done
fmt.Fprintf(dest, "encoded pack: %s in %s\n",
humanizeBytes(cw.Count()), time.Since(start).Round(time.Second))
}
}

// humanizeBytes renders n in IEC units with one decimal place for KB+
// (e.g. "47.3 MB"). Anything below 1 KB is shown as raw bytes.
func humanizeBytes(n int64) string {
const (
kb = 1024
mb = kb * 1024
gb = mb * 1024
)
switch {
case n < kb:
return fmt.Sprintf("%d B", n)
case n < mb:
return fmt.Sprintf("%.1f KB", float64(n)/float64(kb))
case n < gb:
return fmt.Sprintf("%.1f MB", float64(n)/float64(mb))
default:
return fmt.Sprintf("%.1f GB", float64(n)/float64(gb))
}
}

// PushPack pushes a pack stream (relay) to the target.
func PushPack(
ctx context.Context,
Expand Down
72 changes: 72 additions & 0 deletions internal/gitproto/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/go-git/go-git/v6/plumbing/protocol/capability"
"github.com/go-git/go-git/v6/plumbing/protocol/packp"
"github.com/go-git/go-git/v6/plumbing/transport"
"github.com/go-git/go-git/v6/storage/memory"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -227,6 +228,11 @@ func TestPushPackClosesPackOnContextCanceled(t *testing.T) {
}
}

// TestPushPackStartsHTTPBeforePackFullyRead asserts that PushPack — the
// relay path — keeps streaming source pack bytes through to the target
// with chunked encoding. Materialized push (PushObjects) gets the same
// property via precomputed delta selection; see
// TestPushObjectsStreamsBody.
func TestPushPackStartsHTTPBeforePackFullyRead(t *testing.T) {
started := make(chan struct{}, 1)
release := make(chan struct{})
Expand Down Expand Up @@ -276,6 +282,72 @@ func TestPushPackStartsHTTPBeforePackFullyRead(t *testing.T) {
}
}

// TestPushObjectsStreamsBody asserts that PushObjects sends a chunked
// receive-pack request — the streaming property is what avoids the
// mid-stream stall (delta selection runs before the body opens, so
// pack bytes flow continuously once writing starts). A request with
// no Transfer-Encoding: chunked would mean we've regressed to
// buffering the whole pack before sending.
func TestPushObjectsStreamsBody(t *testing.T) {
type observation struct {
transferEncoding []string
contentLength int64
bodyLen int64
}
observed := make(chan observation, 1)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
n, err := io.Copy(io.Discard, r.Body)
if err != nil {
t.Logf("drain request body: %v", err)
}
_ = r.Body.Close()
observed <- observation{
transferEncoding: r.TransferEncoding,
contentLength: r.ContentLength,
bodyLen: n,
}
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()

conn := connForServer(t, srv)
adv := &packp.AdvRefs{}
adv.Capabilities.Set(capability.OFSDelta)

err := PushObjects(context.Background(), conn, adv, []PushCommand{{
Name: "refs/heads/main",
New: plumbing.NewHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
}}, memory.NewStorage(), nil, false, nil)
if err != nil {
t.Fatalf("PushObjects: %v", err)
}

var obs observation
select {
case obs = <-observed:
case <-time.After(2 * time.Second):
t.Fatal("server did not receive request")
}

chunked := false
for _, te := range obs.transferEncoding {
if te == "chunked" {
chunked = true
break
}
}
if !chunked {
t.Errorf("Transfer-Encoding = %v, want to include \"chunked\"", obs.transferEncoding)
}
if obs.contentLength != -1 {
t.Errorf("Content-Length = %d, want -1 (unknown for chunked)", obs.contentLength)
}
if obs.bodyLen <= 0 {
t.Errorf("bodyLen = %d, want > 0", obs.bodyLen)
}
}

func TestBuildUpdateRequest(t *testing.T) {
adv := &packp.AdvRefs{}
adv.Capabilities.Set(capability.ReportStatus)
Expand Down
Loading
Loading