Skip to content

Commit

Permalink
feat: push artifact data one at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jan 29, 2024
1 parent f4fc416 commit 46bf1a2
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 65 deletions.
3 changes: 2 additions & 1 deletion job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func (j *Job) init() {
}

j.Context = j.Context.WithObject(v1.ObjectMeta{
Name: j.Name,
Name: j.Name,
Namespace: j.Context.GetNamespace(),
Annotations: map[string]string{
"debug": lo.Ternary(j.Debug, "true", "false"),
"trace": lo.Ternary(j.Trace, "true", "false"),
Expand Down
2 changes: 1 addition & 1 deletion upstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewUpstreamClient(config UpstreamConfig) *UpstreamClient {

// Push uploads the given push message to the upstream server.
func (t *UpstreamClient) PushArtifacts(ctx context.Context, artifactID uuid.UUID, reader io.ReadCloser) error {
resp, err := t.R(ctx).Post("artifacts", reader)
resp, err := t.R(ctx).Post(fmt.Sprintf("artifacts/%s", artifactID), reader)
if err != nil {
return fmt.Errorf("error pushing to upstream: %w", err)
}
Expand Down
70 changes: 7 additions & 63 deletions upstream/jobs.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package upstream

import (
"archive/zip"
"fmt"
"io"
"os"
"path/filepath"

Expand Down Expand Up @@ -161,75 +159,21 @@ func SyncArtifactItems(ctx context.Context, config UpstreamConfig, artifactStore
}

for _, artifact := range artifacts {
topLevelPath := filepath.Join(artifactStoreLocalPath, artifact.Path)
e, err := os.Open(topLevelPath)
path := filepath.Join(artifactStoreLocalPath, artifact.Path)
f, err := os.Open(path)
if err != nil {
return 0, fmt.Errorf("failed to read local artifact store: %w", err)
return count, fmt.Errorf("failed to read local artifact store: %w", err)
}

path := filepath.Join(topLevelPath, e.Name())
archivedPath, err := zipDir(path)
if err != nil {
return 0, fmt.Errorf("failed to zip dir (%s): %w", e.Name(), err)
}

f, err := os.Open(archivedPath)
if err != nil {
return 0, fmt.Errorf("failed to open archived path (%s): %w", archivedPath, err)
if err := client.PushArtifacts(ctx, artifact.ID, f); err != nil {
return count, fmt.Errorf("failed to push artifact (%s): %w", f.Name(), err)
}

if err := client.PushArtifacts(ctx, artifact.ID, f); err != nil {
return 0, fmt.Errorf("failed to push artifact (%s): %w", e.Name(), err)
if err := ctx.DB().Model(&models.Artifact{}).Where("id = ?", artifact.ID).Update("is_data_pushed", true).Error; err != nil {
return 0, fmt.Errorf("failed to update is_pushed on artifacts: %w", err)
}

// Send to upstream
count++
}
}
}

func zipDir(dirPath string) (string, error) {
outputPath := fmt.Sprintf("%s.zip", dirPath)
outputFile, err := os.Create(outputPath)
if err != nil {
return "", err
}
defer outputFile.Close()

w := zip.NewWriter(outputFile)
defer w.Close()

walker := func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

if info.IsDir() {
return nil
}

file, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open %s: %w", path, err)
}
defer file.Close()

relativename, err := filepath.Rel(dirPath, path)
if err != nil {
return fmt.Errorf("failed to generate relative path (%s, %s): %w", dirPath, path, err)
}

f, err := w.Create(relativename)
if err != nil {
return fmt.Errorf("failed to create %s on zip: %w", relativename, err)
}

if _, err = io.Copy(f, file); err != nil {
return fmt.Errorf("failed to write %s to zip: %w", relativename, err)
}

return nil
}

return outputPath, filepath.Walk(dirPath, walker)
}

0 comments on commit 46bf1a2

Please sign in to comment.