Skip to content

Commit

Permalink
Fixed --watch-kubeconfig glitches (#57)
Browse files Browse the repository at this point in the history
* Fixed --watch-kubeconfig glitches

* Added back missing field

* Updated CHANGELOG.md
  • Loading branch information
applejag authored Nov 2, 2023
1 parent 3726f01 commit 8a5468e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 55 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ This project tries to follow [SemVer 2.0.0](https://semver.org/).
- Added timer on pod's "STATUS" column when a pod is deleted
(e.g `Deleted (3m ago)`). (#56)

- Fixed glitches when using flag `--watch-kubeconfig` / `-W`.
The watch was not properly restarting, but works great now. (#57)

## v0.4.0 (2023-09-03)

- Added text filtering. (#32, thanks @semihbkgr!)
Expand Down
90 changes: 35 additions & 55 deletions pkg/klock/klock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package klock
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"

tea "github.com/charmbracelet/bubbletea"
Expand Down Expand Up @@ -97,11 +97,10 @@ func Execute(o Options, args []string) error {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
restartChan := make(chan struct{})
defer close(restartChan)

watchAndPrint := func() error {
if err := w.Watch(ctx); err != nil {
return err
}
go func() {
for {
select {
case event, ok := <-fileEvents:
Expand All @@ -112,22 +111,19 @@ func Execute(o Options, args []string) error {
if event.Op != fsnotify.Write {
continue
}
go w.RestartActiveWatch(ctx)
restartChan <- struct{}{}

case err := <-w.ErrorChan():
t.SetError(err)
p.Send(nil)
case <-ctx.Done():
return nil
return
}
}
}
}()

go func() {
if err := watchAndPrint(); err != nil {
p.Quit()
fmt.Fprintf(os.Stderr, "err: %s\n", err)
}
w.WatchLoop(ctx, restartChan)
}()

_, err := p.Run()
Expand All @@ -151,44 +147,47 @@ type Watcher struct {
Printer Printer
Args []string

cancel func()
errorChan chan error
}

func (w *Watcher) ErrorChan() <-chan error {
return w.errorChan
}

func (w *Watcher) Watch(ctx context.Context) error {
return w.startWatch(ctx, false)
}

func (w *Watcher) RestartActiveWatch(ctx context.Context) {
if w.cancel != nil {
w.cancel()
}
w.cancel = nil

func (w *Watcher) WatchLoop(ctx context.Context, restartChan <-chan struct{}) error {
clearBeforePrinting := false
watchErrChan := make(chan error, 1)
for {
err := w.startWatch(ctx, true)
if ctx.Err() != nil {
return
}
if err != nil {
w.errorChan <- fmt.Errorf("retry in 5s: restart watch: %w", err)
}
var wg sync.WaitGroup
wg.Add(1)
watchCtx, cancel := context.WithCancel(ctx)
go func(clearBeforePrinting bool) {
defer wg.Done()
err := w.watch(watchCtx, clearBeforePrinting)
if watchCtx.Err() == nil {
watchErrChan <- err
}
}(clearBeforePrinting)
clearBeforePrinting = true
select {
case <-time.After(5 * time.Second):
continue
case err := <-watchErrChan:
w.errorChan <- fmt.Errorf("restart in 5s: %w", err)
time.Sleep(5 * time.Second)
case <-restartChan:
cancel()
case <-ctx.Done():
return
cancel()
return ctx.Err()
}
wg.Wait()
}
}

func (w *Watcher) startWatch(ctx context.Context, clearBeforePrinting bool) error {
ctx, w.cancel = context.WithCancel(ctx)
func (w *Watcher) Watch(ctx context.Context) error {
return w.watch(ctx, false)
}

func (w *Watcher) watch(ctx context.Context, clearBeforePrinting bool) error {
ns, _, err := w.ConfigFlags.ToRawKubeConfigLoader().Namespace()
if err != nil {
return fmt.Errorf("read namespace: %w", err)
Expand Down Expand Up @@ -263,26 +262,7 @@ func (w *Watcher) startWatch(ctx context.Context, clearBeforePrinting bool) erro

w.Printer.Table.StopSpinner()

go w.watchLoop(ctx, r, resVersion)
return nil
}

func (w *Watcher) watchLoop(ctx context.Context, r *resource.Result, resVersion string) {
for {
err := w.pipeEvents(ctx, r, resVersion)
if ctx.Err() != nil {
return
}
if err != nil {
w.errorChan <- fmt.Errorf("retry in 5s: %w", err)
}
select {
case <-time.After(5 * time.Second):
continue
case <-ctx.Done():
return
}
}
return w.pipeEvents(ctx, r, resVersion)
}

func (w *Watcher) pipeEvents(ctx context.Context, r *resource.Result, resVersion string) error {
Expand Down

0 comments on commit 8a5468e

Please sign in to comment.