Skip to content

chore: trial implemention of resumable uploads #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 96 additions & 58 deletions httpclient/multipartrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/deploymenttheory/go-api-http-client/authenticationhandler"
Expand All @@ -22,6 +23,13 @@
"go.uber.org/zap"
)

// UploadState represents the state of an upload operation, including the last uploaded byte.
// This struct is used to track the progress of file uploads for resumable uploads and to resume uploads from the last uploaded byte.
type UploadState struct {
LastUploadedByte int64
sync.Mutex
}

// DoMultiPartRequest creates and executes a multipart/form-data HTTP request for file uploads and form fields.
// This function handles constructing the multipart request body, setting the necessary headers, and executing the request.
// It supports custom content types and headers for each part of the multipart request, and handles authentication and
Expand Down Expand Up @@ -85,59 +93,75 @@

log.Info("Executing multipart file upload request", zap.String("method", method), zap.String("endpoint", endpoint))

// body, contentType, err := createMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
// if err != nil {
// return nil, err
// }
url := c.APIHandler.ConstructAPIResourceEndpoint(endpoint, log)

// Call the helper function to create a streaming multipart request body
body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
return nil, err
}
var resp *http.Response
var requestErr error

//logMultiPartRequestBody(body, log)
// Retry logic
maxRetries := 3
for attempt := 1; attempt <= maxRetries; attempt++ {
// Create a context with a longer timeout
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
defer cancel()

url := c.APIHandler.ConstructAPIResourceEndpoint(endpoint, log)
body, contentType, err := createStreamingMultipartRequestBody(files, formDataFields, fileContentTypes, formDataPartHeaders, log)
if err != nil {
log.Error("Failed to create streaming multipart request body", zap.Error(err))

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
return nil, err
}

// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), c.clientConfig.ClientOptions.Timeout.CustomTimeout.Duration())
defer cancel()
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("Failed to create HTTP request", zap.Error(err))

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
return nil, err
}

req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("Failed to create HTTP request", zap.Error(err))
return nil, err
}
cookiejar.ApplyCustomCookies(req, c.clientConfig.ClientOptions.Cookies.CustomCookies, c.Logger)

cookiejar.ApplyCustomCookies(req, c.clientConfig.ClientOptions.Cookies.CustomCookies, c.Logger)
req.Header.Set("Content-Type", contentType)

req.Header.Set("Content-Type", contentType)
headerHandler := headers.NewHeaderHandler(req, c.Logger, c.APIHandler, c.AuthTokenHandler)
headerHandler.SetRequestHeaders(endpoint)
headerHandler.LogHeaders(c.clientConfig.ClientOptions.Logging.HideSensitiveData)

headerHandler := headers.NewHeaderHandler(req, c.Logger, c.APIHandler, c.AuthTokenHandler)
headerHandler.SetRequestHeaders(endpoint)
headerHandler.LogHeaders(c.clientConfig.ClientOptions.Logging.HideSensitiveData)
startTime := time.Now()

startTime := time.Now()
resp, requestErr = c.httpClient.Do(req)
duration := time.Since(startTime)

resp, err := c.httpClient.Do(req)
if err != nil {
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(err))
return nil, err
}
if requestErr != nil {
log.Error("Failed to send request", zap.String("method", method), zap.String("endpoint", endpoint), zap.Error(requestErr))

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.
if attempt < maxRetries {
log.Info("Retrying request", zap.Int("attempt", attempt))
time.Sleep(2 * time.Second)
continue
}
return nil, requestErr
}

duration := time.Since(startTime)
log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))
log.Debug("Request sent successfully", zap.String("method", method), zap.String("endpoint", endpoint), zap.Int("status_code", resp.StatusCode), zap.Duration("duration", duration))

if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return resp, response.HandleAPISuccessResponse(resp, out, log)
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return resp, response.HandleAPISuccessResponse(resp, out, log)
}

// If status code indicates a server error, retry
if resp.StatusCode >= 500 && attempt < maxRetries {
log.Info("Retrying request due to server error", zap.Int("status_code", resp.StatusCode), zap.Int("attempt", attempt))
time.Sleep(2 * time.Second)
continue
}

return resp, response.HandleAPIErrorResponse(resp, log)
}

return resp, response.HandleAPIErrorResponse(resp, log)
return resp, requestErr
}

