Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt in device plugin #482

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions e2e2/test/cases/neuron/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
)

var (
testenv env.Environment
neuronTestImage *string
testenv env.Environment
neuronTestImage *string
installDevicePlugin *bool
)

var (
Expand All @@ -30,8 +31,21 @@ var (
neuronDevicePluginManifest []byte
)

// deployNeuronDevicePlugin is a test-environment setup step that blocks until
// the "neuron-device-plugin-daemonset" DaemonSet in the kube-system namespace
// satisfies the DaemonSetReady condition from fwext.
//
// It does not apply any manifest itself; it only waits.
// NOTE(review): presumably the device plugin manifests are applied by an
// earlier setup step — confirm against the caller.
//
// The incoming ctx is always returned unchanged; the error is whatever
// wait.For reports (nil on success).
func deployNeuronDevicePlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	plugin := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "neuron-device-plugin-daemonset",
			Namespace: "kube-system",
		},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&plugin)
	// The wait is bounded by ctx, so cancelling the test context aborts it.
	return ctx, wait.For(ready, wait.WithContext(ctx))
}

func TestMain(m *testing.M) {
neuronTestImage = flag.String("neuronTestImage", "", "image for neuron single node test")
installDevicePlugin = flag.Bool("installDevicePlugin", true, "install neuron device plugin")
cfg, err := envconf.NewFromFlags()
if err != nil {
log.Fatalf("failed to initialize test environment: %v", err)
Expand All @@ -41,31 +55,23 @@ func TestMain(m *testing.M) {
defer cancel()
testenv = testenv.WithContext(ctx)

manifests := [][]byte{
neuronDevicePluginManifest,
neuronDevicePlugiRbacManifest,
}

testenv.Setup(
var manifests [][]byte
setUpFunctions := []env.Func{
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifests...)
if err != nil {
return ctx, err
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "neuron-device-plugin-daemonset", Namespace: "kube-system"},
}
err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, err
}
return ctx, nil
},
)
}

if *installDevicePlugin {
manifests = append(manifests, neuronDevicePluginManifest, neuronDevicePlugiRbacManifest)
setUpFunctions = append(setUpFunctions, deployNeuronDevicePlugin)
}

testenv.Setup(setUpFunctions...)

testenv.Finish(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
Expand Down
195 changes: 109 additions & 86 deletions e2e2/test/cases/nvidia/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
)

var (
testenv env.Environment
nodeType *string
efaEnabled *bool
nvidiaTestImage *string
nodeCount int
gpuPerNode int
efaPerNode int
testenv env.Environment
nodeType *string
installDevicePlugin *bool
efaEnabled *bool
nvidiaTestImage *string
nodeCount int
gpuPerNode int
efaPerNode int
)

var (
Expand All @@ -42,10 +43,97 @@ var (
efaDevicePluginManifest []byte
)

// deployMPIOperator is a test-environment setup step that blocks until the
// "mpi-operator" Deployment in the "mpi-operator" namespace reports the
// DeploymentAvailable condition with status ConditionTrue.
//
// It does not apply any manifest itself; it only waits.
// NOTE(review): presumably the operator manifest is applied by an earlier
// setup step — confirm against the caller.
//
// The incoming ctx is always returned unchanged. A wait failure is returned
// with a "failed to deploy mpi-operator" prefix.
func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Context, error) {
	operator := appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "mpi-operator",
			Namespace: "mpi-operator",
		},
	}
	available := conditions.New(config.Client().Resources()).
		DeploymentConditionMatch(&operator, appsv1.DeploymentAvailable, v1.ConditionTrue)
	// The wait is bounded by ctx, so cancelling the test context aborts it.
	if waitErr := wait.For(available, wait.WithContext(ctx)); waitErr != nil {
		return ctx, fmt.Errorf("failed to deploy mpi-operator: %v", waitErr)
	}
	return ctx, nil
}

