Skip to content

Commit

Permalink
Merge branch 'aws:main' into batch-optimization-nvidia
Browse files Browse the repository at this point in the history
  • Loading branch information
mattcjo authored Oct 11, 2024
2 parents f000ec6 + 00009de commit 1907a82
Show file tree
Hide file tree
Showing 28 changed files with 882 additions and 336 deletions.
46 changes: 26 additions & 20 deletions e2e2/test/cases/neuron/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
)

var (
testenv env.Environment
neuronTestImage *string
testenv env.Environment
neuronTestImage *string
installDevicePlugin *bool
)

var (
Expand All @@ -30,8 +31,21 @@ var (
neuronDevicePluginManifest []byte
)

// deployNeuronDevicePlugin waits for the "neuron-device-plugin-daemonset"
// DaemonSet in the "kube-system" namespace to satisfy the DaemonSetReady
// condition. It does not apply any manifest itself; it only waits.
//
// The wait is tied to ctx through wait.WithContext. The incoming ctx is
// returned unchanged, together with the wait error if there is one.
func deployNeuronDevicePlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	devicePlugin := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "neuron-device-plugin-daemonset",
			Namespace: "kube-system",
		},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&devicePlugin)
	if waitErr := wait.For(ready, wait.WithContext(ctx)); waitErr != nil {
		return ctx, waitErr
	}
	return ctx, nil
}

func TestMain(m *testing.M) {
neuronTestImage = flag.String("neuronTestImage", "", "image for neuron single node test")
installDevicePlugin = flag.Bool("installDevicePlugin", true, "install neuron device plugin")
cfg, err := envconf.NewFromFlags()
if err != nil {
log.Fatalf("failed to initialize test environment: %v", err)
Expand All @@ -41,31 +55,23 @@ func TestMain(m *testing.M) {
defer cancel()
testenv = testenv.WithContext(ctx)

manifests := [][]byte{
neuronDevicePluginManifest,
neuronDevicePlugiRbacManifest,
}

testenv.Setup(
var manifests [][]byte
setUpFunctions := []env.Func{
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifests...)
if err != nil {
return ctx, err
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "neuron-device-plugin-daemonset", Namespace: "kube-system"},
}
err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, err
}
return ctx, nil
},
)
}

if *installDevicePlugin {
manifests = append(manifests, neuronDevicePluginManifest, neuronDevicePlugiRbacManifest)
setUpFunctions = append(setUpFunctions, deployNeuronDevicePlugin)
}

testenv.Setup(setUpFunctions...)

testenv.Finish(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
Expand Down
197 changes: 112 additions & 85 deletions e2e2/test/cases/nvidia/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

fwext "github.com/aws/aws-k8s-tester/e2e2/internal/framework_extensions"
"github.com/aws/aws-sdk-go-v2/aws"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -23,13 +24,15 @@ import (
)

var (
testenv env.Environment
nodeType *string
efaEnabled *bool
nvidiaTestImage *string
nodeCount int
gpuPerNode int
efaPerNode int
testenv env.Environment
nodeType *string
installDevicePlugin *bool
efaEnabled *bool
nvidiaTestImage *string
skipUnitTestSubcommand *string
nodeCount int
gpuPerNode int
efaPerNode int
)

var (
Expand All @@ -41,10 +44,98 @@ var (
efaDevicePluginManifest []byte
)

// deployMPIOperator waits for the "mpi-operator" Deployment in the
// "mpi-operator" namespace to report the DeploymentAvailable condition as
// true. It does not apply the operator manifest itself; it only waits.
//
// The wait is tied to ctx through wait.WithContext. The incoming ctx is
// returned unchanged. On failure the wait error is wrapped so callers can
// still inspect it with errors.Is / errors.As.
func deployMPIOperator(ctx context.Context, config *envconf.Config) (context.Context, error) {
	dep := appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "mpi-operator", Namespace: "mpi-operator"},
	}
	available := conditions.New(config.Client().Resources()).
		DeploymentConditionMatch(&dep, appsv1.DeploymentAvailable, v1.ConditionTrue)
	if err := wait.For(available, wait.WithContext(ctx)); err != nil {
		// %w (not %v) keeps the underlying error in the chain.
		return ctx, fmt.Errorf("failed to deploy mpi-operator: %w", err)
	}
	return ctx, nil
}

