Skip to content

Commit

Permalink
XHTTP: Add "stream-up" mode for client & server (#3994)
Browse files Browse the repository at this point in the history
  • Loading branch information
RPRX authored Nov 9, 2024
1 parent 94c02f0 commit bc4bf3d
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 77 deletions.
10 changes: 10 additions & 0 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ type SplitHTTPConfig struct {
XPaddingBytes *Int32Range `json:"xPaddingBytes"`
Xmux Xmux `json:"xmux"`
DownloadSettings *StreamConfig `json:"downloadSettings"`
Mode string `json:"mode"`
}

type Xmux struct {
Expand Down Expand Up @@ -289,6 +290,14 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
muxProtobuf.CMaxReuseTimes.To = 128
}

switch c.Mode {
case "":
c.Mode = "auto"
case "auto", "packet-up", "stream-up":
default:
return nil, errors.New("unsupported mode: " + c.Mode)
}

config := &splithttp.Config{
Path: c.Path,
Host: c.Host,
Expand All @@ -299,6 +308,7 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
NoSSEHeader: c.NoSSEHeader,
XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes),
Xmux: &muxProtobuf,
Mode: c.Mode,
}
var err error
if c.DownloadSettings != nil {
Expand Down
4 changes: 4 additions & 0 deletions transport/internet/splithttp/browser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
// has no fields because everything is global state :O)
type BrowserDialerClient struct{}

func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
panic("not implemented yet")
}

func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
conn, err := browser_dialer.DialGet(baseURL)
dummyAddr := &gonet.IPAddr{}
Expand Down
12 changes: 12 additions & 0 deletions transport/internet/splithttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type DialerClient interface {
// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
// baseURL already contains sessionId
OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)

// (ctx, baseURL) -> uploadWriter
// baseURL already contains sessionId
OpenUpload(context.Context, string) io.WriteCloser
}

// implements splithttp.DialerClient in terms of direct network connections
Expand All @@ -38,6 +42,14 @@ type DefaultDialerClient struct {
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}

func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
reader, writer := io.Pipe()
req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
req.Header = c.transportConfig.GetRequestHeader()
go c.client.Do(req)
return writer
}

func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
var remoteAddr gonet.Addr
var localAddr gonet.Addr
Expand Down
73 changes: 41 additions & 32 deletions transport/internet/splithttp/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions transport/internet/splithttp/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message Config {
RandRangeConfig xPaddingBytes = 8;
Multiplexing xmux = 9;
xray.transport.internet.StreamConfig downloadSettings = 10;
string mode = 11;
}

message RandRangeConfig {
Expand Down
84 changes: 46 additions & 38 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

httpClient, muxRes := getHTTPClient(ctx, dest, streamSettings)

var httpClient2 DialerClient
httpClient2 := httpClient
requestURL2 := requestURL
var muxRes2 *muxResource
var requestURL2 url.URL
if transportConfiguration.DownloadSettings != nil {
globalDialerAccess.Lock()
if streamSettings.DownloadSettings == nil {
Expand All @@ -279,27 +279,59 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
requestURL2.RawQuery = config2.GetNormalizedQuery()
}

maxUploadSize := scMaxEachPostBytes.roll()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
// uploadWriter wrapper, exact size limits can be enforced
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))
reader, remoteAddr, localAddr, err := httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String())
if err != nil {
return nil, err
}

if muxRes != nil {
muxRes.OpenRequests.Add(1)
}
if muxRes2 != nil {
muxRes2.OpenRequests.Add(1)
}
closed := false

go func() {
if muxRes != nil {
defer muxRes.OpenRequests.Add(-1)
}
if muxRes2 != nil {
defer muxRes2.OpenRequests.Add(-1)
}
conn := splitConn{
writer: nil,
reader: reader,
remoteAddr: remoteAddr,
localAddr: localAddr,
onClose: func() {
if closed {
return
}
closed = true
if muxRes != nil {
muxRes.OpenRequests.Add(-1)
}
if muxRes2 != nil {
muxRes2.OpenRequests.Add(-1)
}
},
}

mode := transportConfiguration.Mode
if mode == "auto" && realityConfig != nil {
mode = "stream-up"
}
if mode == "stream-up" {
conn.writer = httpClient.OpenUpload(ctx, requestURL.String())
return stat.Connection(&conn), nil
}

maxUploadSize := scMaxEachPostBytes.roll()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
// uploadWriter wrapper, exact size limits can be enforced
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))

conn.writer = uploadWriter{
uploadPipeWriter,
maxUploadSize,
}

go func() {
requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))
var requestCounter int64

Expand Down Expand Up @@ -352,30 +384,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
}
}()

httpClient3 := httpClient
requestURL3 := requestURL
if httpClient2 != nil {
httpClient3 = httpClient2
requestURL3 = requestURL2
}

reader, remoteAddr, localAddr, err := httpClient3.OpenDownload(context.WithoutCancel(ctx), requestURL3.String())
if err != nil {
return nil, err
}

writer := uploadWriter{
uploadPipeWriter,
maxUploadSize,
}

conn := splitConn{
writer: writer,
reader: reader,
remoteAddr: remoteAddr,
localAddr: localAddr,
}

return stat.Connection(&conn), nil
}

Expand Down
33 changes: 26 additions & 7 deletions transport/internet/splithttp/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
return
}

h.config.WriteResponseHeader(writer)

sessionId := ""
subpath := strings.Split(request.URL.Path[len(h.path):], "/")
if len(subpath) > 0 {
Expand Down Expand Up @@ -134,7 +136,26 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}

if seq == "" {
errors.LogInfo(context.Background(), "no seq on request:", request.URL.Path)
if h.config.Mode == "packet-up" {
errors.LogInfo(context.Background(), "stream-up mode is not allowed")
writer.WriteHeader(http.StatusBadRequest)
return
}
err = currentSession.uploadQueue.Push(Packet{
Reader: request.Body,
})
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)")
writer.WriteHeader(http.StatusConflict)
} else {
writer.WriteHeader(http.StatusOK)
<-request.Context().Done()
}
return
}

if h.config.Mode == "stream-up" {
errors.LogInfo(context.Background(), "packet-up mode is not allowed")
writer.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -148,14 +169,14 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}

if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload")
errors.LogInfoInner(context.Background(), err, "failed to upload (ReadAll)")
writer.WriteHeader(http.StatusInternalServerError)
return
}

seqInt, err := strconv.ParseUint(seq, 10, 64)
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload")
errors.LogInfoInner(context.Background(), err, "failed to upload (ParseUint)")
writer.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -166,12 +187,11 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
})

if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload")
errors.LogInfoInner(context.Background(), err, "failed to upload (PushPayload)")
writer.WriteHeader(http.StatusInternalServerError)
return
}

h.config.WriteResponseHeader(writer)
writer.WriteHeader(http.StatusOK)
} else if request.Method == "GET" {
responseFlusher, ok := writer.(http.Flusher)
Expand All @@ -195,8 +215,6 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
writer.Header().Set("Content-Type", "text/event-stream")
}

h.config.WriteResponseHeader(writer)

writer.WriteHeader(http.StatusOK)

responseFlusher.Flush()
Expand All @@ -223,6 +241,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req

conn.Close()
} else {
errors.LogInfo(context.Background(), "unsupported method: ", request.Method)
writer.WriteHeader(http.StatusMethodNotAllowed)
}
}
Expand Down
Loading

0 comments on commit bc4bf3d

Please sign in to comment.