// deployNvidiaDevicePlugin is a test-environment setup step that blocks until
// the "nvidia-device-plugin-daemonset" DaemonSet in the kube-system namespace
// satisfies the DaemonSetReady condition from fwext.
//
// It does not apply any manifest itself; it only waits.
// NOTE(review): presumably the device plugin manifest is applied by an
// earlier setup step — confirm against the caller.
//
// The incoming ctx is always returned unchanged. A wait failure is returned
// with a "failed to deploy nvidia-device-plugin" prefix.
func deployNvidiaDevicePlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	plugin := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "nvidia-device-plugin-daemonset",
			Namespace: "kube-system",
		},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&plugin)
	// The wait is bounded by ctx, so cancelling the test context aborts it.
	if waitErr := wait.For(ready, wait.WithContext(ctx)); waitErr != nil {
		return ctx, fmt.Errorf("failed to deploy nvidia-device-plugin: %v", waitErr)
	}
	return ctx, nil
}

// deployEFAPlugin is a test-environment setup step that applies
// efaDevicePluginManifest to the cluster and then blocks until the
// "aws-efa-k8s-device-plugin-daemonset" DaemonSet in the kube-system
// namespace satisfies the DaemonSetReady condition from fwext.
//
// The incoming ctx is always returned unchanged. An error from applying the
// manifest is returned as-is; a wait failure is returned with a
// "failed to deploy efa-device-plugin" prefix.
func deployEFAPlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	// Step 1: create the plugin resources from the manifest.
	if applyErr := fwext.ApplyManifests(config.Client().RESTConfig(), efaDevicePluginManifest); applyErr != nil {
		return ctx, applyErr
	}

	// Step 2: wait (bounded by ctx) for the DaemonSet to become ready.
	plugin := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "aws-efa-k8s-device-plugin-daemonset",
			Namespace: "kube-system",
		},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&plugin)
	if waitErr := wait.For(ready, wait.WithContext(ctx)); waitErr != nil {
		return ctx, fmt.Errorf("failed to deploy efa-device-plugin: %v", waitErr)
	}

	return ctx, nil
}

// checkNodeTypes is a test-environment setup step that lists the cluster's
// nodes, verifies they all carry the same "node.kubernetes.io/instance-type"
// label value, and populates the package-level nodeType, nodeCount,
// gpuPerNode and efaPerNode variables.
//
// When the nodeType flag is non-empty, only nodes whose instance-type label
// equals it are counted, and the per-node GPU/EFA figures come from the last
// matching node. When the flag is empty, the instance type of the first node
// is adopted, every node is counted, and the per-node figures come from the
// first node.
//
// GPU and EFA counts are read from each node's Status.Capacity at call time.
// NOTE(review): the counts are presumably zero until the respective device
// plugins have registered their resources — confirm that this step is ordered
// after the plugin readiness steps in the setup list.
//
// The incoming ctx is always returned unchanged. An error is returned when
// the clientset cannot be built, the node list fails, node types differ, or
// (with no nodeType given) the cluster has no nodes to infer a type from.
func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Context, error) {
	const instanceTypeLabel = "node.kubernetes.io/instance-type"

	clientset, err := kubernetes.NewForConfig(config.Client().RESTConfig())
	if err != nil {
		return ctx, err
	}

	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return ctx, err
	}

	// Compare every adjacent pair, including the last node with its
	// predecessor. The previous bound (len-1) skipped the final node, so a
	// two-node cluster with mixed instance types went undetected.
	for i := 1; i < len(nodes.Items); i++ {
		if nodes.Items[i].Labels[instanceTypeLabel] != nodes.Items[i-1].Labels[instanceTypeLabel] {
			return ctx, fmt.Errorf("Node types are not the same, all node types must be the same in the cluster")
		}
	}

	if *nodeType != "" {
		for i := range nodes.Items {
			node := &nodes.Items[i]
			if node.Labels[instanceTypeLabel] != *nodeType {
				continue
			}
			nodeCount++
			// Quantities are copied into locals so Value() can be called on
			// an addressable value.
			gpu := node.Status.Capacity["nvidia.com/gpu"]
			gpuPerNode = int(gpu.Value())
			efa := node.Status.Capacity["vpc.amazonaws.com/efa"]
			efaPerNode = int(efa.Value())
		}
		return ctx, nil
	}

	// No node type was requested: infer it from the first node. Guard the
	// index so an empty cluster yields an error instead of a panic.
	if len(nodes.Items) == 0 {
		return ctx, fmt.Errorf("no nodes found in the cluster, cannot determine the node type")
	}
	first := &nodes.Items[0]
	log.Printf("No node type specified. Using the node type %s in the node groups.", first.Labels[instanceTypeLabel])
	nodeType = aws.String(first.Labels[instanceTypeLabel])
	nodeCount = len(nodes.Items)
	gpu := first.Status.Capacity["nvidia.com/gpu"]
	gpuPerNode = int(gpu.Value())
	efa := first.Status.Capacity["vpc.amazonaws.com/efa"]
	efaPerNode = int(efa.Value())

	return ctx, nil
}

