Skip to content

Commit

Permalink
feat: load git from s3 first
Browse files Browse the repository at this point in the history
Signed-off-by: Tianchu Zhao <[email protected]>
  • Loading branch information
tczhao committed Oct 11, 2024
1 parent cfa71ce commit 8c55a90
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 15 deletions.
3 changes: 3 additions & 0 deletions workflow/artifacts/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,19 @@ func (azblobDriver *ArtifactDriver) Load(artifact *wfv1.Artifact, path string) e
}
isEmptyFile = true
} else if !bloberror.HasCode(origErr, bloberror.BlobNotFound) {
_ = os.Remove(path)
return fmt.Errorf("unable to download blob %s: %s", artifact.Azure.Blob, origErr)
}

isDir, err := azblobDriver.IsDirectory(artifact)
if err != nil {
_ = os.Remove(path)
return fmt.Errorf("unable to determine if %s is a directory: %s", artifact.Azure.Blob, err)
}

// It's not a directory and the file doesn't exist, Return the original NoSuchKey error.
if !isDir && !isEmptyFile {
_ = os.Remove(path)
return argoerrors.New(argoerrors.CodeNotFound, origErr.Error())
}

Expand Down
87 changes: 72 additions & 15 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"io/fs"
"math"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -162,7 +163,6 @@ func (we *WorkflowExecutor) HandleError(ctx context.Context) {
func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
log.Infof("Start loading input artifacts...")
for _, art := range we.Template.Inputs.Artifacts {

log.Infof("Downloading artifact: %s", art.Name)

if !art.HasLocationOrKey() {
Expand All @@ -177,14 +177,6 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
if err != nil {
return err
}
driverArt, err := we.newDriverArt(&art)
if err != nil {
return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
}
artDriver, err := we.InitDriver(ctx, driverArt)
if err != nil {
return err
}
// Determine the file path of where to load the artifact
var artPath string
mnt := common.FindOverlappingVolume(&we.Template, art.Path)
Expand All @@ -204,13 +196,78 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
// the file is a tarball or not. If it is, it is first extracted then renamed to
// the desired location. If not, it is simply renamed to the location.
tempArtPath := artPath + ".tmp"
err = artDriver.Load(driverArt, tempArtPath)
if err != nil {
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
continue

proceed := true
gitLoopCount := 0
if art.Git != nil {
// if git artifact, try s3 first
for {
if gitLoopCount >= 3 || !proceed {
break
}
proceed = true
branch := "master"
if art.Git.Branch != "" {
branch = art.Git.Branch
}
repoString := art.Git.Repo[strings.LastIndex(art.Git.Repo, ":")+1:]
repoStringArray := strings.Split(strings.Replace(repoString, ".git", "", -1), "/")
repoString = repoStringArray[len(repoStringArray)-2] + "/" + repoStringArray[len(repoStringArray)-1]
s3Key := "git-artifacts/workflow/" + we.workflow + "/" + repoString + "/" + branch

artS3 := wfv1.Artifact{
ArtifactLocation: wfv1.ArtifactLocation{
S3: &wfv1.S3Artifact{
Key: s3Key,
},
},
}
log.Info(artS3)
driverArt, err := we.newDriverArt(&artS3)
if err != nil {
log.Warn(err)
} else {
artDriver, err := we.InitDriver(ctx, driverArt)
if err != nil {
log.Warn(err)
} else {
err = artDriver.Load(driverArt, tempArtPath)
if err != nil {
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
log.Infof("Skipping optional input artifact that was not found: %s", artS3.Name)
continue
}
log.Warn(err)
} else {
proceed = false
}
}
}
baseDelay := 1 * time.Second
secRetry := math.Pow(2, float64(gitLoopCount))
delay := time.Duration(secRetry) * baseDelay
time.Sleep(delay)
gitLoopCount++
}
}
if proceed {
// other artifact
driverArt, err := we.newDriverArt(&art)
if err != nil {
return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
}
artDriver, err := we.InitDriver(ctx, driverArt)
if err != nil {
return err
}
err = artDriver.Load(driverArt, tempArtPath)
if err != nil {
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
continue
}
return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
}
return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
}

isTar := false
Expand Down

0 comments on commit 8c55a90

Please sign in to comment.