Skip to content

Commit

Permalink
remove deprecated use of wait. functions
Browse files Browse the repository at this point in the history
  • Loading branch information
willdot committed Jun 3, 2024
1 parent 79ba10f commit 3e305c9
Show file tree
Hide file tree
Showing 13 changed files with 21 additions and 19 deletions.
5 changes: 3 additions & 2 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func Get(ctx context.Context, agent cmds.Agent, proxy proxy.Proxy) (*config.Node
// does not support jittering, so we instead use wait.JitterUntilWithContext, and cancel
// the context on success.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
agentConfig, err = get(ctx, &agent, proxy)
if err != nil {
Expand All @@ -78,7 +79,7 @@ func KubeProxyDisabled(ctx context.Context, node *config.Node, proxy proxy.Proxy
var disabled bool
var err error

wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
disabled, err = getKubeProxyDisabled(ctx, node, proxy)
if err != nil {
logrus.Infof("Waiting to retrieve kube-proxy configuration; server is not ready: %v", err)
Expand All @@ -96,7 +97,7 @@ func APIServers(ctx context.Context, node *config.Node, proxy proxy.Proxy) []str
var addresses []string
var err error

wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
_ = wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
addresses, err = getAPIServers(ctx, node, proxy)
if err != nil {
logrus.Infof("Failed to retrieve list of apiservers from server: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/netpol/netpol.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func Run(ctx context.Context, nodeConfig *config.Node) error {
// kube-router netpol requires addresses to be available in the node object.
// Wait until the uninitialized taint has been removed, at which point the addresses should be set.
// TODO: Replace with non-deprecated PollUntilContextTimeout when our and Kubernetes code migrate to it
if err := wait.PollImmediateInfiniteWithContext(ctx, 2*time.Second, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
// Get the node object
node, err := client.CoreV1().Nodes().Get(ctx, nodeConfig.AgentConfig.NodeName, metav1.GetOptions{})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
func (a *agentTunnel) setKubeletPort(ctx context.Context, apiServerReady <-chan struct{}) {
<-apiServerReady

wait.PollImmediateWithContext(ctx, time.Second, util.DefaultAPIServerReadyTimeout, func(ctx context.Context) (bool, error) {
wait.PollUntilContextTimeout(ctx, time.Second, util.DefaultAPIServerReadyTimeout, true, func(ctx context.Context) (bool, error) {
var readyTime metav1.Time
nodeName := os.Getenv("NODE_NAME")
node, err := a.client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan struct{}, error) {
}

if !c.config.EtcdDisableSnapshots {
wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
_ = wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
err := c.managedDB.ReconcileSnapshotData(ctx)
if err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {
return
}
// We use Poll here instead of Until because we want to wait the interval before running the function.
go wait.PollUntilWithContext(ctx, 30*time.Second, func(ctx context.Context) (bool, error) {
go wait.PollUntilContextCancel(ctx, 30*time.Second, true, func(ctx context.Context) (bool, error) {
clientURLs, err := c.managedDB.GetMembersClientURLs(ctx)
if err != nil {
logrus.Warnf("Failed to get etcd ClientURLs: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func RotateBootstrapToken(ctx context.Context, config *config.Control, oldToken
tokenKey := storageKey(normalizedToken)

var bootstrapList []client.Value
if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
if err != nil {
return false, err
Expand Down Expand Up @@ -198,7 +198,7 @@ func (c *Cluster) storageBootstrap(ctx context.Context) error {

attempts := 0
tokenKey := storageKey(normalizedToken)
return wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
attempts++
value, saveBootstrap, err := getBootstrapKeyFromStorage(ctx, storageClient, normalizedToken, token)
c.saveBootstrap = saveBootstrap
Expand Down Expand Up @@ -258,7 +258,7 @@ func getBootstrapKeyFromStorage(ctx context.Context, storageClient client.Client
var bootstrapList []client.Value
var err error

if err := wait.PollImmediateUntilWithContext(ctx, 5*time.Second, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(ctx, 5*time.Second, true, func(ctx context.Context) (bool, error) {
bootstrapList, err = storageClient.List(ctx, "/bootstrap", 0)
if err != nil {
if errors.Is(err, rpctypes.ErrGPRCNotSupportedForLearner) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (e *ETCD) Start(ctx context.Context, clientAccessInfo *clientaccess.Info) e
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for container runtime to become ready before joining etcd cluster")
case <-e.config.Runtime.ContainerRuntimeReady:
if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
if err := e.join(ctx, clientAccessInfo); err != nil {
// Retry the join if waiting for another member to be promoted, or waiting for peers to connect after promotion
if errors.Is(err, rpctypes.ErrTooManyLearners) || errors.Is(err, rpctypes.ErrUnhealthy) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/etcd/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
if config.ClusterReset {
logrus.Debug("Skip setting S3 snapshot cluster ID and token during cluster-reset")
} else {
if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
if config.Runtime.Core == nil {
return false, nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/secretsencrypt/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func WriteEncryptionHashAnnotation(runtime *config.ControlRuntime, node *corev1.
// WaitForEncryptionConfigReload watches the metrics API, polling the latest time the encryption config was reloaded.
func WaitForEncryptionConfigReload(runtime *config.ControlRuntime, reloadSuccesses, reloadTime int64) error {
var lastFailure string
err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {

ctx := context.Background()
err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
newReloadTime, newReloadSuccess, err := GetEncryptionConfigMetrics(runtime, false)
if err != nil {
return true, err
Expand Down Expand Up @@ -238,7 +238,8 @@ func GetEncryptionConfigMetrics(runtime *config.ControlRuntime, initialMetrics b

// This is wrapped in a poller because on startup no metrics exist. Its only after the encryption config
// is modified and the first reload occurs that the metrics are available.
err = wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) {
ctx := context.Background()
err = wait.PollUntilContextTimeout(ctx, 5*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) {
data, err := restClient.Get().AbsPath("/metrics").DoRaw(context.TODO())
if err != nil {
return true, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func verifyNode(ctx context.Context, nodeClient coreclient.NodeController, node

func ensureSecret(ctx context.Context, config *Config, node *nodeInfo) {
runtime := config.ControlConfig.Runtime
wait.PollImmediateUntilWithContext(ctx, time.Second*5, func(ctx context.Context) (bool, error) {
_ = wait.PollUntilContextCancel(ctx, time.Second*5, true, func(ctx context.Context) (bool, error) {
if runtime.Core != nil {
secretClient := runtime.Core.Core().V1().Secret()
// This is consistent with events attached to the node generated by the kubelet
Expand Down
2 changes: 1 addition & 1 deletion pkg/spegel/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper {
func (s *serverBootstrapper) Run(_ context.Context, id string) error {
s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) {
nodes := s.controlConfig.Runtime.Core.Core().V1().Node()
wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (bool, error) {
_ = wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return false, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/spegel/spegel.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (c *Config) Start(ctx context.Context, nodeConfig *config.Node) error {

// Wait up to 5 seconds for the p2p network to find peers. This will return
// immediately if the node is bootstrapping from itself.
wait.PollImmediateWithContext(ctx, time.Second, resolveTimeout, func(_ context.Context) (bool, error) {
_ = wait.PollUntilContextTimeout(ctx, time.Second, resolveTimeout, true, func(_ context.Context) (bool, error) {
return router.Ready()
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/util/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout t
return err
}

err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
healthStatus := 0
result := restClient.Get().AbsPath("/readyz").Do(ctx).StatusCode(&healthStatus)
if rerr := result.Error(); rerr != nil {
Expand Down Expand Up @@ -128,7 +128,7 @@ func WaitForRBACReady(ctx context.Context, kubeconfigPath string, timeout time.D
reviewFunc = subjectAccessReview(authClient, ra, user, groups)
}

err = wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) {
err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
status, rerr := reviewFunc(ctx)
if rerr != nil {
lastErr = rerr
Expand Down

0 comments on commit 3e305c9

Please sign in to comment.