func TestMain(m *testing.M) {
nodeType = flag.String("nodeType", "", "node type for the tests")
nvidiaTestImage = flag.String("nvidiaTestImage", "", "nccl test image for nccl tests")
efaEnabled = flag.Bool("efaEnabled", false, "enable efa tests")
installDevicePlugin = flag.Bool("installDevicePlugin", true, "install nvidia device plugin")
cfg, err := envconf.NewFromFlags()
if err != nil {
log.Fatalf("failed to initialize test environment: %v", err)
Expand All @@ -57,95 +145,30 @@ func TestMain(m *testing.M) {

// all NVIDIA tests require the MPI operator; the device plugin manifest is appended below only when installDevicePlugin is set
manifests := [][]byte{
nvidiaDevicePluginManifest,
mpiOperatorManifest,
}

testenv.Setup(
setUpFunctions := []env.Func{
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifests...)
if err != nil {
return ctx, err
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
dep := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: "mpi-operator", Namespace: "mpi-operator"},
}
err := wait.For(conditions.New(config.Client().Resources()).DeploymentConditionMatch(&dep, appsv1.DeploymentAvailable, v1.ConditionTrue),
wait.WithContext(ctx))
if err != nil {
return ctx, fmt.Errorf("failed to deploy mpi-operator: %v", err)
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "nvidia-device-plugin-daemonset", Namespace: "kube-system"},
}
err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, fmt.Errorf("failed to deploy nvidia-device-plugin: %v", err)
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
clientset, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
if err != nil {
return ctx, err
}
if *efaEnabled {
err := fwext.ApplyManifests(cfg.Client().RESTConfig(), efaDevicePluginManifest)
if err != nil {
return ctx, err
}
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "aws-efa-k8s-device-plugin-daemonset", Namespace: "kube-system"},
}
err = wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, fmt.Errorf("failed to deploy efa-device-plugin: %v", err)
}
}
nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return ctx, err
}
deployMPIOperator,
checkNodeTypes,
}

singleNodeType := true
for i := 1; i < len(nodes.Items)-1; i++ {
if nodes.Items[i].Labels["node.kubernetes.io/instance-type"] != nodes.Items[i-1].Labels["node.kubernetes.io/instance-type"] {
singleNodeType = false
}
}
if !singleNodeType {
return ctx, fmt.Errorf("Node types are not the same, all node types must be the same in the cluster")
}
if *nodeType != "" {
for _, v := range nodes.Items {
if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
nodeCount++
gpu := v.Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
efa := v.Status.Capacity["vpc.amazonaws.com/efa"]
efaPerNode = int(efa.Value())
}
}
} else {
log.Printf("No node type specified. Using the node type %s in the node groups.", nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
nodeType = aws.String(nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
nodeCount = len(nodes.Items)
gpu := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
efa := nodes.Items[0].Status.Capacity["vpc.amazonaws.com/efa"]
efaPerNode = int(efa.Value())
}
return ctx, nil
},
)
if *installDevicePlugin {
manifests = append(manifests, nvidiaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, deployNvidiaDevicePlugin)
}