// createStreamingMultipartRequestBody creates a streaming multipart request body with the provided files and form fields.
// This function constructs the body of a multipart/form-data request using an io.Pipe, allowing the request to be sent in chunks.
// createStreamingMultipartRequestBody creates a streaming multipart request body with the provided files and form fields.
func createStreamingMultipartRequestBody(files map[string][]string, formDataFields map[string]string, fileContentTypes map[string]string, formDataPartHeaders map[string]http.Header, log logger.Logger) (io.Reader, string, error) {
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
Expand Down Expand Up @@ -198,31 +222,31 @@
// - error: An error object indicating failure during the construction of the multipart request body. This could be due to issues
// such as file reading errors or multipart writer errors.

func createMultipartRequestBody(files map[string][]string, formDataFields map[string]string, fileContentTypes map[string]string, formDataPartHeaders map[string]http.Header, log logger.Logger) (*bytes.Buffer, string, error) {
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
// func createMultipartRequestBody(files map[string][]string, formDataFields map[string]string, fileContentTypes map[string]string, formDataPartHeaders map[string]http.Header, log logger.Logger) (*bytes.Buffer, string, error) {
// body := &bytes.Buffer{}
// writer := multipart.NewWriter(body)

for fieldName, filePaths := range files {
for _, filePath := range filePaths {
if err := addFilePart(writer, fieldName, filePath, fileContentTypes, formDataPartHeaders, log); err != nil {
return nil, "", err
}
}
}
// for fieldName, filePaths := range files {
// for _, filePath := range filePaths {
// if err := addFilePart(writer, fieldName, filePath, fileContentTypes, formDataPartHeaders, log); err != nil {
// return nil, "", err
// }
// }
// }

for key, val := range formDataFields {
if err := addFormField(writer, key, val, log); err != nil {
return nil, "", err
}
}
// for key, val := range formDataFields {
// if err := addFormField(writer, key, val, log); err != nil {
// return nil, "", err
// }
// }

if err := writer.Close(); err != nil {
log.Error("Failed to close writer", zap.Error(err))
return nil, "", err
}
// if err := writer.Close(); err != nil {
// log.Error("Failed to close writer", zap.Error(err))
// return nil, "", err
// }

return body, writer.FormDataContentType(), nil
}
// return body, writer.FormDataContentType(), nil
// }

// addFilePart adds a base64 encoded file part to the multipart writer with the provided field name and file path.
// This function opens the specified file, sets the appropriate content type and headers, and adds it to the multipart writer.
Expand All @@ -241,6 +265,9 @@
// Returns:
// - error: An error object indicating failure during the addition of the file part. This could be due to issues such as
// file reading errors or multipart writer errors.
//
// addFilePart adds a base64 encoded file part to the multipart writer with the provided field name and file path.
// addFilePart adds a base64 encoded file part to the multipart writer with the provided field name and file path.
func addFilePart(writer *multipart.Writer, fieldName, filePath string, fileContentTypes map[string]string, formDataPartHeaders map[string]http.Header, log logger.Logger) error {
file, err := os.Open(filePath)
if err != nil {
Expand Down Expand Up @@ -273,7 +300,8 @@
}

progressLogger := logUploadProgress(file, fileSize.Size(), log)
if err := chunkFileUpload(file, encoder, log, progressLogger); err != nil {
uploadState := &UploadState{}
if err := chunkFileUpload(file, encoder, log, progressLogger, uploadState); err != nil {
log.Error("Failed to copy file content", zap.String("filePath", filePath), zap.Error(err))
return err
}
Expand Down Expand Up @@ -347,13 +375,18 @@
// Returns:
// - error: An error object indicating failure during the file upload. This could be due to issues such as file reading errors
// or writer errors.
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64)) error {
//
// chunkFileUpload reads the file upload into chunks and writes it to the writer.
func chunkFileUpload(file *os.File, writer io.Writer, log logger.Logger, updateProgress func(int64), uploadState *UploadState) error {
const chunkSize = 1024 * 1024 // 1024 bytes * 1024 (1 MB)
buffer := make([]byte, chunkSize)
totalWritten := int64(0)
chunkWritten := int64(0)
fileName := filepath.Base(file.Name())

// Seek to the last uploaded byte
file.Seek(uploadState.LastUploadedByte, io.SeekStart)

Check warning

Code scanning / gosec

Errors unhandled. Warning

Errors unhandled.

for {
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
Expand All @@ -365,6 +398,10 @@

written, err := writer.Write(buffer[:n])
if err != nil {
// Save the state before returning the error
uploadState.Lock()
uploadState.LastUploadedByte += totalWritten
uploadState.Unlock()
return err
}

Expand Down Expand Up @@ -403,6 +440,7 @@

// Returns:
// - func(int64): A function that takes the number of bytes written as an argument and logs the upload progress.
// logUploadProgress logs the upload progress based on the percentage of the total file size.
func logUploadProgress(file *os.File, fileSize int64, log logger.Logger) func(int64) {
var uploaded int64 = 0
const logInterval = 5 // Log every 5% increment
Expand Down
Loading
Loading