// deployNvidiaDevicePlugin waits for the "nvidia-device-plugin-daemonset"
// DaemonSet in the "kube-system" namespace to satisfy the DaemonSetReady
// condition. It does not apply the plugin manifest itself; it only waits.
//
// The wait is tied to ctx through wait.WithContext. The incoming ctx is
// returned unchanged. On failure the wait error is wrapped so callers can
// still inspect it with errors.Is / errors.As.
func deployNvidiaDevicePlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	ds := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "nvidia-device-plugin-daemonset", Namespace: "kube-system"},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds)
	if err := wait.For(ready, wait.WithContext(ctx)); err != nil {
		// %w (not %v) keeps the underlying error in the chain.
		return ctx, fmt.Errorf("failed to deploy nvidia-device-plugin: %w", err)
	}
	return ctx, nil
}

// deployEFAPlugin applies efaDevicePluginManifest to the cluster and then
// waits for the "aws-efa-k8s-device-plugin-daemonset" DaemonSet in the
// "kube-system" namespace to satisfy the DaemonSetReady condition.
//
// The wait is tied to ctx through wait.WithContext. The incoming ctx is
// returned unchanged. Both failure paths wrap the underlying error with
// context, so the failing step is identifiable and the cause stays
// inspectable with errors.Is / errors.As.
func deployEFAPlugin(ctx context.Context, config *envconf.Config) (context.Context, error) {
	if err := fwext.ApplyManifests(config.Client().RESTConfig(), efaDevicePluginManifest); err != nil {
		return ctx, fmt.Errorf("failed to apply efa-device-plugin manifest: %w", err)
	}

	ds := appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "aws-efa-k8s-device-plugin-daemonset", Namespace: "kube-system"},
	}
	ready := fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds)
	if err := wait.For(ready, wait.WithContext(ctx)); err != nil {
		return ctx, fmt.Errorf("failed to deploy efa-device-plugin: %w", err)
	}

	return ctx, nil
}

// checkNodeTypes lists the cluster's nodes, verifies that they all carry the
// same "node.kubernetes.io/instance-type" label value, and records node
// facts in package-level variables.
//
// When *nodeType is non-empty, each node whose instance-type label equals it
// increments nodeCount and sets gpuPerNode / efaPerNode from that node's
// "nvidia.com/gpu" and "vpc.amazonaws.com/efa" capacity. When *nodeType is
// empty, nodeType is set to the first node's instance type, nodeCount to the
// number of nodes, and the per-node values are read from the first node.
//
// The incoming ctx is returned unchanged. An error is returned if the
// clientset cannot be built, the node list fails, the nodes have differing
// instance types, or no node type was given and the cluster has no nodes.
func checkNodeTypes(ctx context.Context, config *envconf.Config) (context.Context, error) {
	const instanceTypeLabel = "node.kubernetes.io/instance-type"

	clientset, err := kubernetes.NewForConfig(config.Client().RESTConfig())
	if err != nil {
		return ctx, err
	}

	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return ctx, err
	}

	// Compare every adjacent pair, including the last node. The previous
	// bound of len-1 skipped the final comparison, so a differing last node
	// went undetected.
	for i := 1; i < len(nodes.Items); i++ {
		if nodes.Items[i].Labels[instanceTypeLabel] != nodes.Items[i-1].Labels[instanceTypeLabel] {
			return ctx, fmt.Errorf("Node types are not the same, all node types must be the same in the cluster")
		}
	}

	if *nodeType != "" {
		// Index instead of ranging by value to avoid copying each Node.
		for i := range nodes.Items {
			node := &nodes.Items[i]
			if node.Labels[instanceTypeLabel] != *nodeType {
				continue
			}
			nodeCount++
			gpu := node.Status.Capacity["nvidia.com/gpu"]
			gpuPerNode = int(gpu.Value())
			efa := node.Status.Capacity["vpc.amazonaws.com/efa"]
			efaPerNode = int(efa.Value())
		}
		return ctx, nil
	}

	// The default node type comes from the first node, so an empty list must
	// be rejected here rather than panicking on nodes.Items[0].
	if len(nodes.Items) == 0 {
		return ctx, fmt.Errorf("no nodes found in the cluster, cannot determine node type")
	}

	first := &nodes.Items[0]
	log.Printf("No node type specified. Using the node type %s in the node groups.", first.Labels[instanceTypeLabel])
	nodeType = aws.String(first.Labels[instanceTypeLabel])
	nodeCount = len(nodes.Items)
	gpu := first.Status.Capacity["nvidia.com/gpu"]
	gpuPerNode = int(gpu.Value())
	efa := first.Status.Capacity["vpc.amazonaws.com/efa"]
	efaPerNode = int(efa.Value())

	return ctx, nil
}