if *efaEnabled {
setUpFunctions = append(setUpFunctions, deployEFAPlugin)
}

testenv.Setup(setUpFunctions...)

testenv.Finish(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
Expand Down
4 changes: 2 additions & 2 deletions kubetest2/internal/deployers/eksapi/nodegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (m *NodegroupManager) createManagedNodegroup(infra *Infrastructure, cluster
func (m *NodegroupManager) createUnmanagedNodegroup(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
stackName := m.getUnmanagedNodegroupStackName()
klog.Infof("creating unmanaged nodegroup stack...")
userData, userDataIsMimePart, err := generateUserData(opts.UserDataFormat, opts.EFA, cluster)
userData, userDataIsMimePart, err := generateUserData(opts.UserDataFormat, cluster)
if err != nil {
return err
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (m *NodegroupManager) createUnmanagedNodegroup(infra *Infrastructure, clust
func (m *NodegroupManager) createUnmanagedNodegroupWithEFA(infra *Infrastructure, cluster *Cluster, opts *deployerOptions) error {
stackName := m.getUnmanagedNodegroupStackName()
klog.Infof("creating unmanaged nodegroup with EFA stack...")
userData, userDataIsMimePart, err := generateUserData(opts.UserDataFormat, opts.EFA, cluster)
userData, userDataIsMimePart, err := generateUserData(opts.UserDataFormat, cluster)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion kubetest2/internal/deployers/eksapi/templates/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type UserDataTemplateData struct {
CertificateAuthority string
CIDR string
APIServerEndpoint string
EFAEnabled bool
}

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,3 @@

[settings.host-containers.admin]
"enabled" = true

[settings.efa]
"enabled" = {{.EFAEnabled}}
3 changes: 1 addition & 2 deletions kubetest2/internal/deployers/eksapi/userdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/aws/aws-k8s-tester/kubetest2/internal/deployers/eksapi/templates"
)

func generateUserData(format string, efaEnabled bool, cluster *Cluster) (string, bool, error) {
func generateUserData(format string, cluster *Cluster) (string, bool, error) {
userDataIsMimePart := true
var t *template.Template
switch format {
Expand All @@ -29,7 +29,6 @@ func generateUserData(format string, efaEnabled bool, cluster *Cluster) (string,
CertificateAuthority: cluster.certificateAuthorityData,
CIDR: cluster.cidr,
Name: cluster.name,
EFAEnabled: efaEnabled,
}); err != nil {
return "", false, err
}
Expand Down
9 changes: 1 addition & 8 deletions kubetest2/internal/deployers/eksapi/userdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,40 +42,33 @@ const bottlerocketUserData = `[settings.kubernetes]

[settings.host-containers.admin]
"enabled" = true

[settings.efa]
"enabled" = true
`

func Test_generateUserData(t *testing.T) {
cases := []struct {
format string
efa bool
expected string
expectedIsMimePart bool
}{
{
format: "bootstrap.sh",
efa: false,
expected: bootstrapShUserData,
expectedIsMimePart: true,
},
{
format: "nodeadm",
efa: false,
expected: nodeadmUserData,
expectedIsMimePart: true,
},
{
format: "bottlerocket",
efa: true,
expected: bottlerocketUserData,
expectedIsMimePart: false,
},
}
for _, c := range cases {
t.Run(c.format, func(t *testing.T) {
actual, isMimePart, err := generateUserData(c.format, c.efa, &cluster)
actual, isMimePart, err := generateUserData(c.format, &cluster)
if err != nil {
t.Log(err)
t.Error(err)
Expand Down
Loading