diff --git a/hashing.go b/hashing.go
index 65cee43..9f495da 100644
--- a/hashing.go
+++ b/hashing.go
@@ -7,6 +7,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"sync"
 
 	yaml "gopkg.in/yaml.v3"
 )
@@ -33,6 +34,7 @@ type JSONHashStore struct {
 	path     string
 	hashes   map[string]string
 	strategy string
+	mu       sync.RWMutex
 }
 
 func NewJSONHashStore(path, strategy string) (*JSONHashStore, error) {
@@ -61,11 +63,15 @@ func NewJSONHashStore(path, strategy string) (*JSONHashStore, error) {
 }
 
 func (s *JSONHashStore) Add(name, hash string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	s.hashes[name] = hash
 	return nil
 }
 
 func (s *JSONHashStore) Get(name string) (string, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
 	return s.hashes[name], nil
 }
 
@@ -75,7 +81,9 @@ func (s *JSONHashStore) Save() error {
 		return nil
 	}
 
+	s.mu.RLock()
 	b, err := json.MarshalIndent(s.hashes, "", " ")
+	s.mu.RUnlock()
 	if err != nil {
 		return err
 	}
diff --git a/main.go b/main.go
index 212c071..0185c92 100644
--- a/main.go
+++ b/main.go
@@ -7,9 +7,9 @@ import (
 	"log"
 	"os"
 	"os/exec"
-
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/chime/mani-diffy/pkg/helm"
@@ -44,9 +44,85 @@ type Walker struct {
 	ignoreSuffix string
 }
 
+// VisitedMap is a set of visited output paths that is safe for concurrent use.
+type VisitedMap struct {
+	sync.RWMutex
+	visited map[string]bool
+}
+
+// NewVisitedMap returns an empty VisitedMap.
+func NewVisitedMap() *VisitedMap {
+	return &VisitedMap{
+		visited: make(map[string]bool),
+	}
+}
+
+// Set marks path as visited.
+func (vm *VisitedMap) Set(path string) {
+	vm.Lock()
+	defer vm.Unlock()
+	vm.visited[path] = true
+}
+
+// Get reports whether path has been marked as visited.
+func (vm *VisitedMap) Get(path string) bool {
+	vm.RLock()
+	defer vm.RUnlock()
+	return vm.visited[path]
+}
+
+// WorkerPool runs submitted tasks on a fixed number of goroutines.
+type WorkerPool struct {
+	workers int
+	tasks   chan func()
+	wg      sync.WaitGroup
+}
+
+// NewWorkerPool starts the given number of worker goroutines and returns the
+// pool. workers must be at least 1, otherwise no goroutine ever receives a task.
+func NewWorkerPool(workers int) *WorkerPool {
+	pool := &WorkerPool{
+		workers: workers,
+		tasks:   make(chan func(), workers), // Buffer size matches worker count
+	}
+	pool.start()
+	return pool
+}
+
+// start launches the workers; each one runs tasks until the channel is closed.
+func (p *WorkerPool) start() {
+	for i := 0; i < p.workers; i++ {
+		p.wg.Add(1)
+		go func() {
+			defer p.wg.Done()
+			for task := range p.tasks {
+				task()
+			}
+		}()
+	}
+}
+
+// Submit queues task, blocking while the task buffer is full.
+func (p *WorkerPool) Submit(task func()) {
+	p.tasks <- task
+}
+
+// Wait closes the task channel and blocks until every queued task has run.
+// Submit must not be called after Wait, because the channel is closed.
+func (p *WorkerPool) Wait() {
+	close(p.tasks)
+	p.wg.Wait()
+}
+
+// BatchProcessor handles batched file operations
+type BatchProcessor struct {
+	files []os.DirEntry
+	path  string
+}
+
 // Walk walks a directory tree looking for Argo applications and renders them
 func (w *Walker) Walk(inputPath, outputPath string, maxDepth int, hashes HashStore) error {
-	visited := make(map[string]bool)
+	visited := NewVisitedMap()
 
 	if err := w.walk(inputPath, outputPath, 0, maxDepth, visited, hashes); err != nil {
 		return err
@@ -63,7 +139,7 @@ func (w *Walker) Walk(inputPath, outputPath string, maxDepth int, hashes HashSto
 	return nil
 }
 
-func pruneUnvisited(visited map[string]bool, outputPath string) error {
+func pruneUnvisited(visited *VisitedMap, outputPath string) error {
 	files, err := os.ReadDir(outputPath)
 	if err != nil {
 		return err
@@ -75,7 +151,7 @@ func pruneUnvisited(visited map[string]bool, outputPath string) error {
 		}
 
 		path := filepath.Join(outputPath, f.Name())
-		if visited[path] {
+		if visited.Get(path) {
 			continue
 		}
 		if err := os.RemoveAll(path); err != nil {
@@ -86,9 +162,8 @@ func pruneUnvisited(visited map[string]bool, outputPath string) error {
 	return nil
 }
 
-func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited map[string]bool, hashes HashStore) error {
+func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited *VisitedMap, hashes HashStore) error {
 	if maxDepth != InfiniteDepth {
-		// If we've reached the max depth, stop walking
 		if depth > maxDepth {
 			return nil
 		}
@@ -100,65 +175,103 @@ func (w *Walker) walk(inputPath, outputPath string, depth, maxDepth int, visited
 	if err != nil {
 		return err
 	}
+
+	// Process the files of this directory on a small fixed-size worker pool
+	pool := NewWorkerPool(4)
+	errChan := make(chan error, len(fi))
+
 	for _, file := range fi {
 		if !strings.Contains(file.Name(), ".yaml") {
 			continue
 		}
 
-		crds, err := helm.Read(filepath.Join(inputPath, file.Name()))
-		if err != nil {
-			return err
-		}
-		for _, crd := range crds {
-			if crd.Kind != "Application" {
-				continue
-			}
-
-			if strings.HasSuffix(crd.ObjectMeta.Name, w.ignoreSuffix) {
-				continue
-			}
-
-			path := filepath.Join(outputPath, crd.ObjectMeta.Name)
-			visited[path] = true
-
-			hash, err := hashes.Get(crd.ObjectMeta.Name)
-			// COMPARE HASHES HERE. STEP INTO RENDER IF NO MATCH
-			if err != nil {
-				return err
-			}
-
-			hashGenerated, err := w.GenerateHash(crd)
-			if err != nil {
-				if errors.Is(err, kustomize.ErrNotSupported) {
-					continue
-				}
-				return err
-			}
-
-			emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml"))
-			if err != nil {
-				return err
-			}
-
-			if hashGenerated != hash || emptyManifest {
-				log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name)
-				if err := w.Render(crd, path); err != nil {
-					if errors.Is(err, kustomize.ErrNotSupported) {
-						continue
-					}
-					return err
-				}
-
-				if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil {
-					return err
-				}
-			}
-
-			if err := w.walk(path, outputPath, depth+1, maxDepth, visited, hashes); err != nil {
-				return err
-			}
-		}
-	}
+		file := file // per-iteration copy captured by the closure
+		pool.Submit(func() {
+			crds, err := helm.Read(filepath.Join(inputPath, file.Name()))
+			if err != nil {
+				errChan <- err
+				return
+			}
+
+			for _, crd := range crds {
+				if crd.Kind != "Application" {
+					continue
+				}
+
+				if strings.HasSuffix(crd.ObjectMeta.Name, w.ignoreSuffix) {
+					continue
+				}
+
+				path := filepath.Join(outputPath, crd.ObjectMeta.Name)
+
+				// Create the output directory if it doesn't exist
+				if err := os.MkdirAll(path, 0755); err != nil {
+					errChan <- err
+					return
+				}
+
+				visited.Set(path)
+
+				hash, err := hashes.Get(crd.ObjectMeta.Name)
+				if err != nil {
+					errChan <- err
+					return
+				}
+
+				// Always regenerate the hash: comparing it with the stored
+				// value is what detects an application that has changed.
+				hashGenerated, err := w.GenerateHash(crd)
+				if err != nil {
+					if errors.Is(err, kustomize.ErrNotSupported) {
+						continue
+					}
+					errChan <- err
+					return
+				}
+
+				emptyManifest, err := helm.EmptyManifest(filepath.Join(path, "manifest.yaml"))
+				if err != nil {
+					errChan <- err
+					return
+				}
+
+				// Render when the hash differs or the manifest is reported empty
+				if hashGenerated != hash || emptyManifest {
+					log.Printf("No match detected. Render: %s\n", crd.ObjectMeta.Name)
+					if err := w.Render(crd, path); err != nil {
+						if errors.Is(err, kustomize.ErrNotSupported) {
+							continue
+						}
+						errChan <- err
+						return
+					}
+
+					if err := hashes.Add(crd.ObjectMeta.Name, hashGenerated); err != nil {
+						errChan <- err
+						return
+					}
+				}
+
+				// Recurse into the output path; the nested walk creates its own pool
+				if err := w.walk(path, outputPath, depth+1, maxDepth, visited, hashes); err != nil {
+					errChan <- err
+					return
+				}
+			}
+		})
+	}
+
+	// Wait for all workers to complete
+	pool.Wait()
+	close(errChan)
+
+	// Return the first error reported by any worker
+	for err := range errChan {
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 