Skip to content

Commit

Permalink
Remove dependency on k8s.io/kubernetes (#2398)
Browse files Browse the repository at this point in the history
* Remove dependency on `k8s.io/kubernetes`

Signed-off-by: Jacob Salway <[email protected]>

* Make code compliant with Apache 2.0 license

Signed-off-by: Jacob Salway <[email protected]>

---------

Signed-off-by: Jacob Salway <[email protected]>
  • Loading branch information
jacobsalway authored Jan 28, 2025
1 parent 6e15770 commit ad30d15
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cmd/sparkctl/app/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
clientWatch "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/util/interrupt"

crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/spark-operator/pkg/util"
)

var FollowEvents bool
Expand Down Expand Up @@ -147,7 +147,7 @@ func streamEvents(events watch.Interface, streamSince int64) error {

// Set 10 minutes inactivity timeout
watchExpire := 10 * time.Minute
intr := interrupt.New(nil, events.Stop)
intr := util.NewInterruptHandler(nil, events.Stop)
return intr.Run(func() error {
// Start rendering contents of the table without table header as it is already printed
table = prepareNewTable()
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ require (
k8s.io/apiextensions-apiserver v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v1.5.2
k8s.io/kubernetes v1.30.2
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/controller-runtime v0.17.5
sigs.k8s.io/scheduler-plugins v0.29.8
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -729,8 +729,6 @@ k8s.io/kube-openapi v0.0.0-20240709000822-3c01b740850f h1:2sXuKesAYbRHxL3aE2PN6z
k8s.io/kube-openapi v0.0.0-20240709000822-3c01b740850f/go.mod h1:UxDHUPsUwTOOxSU+oXURfFBcAS6JwiRXTYqYwfuGowc=
k8s.io/kubectl v0.29.3 h1:RuwyyIU42MAISRIePaa8Q7A3U74Q9P4MoJbDFz9o3us=
k8s.io/kubectl v0.29.3/go.mod h1:yCxfY1dbwgVdEt2zkJ6d5NNLOhhWgTyrqACIoFhpdd4=
k8s.io/kubernetes v1.30.2 h1:11WhS78OYX/lnSy6TXxPO6Hk+E5K9ZNrEsk9JgMSX8I=
k8s.io/kubernetes v1.30.2/go.mod h1:yPbIk3MhmhGigX62FLJm+CphNtjxqCvAIFQXup6RKS0=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
oras.land/oras-go v1.2.5 h1:XpYuAwAb0DfQsunIyMfeET92emK8km3W4yEzZvUbsTo=
Expand Down
99 changes: 99 additions & 0 deletions pkg/util/interrupt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Original code from Kubernetes (https://github.com/kubernetes/kubernetes)
https://github.com/kubernetes/kubernetes/blob/master/pkg/util/interrupt/interrupt.go
Only naming has been changed from the original implementation.
*/

package util

import (
"os"
"os/signal"
"sync"
"syscall"
)

// terminationSignals are signals that cause the program to exit in the
// supported platforms (linux, darwin, windows).
var terminationSignals = []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT}

// InterruptHandler Handler guarantees execution of notifications after a critical section (the function passed
// to a Run method), even in the presence of process termination. It guarantees exactly once
// invocation of the provided notify functions.
type InterruptHandler struct {
notify []func()
final func(os.Signal)
once sync.Once
}

// NewInterruptHandler New creates a new handler that guarantees all notify functions are run after the critical
// section exits (or is interrupted by the OS), then invokes the final handler. If no final
// handler is specified, the default final is `os.Exit(1)`. A handler can only be used for
// one critical section.
func NewInterruptHandler(final func(os.Signal), notify ...func()) *InterruptHandler {
return &InterruptHandler{
final: final,
notify: notify,
}
}

// Close executes all the notification handlers if they have not yet been executed.
func (h *InterruptHandler) Close() {
h.once.Do(func() {
for _, fn := range h.notify {
fn()
}
})
}

// Signal is called when an os.Signal is received, and guarantees that all notifications
// are executed, then the final handler is executed. This function should only be called once
// per Handler instance.
func (h *InterruptHandler) Signal(s os.Signal) {
h.once.Do(func() {
for _, fn := range h.notify {
fn()
}
if h.final == nil {
os.Exit(1)
}
h.final(s)
})
}

// Run ensures that any notifications are invoked after the provided fn exits (even if the
// process is interrupted by an OS termination signal). Notifications are only invoked once
// per Handler instance, so calling Run more than once will not behave as the user expects.
func (h *InterruptHandler) Run(fn func() error) error {
ch := make(chan os.Signal, 1)
signal.Notify(ch, terminationSignals...)
defer func() {
signal.Stop(ch)
close(ch)
}()
go func() {
sig, ok := <-ch
if !ok {
return
}
h.Signal(sig)
}()
defer h.Close()
return fn()
}

0 comments on commit ad30d15

Please sign in to comment.