func TestMain(m *testing.M) {
nodeType = flag.String("nodeType", "", "node type for the tests")
nvidiaTestImage = flag.String("nvidiaTestImage", "", "nccl test image for nccl tests")
efaEnabled = flag.Bool("efaEnabled", false, "enable efa tests")
installDevicePlugin = flag.Bool("installDevicePlugin", true, "install nvidia device plugin")
skipUnitTestSubcommand = flag.String("skipUnitTestSubcommand", "", "optional command to skip specified unit test, `-s test1|test2|...`")
cfg, err := envconf.NewFromFlags()
if err != nil {
log.Fatalf("failed to initialize test environment: %v", err)
Expand All @@ -56,94 +147,30 @@ func TestMain(m *testing.M) {

// all NVIDIA tests require the MPI operator; the device plugin manifest is added below only when installDevicePlugin is set
manifests := [][]byte{
nvidiaDevicePluginManifest,
mpiOperatorManifest,
}

testenv.Setup(
setUpFunctions := []env.Func{
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
err := fwext.ApplyManifests(config.Client().RESTConfig(), manifests...)
if err != nil {
return ctx, err
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
dep := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: "mpi-operator", Namespace: "mpi-operator"},
}
err := wait.For(conditions.New(config.Client().Resources()).DeploymentConditionMatch(&dep, appsv1.DeploymentAvailable, v1.ConditionTrue),
wait.WithContext(ctx))
if err != nil {
return ctx, err
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "nvidia-device-plugin-daemonset", Namespace: "kube-system"},
}
err := wait.For(fwext.NewConditionExtension(config.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, err
}
return ctx, nil
},
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
clientset, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
if err != nil {
return ctx, err
}
nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return ctx, err
}
if *efaEnabled {
err := fwext.ApplyManifests(cfg.Client().RESTConfig(), efaDevicePluginManifest)
if err != nil {
return ctx, err
}
ds := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Name: "efa-device-plugin-daemonset", Namespace: "kube-system"},
}
err = wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).DaemonSetReady(&ds),
wait.WithContext(ctx))
if err != nil {
return ctx, err
}
}
deployMPIOperator,
checkNodeTypes,
}

singleNodeType := true
for i := 1; i < len(nodes.Items)-1; i++ {
if nodes.Items[i].Labels["node.kubernetes.io/instance-type"] != nodes.Items[i-1].Labels["node.kubernetes.io/instance-type"] {
singleNodeType = false
}
}
if !singleNodeType {
return ctx, fmt.Errorf("Node types are not the same, all node types must be the same in the cluster")
}
if *nodeType != "" {
for _, v := range nodes.Items {
if v.Labels["node.kubernetes.io/instance-type"] == *nodeType {
nodeCount++
gpu := v.Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
efa := v.Status.Capacity["vpc.amazonaws.com/efa"]
efaPerNode = int(efa.Value())
}
}
} else {
log.Printf("No node type specified. Using the node type %s in the node groups.", nodes.Items[0].Labels["node.kubernetes.io/instance-type"])
nodeCount = len(nodes.Items)
gpu := nodes.Items[0].Status.Capacity["nvidia.com/gpu"]
gpuPerNode = int(gpu.Value())
efa := nodes.Items[0].Status.Capacity["vpc.amazonaws.com/efa"]
efaPerNode = int(efa.Value())
}
return ctx, nil
},
)
if *installDevicePlugin {
manifests = append(manifests, nvidiaDevicePluginManifest)
setUpFunctions = append(setUpFunctions, deployNvidiaDevicePlugin)
}

if *efaEnabled {
setUpFunctions = append(setUpFunctions, deployEFAPlugin)
}

testenv.Setup(setUpFunctions...)

testenv.Finish(
func(ctx context.Context, config *envconf.Config) (context.Context, error) {
Expand Down
Loading

0 comments on commit 1907a82

Please sign in to comment.