diff --git a/pkg/cache/cluster.go b/pkg/cache/cluster.go
index 626c70e81..38ebcdbff 100644
--- a/pkg/cache/cluster.go
+++ b/pkg/cache/cluster.go
@@ -666,6 +666,29 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
 			}
 		}
 
+		// If the resourceVersion is still missing, watchutil.NewRetryWatcher will fail.
+		// https://github.com/kubernetes/client-go/blob/78d2af792babf2dd937ba2e2a8d99c753a5eda89/tools/watch/retrywatcher.go#L68-L71
+		// Instead, let's just check if the resourceVersion exists at the next resync ...
+		if resourceVersion == "" {
+			c.log.V(1).Info(fmt.Sprintf("Ignoring watch for %s on %s due to missing resourceVersion", api.GroupKind, c.config.Host))
+
+			var watchResyncTimeoutCh <-chan time.Time
+			if c.watchResyncTimeout > 0 {
+				shouldResync := time.NewTimer(c.watchResyncTimeout)
+				defer shouldResync.Stop()
+				watchResyncTimeoutCh = shouldResync.C
+			}
+
+			for {
+				select {
+				case <-ctx.Done():
+					return nil
+				case <-watchResyncTimeoutCh:
+					return fmt.Errorf("Resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)
+				}
+			}
+		}
+
 		w, err := watchutil.NewRetryWatcher(resourceVersion, &cache.ListWatch{
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 				res, err := resClient.Watch(ctx, options)
diff --git a/pkg/cache/cluster_test.go b/pkg/cache/cluster_test.go
index 893c6b877..73f1b92dd 100644
--- a/pkg/cache/cluster_test.go
+++ b/pkg/cache/cluster_test.go
@@ -1,9 +1,13 @@
 package cache
 
 import (
+	"bufio"
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
+	"io"
+	"regexp"
 	"sort"
 	"strings"
 	"sync"
@@ -29,6 +33,7 @@ import (
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	testcore "k8s.io/client-go/testing"
+	"k8s.io/klog/v2/textlogger"
 	"sigs.k8s.io/yaml"
 
 	"github.com/argoproj/gitops-engine/pkg/utils/kube"
@@ -1393,3 +1398,219 @@ func BenchmarkIterateHierarchyV2(b *testing.B) {
 //		})
 //	}
 //}
+
+type syncedBuffer struct {
+	mutex sync.Mutex
+	buf   bytes.Buffer
+}
+
+func (lb *syncedBuffer) Read(p []byte) (n int, err error) {
+	lb.mutex.Lock()
+	defer lb.mutex.Unlock()
+	return lb.buf.Read(p)
+}
+
+func (lb *syncedBuffer) Write(p []byte) (n int, err error) {
+	lb.mutex.Lock()
+	defer lb.mutex.Unlock()
+	return lb.buf.Write(p)
+}
+
+func Test_watchEvents_Missing_resourceVersion(t *testing.T) {
+	objExample := &unstructured.Unstructured{Object: map[string]any{
+		"apiVersion": "apiservice.example.com/v1",
+		"kind":       "Example",
+		"metadata": map[string]any{
+			"name": "example",
+		},
+	}}
+
+	testCases := []struct {
+		name               string
+		objs               []runtime.Object
+		funAssert          func(t *testing.T, logLines []string)
+		waitForLogLines    []string
+		waitForLogExtra    time.Duration
+		watchResyncTimeout time.Duration
+	}{
+		{
+			name:            "Should_ignore_resource_without_resourceVersion",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.NotContains(t, logLines, "Resyncing Example.apiservice.example.com on https://test due to timeout")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    0 * time.Millisecond,
+		},
+		{
+			name:            "Should_not_ignore_resource_with_resourceVersion",
+			objs:            []runtime.Object{testDeploy()},
+			waitForLogLines: []string{"Start watch Deployment.apps on https://test"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.NotContains(t, logLines, "Ignoring watch for Deployment.apps on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: defaultWatchResyncTimeout,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+		{
+			name:            "Should_retry_ignored_resource_on_next_resync",
+			objs:            []runtime.Object{objExample},
+			waitForLogLines: []string{"Failed to watch Example.apiservice.example.com on https://test: Resyncing Example.apiservice.example.com on https://test due to timeout, retrying in 1s"},
+			funAssert: func(t *testing.T, logLines []string) {
+				t.Helper()
+				require.Contains(t, logLines, "Ignoring watch for Example.apiservice.example.com on https://test due to missing resourceVersion")
+			},
+			watchResyncTimeout: 10 * time.Millisecond,
+			waitForLogExtra:    100 * time.Millisecond,
+		},
+	}
+
+	readLinesUntil := func(ctx context.Context, buf io.Reader, wantedLines []string, readExtra time.Duration) ([]string, error) {
+		wantedStatuses := map[string]bool{}
+		for _, wantedLine := range wantedLines {
+			wantedStatuses[strings.TrimSuffix(wantedLine, "\r\n")] = false
+		}
+
+		var logLines []string
+		readChan := make(chan any)
+		go func() {
+			lineRgx := regexp.MustCompile(`^.+?\s+\d+\s+.+?\.go:(?:\d+?|\d+?)\]\s+"(?P<message>.+)"$`)
+
+			for {
+				scanner := bufio.NewScanner(buf)
+				for scanner.Scan() {
+					match := lineRgx.FindStringSubmatch(scanner.Text())
+					readChan <- match[1]
+				}
+
+				if scanner.Err() != nil {
+					readChan <- scanner.Err()
+					return
+				}
+
+				// EOF. Waiting for data.
+				time.Sleep(50 * time.Millisecond)
+			}
+		}()
+
+		var readExtraTimer *time.Timer
+		var readExtraTimeoutChan <-chan time.Time
+
+		for {
+			select {
+			case <-readExtraTimeoutChan:
+				return logLines, ctx.Err()
+			case <-ctx.Done():
+				return logLines, ctx.Err()
+			case read := <-readChan:
+				if err, ok := read.(error); ok {
+					return logLines, err
+				}
+
+				// EOF
+				if read == nil {
+					return logLines, nil
+				}
+
+				logLines = append(logLines, read.(string))
+				if readExtraTimer != nil {
+					continue
+				}
+
+				line := read.(string)
+				if _, ok := wantedStatuses[line]; ok {
+					wantedStatuses[line] = true
+
+					done := true
+					for _, ok := range wantedStatuses {
+						if !ok {
+							done = false
+						}
+					}
+
+					if done {
+						readExtraTimer = time.NewTimer(readExtra)
+						readExtraTimeoutChan = readExtraTimer.C
+					}
+				}
+			}
+		}
+	}
+
+	createCluster := func(opts []UpdateSettingsFunc, objs ...runtime.Object) *clusterCache {
+		client := fake.NewSimpleDynamicClientWithCustomListKinds(scheme.Scheme,
+			map[schema.GroupVersionResource]string{
+				{Group: "apiservice.example.com", Version: "v1", Resource: "examples"}: "ExampleList",
+			},
+			objs...)
+		reactor := client.ReactionChain[0]
+		client.PrependReactor("list", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
+			handled, ret, err = reactor.React(action)
+			if err != nil || !handled {
+				return
+			}
+
+			// The apiservice.example.com group is for testing missing resourceVersion, so we omit setting it for those responses.
+			retList, ok := ret.(*unstructured.UnstructuredList)
+			if ok && len(retList.Items) > 0 && retList.Items[0].GetObjectKind().GroupVersionKind().Group == "apiservice.example.com" {
+				return
+			}
+
+			// make sure the list response has a resourceVersion set
+			ret.(metav1.ListInterface).SetResourceVersion("123")
+			return
+		})
+
+		apiResources := []kube.APIResourceInfo{{
+			GroupKind:            schema.GroupKind{Group: "apps", Kind: "Deployment"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
+			Meta:                 metav1.APIResource{Namespaced: true},
+		}, {
+			GroupKind:            schema.GroupKind{Group: "apiservice.example.com", Kind: "Example"},
+			GroupVersionResource: schema.GroupVersionResource{Group: "apiservice.example.com", Version: "v1", Resource: "examples"},
+			Meta:                 metav1.APIResource{Namespaced: false},
+		}}
+
+		opts = append([]UpdateSettingsFunc{
+			SetKubectl(&kubetest.MockKubectlCmd{APIResources: apiResources, DynamicClient: client}),
+		}, opts...)
+
+		cache := NewClusterCache(
+			&rest.Config{Host: "https://test"},
+			opts...,
+		)
+		return cache
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			ctx, ctxCancel := context.WithTimeout(context.Background(), 1*time.Second)
+			defer ctxCancel()
+
+			var logBuffer syncedBuffer
+			logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&logBuffer), textlogger.Verbosity(1), textlogger.FixedTime(time.Unix(0, 0))))
+
+			cluster := createCluster([]UpdateSettingsFunc{
+				SetLogr(logger),
+				SetWatchResyncTimeout(testCase.watchResyncTimeout),
+			}, testCase.objs...)
+
+			defer func() {
+				cluster.Invalidate()
+			}()
+
+			err := cluster.EnsureSynced()
+			require.NoError(t, err)
+
+			logLines, err := readLinesUntil(ctx, &logBuffer, testCase.waitForLogLines, testCase.waitForLogExtra)
+			require.NoError(t, err)
+			testCase.funAssert(t, logLines)
+			for _, wantedLogLine := range testCase.waitForLogLines {
+				require.Contains(t, logLines, wantedLogLine)
+			}
+		})
+	}
+}