implement mock API for OCI #49

Draft · wants to merge 1 commit into base: main
1 change: 1 addition & 0 deletions go.mod
@@ -112,4 +112,5 @@ replace (
github.com/aws/aws-sdk-go-v2/credentials v1.17.45 => github.com/pkedy/aws-sdk-go-v2/credentials v0.0.0-20241115203348-0198b6c98cd9
github.com/aws/aws-sdk-go-v2/service/autoscaling v1.48.0 => github.com/pkedy/aws-sdk-go-v2/service/autoscaling v0.0.0-20241115203348-0198b6c98cd9
github.com/aws/aws-sdk-go-v2/service/ec2 v1.187.0 => github.com/pkedy/aws-sdk-go-v2/service/ec2 v0.0.0-20241115203348-0198b6c98cd9
github.com/oracle/oci-go-sdk/v65 v65.78.0 => ../../oracle/oci-go-sdk/v65
)
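The added replace directive points the OCI Go SDK at a local sibling checkout, which is how the mock API can stand in for the published SDK while this PR is a draft. It is also why CI fails on this commit: both the build and test jobs annotate the import block in pkg/providers/oci/instance_topology.go with "github.com/oracle/oci-go-sdk/[email protected]: replacement directory ../../oracle/oci-go-sdk/v65 does not exist", since the runner workspace has no such sibling directory. A minimal sketch of the mechanism, with the path depending entirely on the local checkout layout:

// go.mod: resolve the module from a local directory instead of the module proxy
replace github.com/oracle/oci-go-sdk/v65 v65.78.0 => ../../oracle/oci-go-sdk/v65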
2 changes: 0 additions & 2 deletions go.sum
@@ -161,8 +161,6 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/oracle/oci-go-sdk/v65 v65.78.0 h1:iM7lFFA7cJkUD4tmrlsAHWgL3HuTuF9mdvTAliMkcFA=
github.com/oracle/oci-go-sdk/v65 v65.78.0/go.mod h1:IBEV9l1qBzUpo7zgGaRUhbB05BVfcDGYRFBCPlTcPp0=
github.com/pkedy/aws-sdk-go-v2 v0.0.0-20241115203348-0198b6c98cd9 h1:QhMFD0yJ9nEj4BCX9lREQ7twLM5oEL8y9UwKsRNJamo=
github.com/pkedy/aws-sdk-go-v2 v0.0.0-20241115203348-0198b6c98cd9/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/pkedy/aws-sdk-go-v2/service/ec2 v0.0.0-20241115203348-0198b6c98cd9 h1:wA7yd0OxRH3EWuKaJ7ijRowlWgH2b99nrP+d10+0Sc4=
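The two oci-go-sdk entries drop out of go.sum as a direct consequence of the replace directive above: modules replaced by a filesystem path are not checksum-verified, so go mod tidy prunes their hashes.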
227 changes: 111 additions & 116 deletions pkg/providers/oci/instance_topology.go
@@ -19,16 +19,16 @@
import (
"context"
"fmt"
"net/http"
"sort"
"time"

"github.com/oracle/oci-go-sdk/v65/core"
"github.com/oracle/oci-go-sdk/v65/identity"
"k8s.io/klog/v2"

"github.com/NVIDIA/topograph/pkg/metrics"
"github.com/NVIDIA/topograph/pkg/topology"
"github.com/NVIDIA/topograph/pkg/translate"
)

type level int
@@ -39,109 +39,111 @@
hpcIslandLevel
)

func GenerateInstanceTopology(ctx context.Context, factory ClientFactory, cis []topology.ComputeInstances) ([]*core.ComputeBareMetalHostSummary, error) {
var err error
bareMetalHostSummaries := []*core.ComputeBareMetalHostSummary{}
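// GenerateInstanceTopology walks every requested region: it builds a client
// through the factory (passing pageSize along to the paged list calls) and
// accumulates compute host summaries plus a LocalBlock-to-GPU-memory-fabric map.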
func GenerateInstanceTopology(ctx context.Context, factory ClientFactory, pageSize *int, cis []topology.ComputeInstances) (hosts []core.ComputeHostSummary, blockMap map[string]string, err error) {
blockMap = make(map[string]string)

for _, ci := range cis {
if bareMetalHostSummaries, err = generateInstanceTopology(ctx, factory, &ci, bareMetalHostSummaries); err != nil {
return nil, err
var client Client
if client, err = factory(ci.Region, pageSize); err != nil {
return
}
if hosts, err = getComputeHostInfo(ctx, client, hosts, blockMap); err != nil {
return
}
}

return bareMetalHostSummaries, nil
return
}

func getComputeCapacityTopologies(ctx context.Context, client Client) (cct []core.ComputeCapacityTopologySummary, err error) {
compartmentId := client.TenancyOCID()

adRequest := identity.ListAvailabilityDomainsRequest{
CompartmentId: &compartmentId,
}
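// getComputeHostSummary pages through ListComputeHosts for a single
// availability domain, recording per-request latency, and returns the
// accumulated host summaries.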
func getComputeHostSummary(ctx context.Context, client Client, availabilityDomain *string) ([]core.ComputeHostSummary, error) {
var hosts []core.ComputeHostSummary

timeStart := time.Now()
ads, err := client.ListAvailabilityDomains(ctx, adRequest)
if err != nil {
return cct, fmt.Errorf("unable to get AD: %v", err)
req := core.ListComputeHostsRequest{
CompartmentId: client.TenancyOCID(),
AvailabilityDomain: availabilityDomain,
Limit: client.Limit(),
}
requestLatency.WithLabelValues("ListAvailabilityDomains", ads.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())

for _, ad := range ads.Items {
cctRequest := core.ListComputeCapacityTopologiesRequest{
CompartmentId: &compartmentId,
AvailabilityDomain: ad.Name,
for {
timeStart := time.Now()
resp, err := client.ListComputeHosts(ctx, req)
requestLatency.WithLabelValues("ListComputeHosts", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
if err != nil {
return nil, err
}

for {
timeStart := time.Now()
resp, err := client.ListComputeCapacityTopologies(ctx, cctRequest)
requestLatency.WithLabelValues("ListComputeCapacityTopologies", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
if err != nil {
if resp.HTTPResponse().StatusCode == http.StatusNotFound {
return cct, fmt.Errorf("%v for getting ComputeCapacityTopology in %s: %v", resp.HTTPResponse().StatusCode, *ad.Name, err)
} else {
return cct, fmt.Errorf("unable to get ComputeCapacity Topologies in %s : %v", *ad.Name, err)
}
}
cct = append(cct, resp.Items...)
klog.V(4).Infof("Received computeCapacityTopology %d groups; processed %d", len(resp.Items), len(cct))
if resp.OpcNextPage != nil {
cctRequest.Page = resp.OpcNextPage
} else {
break
}
hosts = append(hosts, resp.Items...)

if resp.OpcNextPage != nil {
req.Page = resp.OpcNextPage
} else {
break
}
}

return cct, nil
return hosts, nil
}

func getBMHSummaryPerComputeCapacityTopology(ctx context.Context, client Client, topologyID string) (bmhSummary []core.ComputeBareMetalHostSummary, err error) {
compartmentId := client.TenancyOCID()
request := core.ListComputeCapacityTopologyComputeBareMetalHostsRequest{
ComputeCapacityTopologyId: &topologyID,
CompartmentId: &compartmentId,
// getLocalBlockMap populates blockMap, mapping each ComputeLocalBlock OCID to
// the OCID of its ComputeGpuMemoryFabric in the given availability domain.
func getLocalBlockMap(ctx context.Context, client Client, availabilityDomain *string, blockMap map[string]string) error {
req := core.ListComputeGpuMemoryFabricsRequest{
CompartmentId: client.TenancyOCID(),
AvailabilityDomain: availabilityDomain,
Limit: client.Limit(),
}

for {
timeStart := time.Now()
response, err := client.ListComputeCapacityTopologyComputeBareMetalHosts(ctx, request)
requestLatency.WithLabelValues("ListComputeCapacityTopologyComputeBareMetalHosts", response.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
resp, err := client.ListComputeGpuMemoryFabrics(ctx, req)
requestLatency.WithLabelValues("ListComputeGpuMemoryFabrics", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
if err != nil {
klog.Errorln(err.Error())
break
return err
}

bmhSummary = append(bmhSummary, response.Items...)
for _, fabrics := range resp.Items {
blockMap[*fabrics.ComputeLocalBlockId] = *fabrics.Id
}

if response.OpcNextPage != nil {
request.Page = response.OpcNextPage
if resp.OpcNextPage != nil {
req.Page = resp.OpcNextPage
} else {
break
}
}
return bmhSummary, nil

return nil
}

func getBareMetalHostSummaries(ctx context.Context, client Client) ([]core.ComputeBareMetalHostSummary, error) {
computeCapacityTopology, err := getComputeCapacityTopologies(ctx, client)
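// getComputeHostInfo lists the tenancy's availability domains and, for each
// of them, appends its host summaries and fills in the local block map.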
func getComputeHostInfo(ctx context.Context, client Client, hosts []core.ComputeHostSummary, blockMap map[string]string) ([]core.ComputeHostSummary, error) {
req := identity.ListAvailabilityDomainsRequest{
CompartmentId: client.TenancyOCID(),
}

timeStart := time.Now()
resp, err := client.ListAvailabilityDomains(ctx, req)
if err != nil {
return nil, fmt.Errorf("unable to get compute capacity topologies: %s", err.Error())
return nil, fmt.Errorf("unable to get availability domains: %v", err)
}
klog.V(4).Infof("Received computeCapacityTopology for %d groups", len(computeCapacityTopology))
requestLatency.WithLabelValues("ListAvailabilityDomains", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())

var bareMetalHostSummaries []core.ComputeBareMetalHostSummary
for _, cct := range computeCapacityTopology {
bareMetalHostSummary, err := getBMHSummaryPerComputeCapacityTopology(ctx, client, *cct.Id)
for _, ad := range resp.Items {
summary, err := getComputeHostSummary(ctx, client, ad.Name)
if err != nil {
return nil, fmt.Errorf("unable to get bare metal hosts info: %s", err.Error())
return nil, fmt.Errorf("unable to get hosts info: %v", err)
}
hosts = append(hosts, summary...)

if err = getLocalBlockMap(ctx, client, ad.Name, blockMap); err != nil {
return nil, fmt.Errorf("unable to get local block map: %v", err)
}
bareMetalHostSummaries = append(bareMetalHostSummaries, bareMetalHostSummary...)
}
klog.V(4).Infof("Returning bareMetalHostSummaries for %d nodes", len(bareMetalHostSummaries))

return bareMetalHostSummaries, nil
klog.V(4).Infof("Returning host info for %d nodes and %d blocks", len(hosts), len(blockMap))

return hosts, nil
}

func toGraph(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, cis []topology.ComputeInstances) (*topology.Vertex, error) {
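// toGraph assembles the topology tree from the host summaries: instances hang
// off local blocks, local blocks off network blocks, and network blocks off
// HPC islands; blockMap contributes an optional block (GPU memory fabric) layer.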
func toGraph(hosts []core.ComputeHostSummary, blockMap map[string]string, cis []topology.ComputeInstances) (*topology.Vertex, error) {
instanceToNodeMap := make(map[string]string)
for _, ci := range cis {
for instance, node := range ci.Instances {
@@ -152,18 +154,25 @@

nodes := make(map[string]*topology.Vertex)
forest := make(map[string]*topology.Vertex)
domainMap := translate.NewDomainMap()

levelWiseSwitchCount := map[level]int{localBlockLevel: 0, networkBlockLevel: 0, hpcIslandLevel: 0}
bareMetalHostSummaries = filterAndSort(bareMetalHostSummaries, instanceToNodeMap)
for _, bmhSummary := range bareMetalHostSummaries {
nodeName := instanceToNodeMap[*bmhSummary.InstanceId]
delete(instanceToNodeMap, *bmhSummary.InstanceId)
hosts = filterAndSort(hosts, instanceToNodeMap)
for _, host := range hosts {
nodeName := instanceToNodeMap[*host.Id]
delete(instanceToNodeMap, *host.Id)

instance := &topology.Vertex{
Name: nodeName,
ID: *bmhSummary.InstanceId,
ID: *host.Id,
}

localBlockId := *host.LocalBlockId

if blockDomain, ok := blockMap[localBlockId]; ok {
domainMap.AddHost(blockDomain, nodeName)
}

localBlockId := *bmhSummary.ComputeLocalBlockId
localBlock, ok := nodes[localBlockId]
if !ok {
levelWiseSwitchCount[localBlockLevel]++
@@ -176,7 +185,7 @@
}
localBlock.Vertices[instance.ID] = instance

networkBlockId := *bmhSummary.ComputeNetworkBlockId
networkBlockId := *host.NetworkBlockId
networkBlock, ok := nodes[networkBlockId]
if !ok {
levelWiseSwitchCount[networkBlockLevel]++
@@ -189,7 +198,7 @@
}
networkBlock.Vertices[localBlockId] = localBlock

hpcIslandId := *bmhSummary.ComputeHpcIslandId
hpcIslandId := *host.HpcIslandId
hpcIsland, ok := nodes[hpcIslandId]
if !ok {
levelWiseSwitchCount[hpcIslandLevel]++
@@ -231,75 +240,61 @@
Vertices: make(map[string]*topology.Vertex),
}
root.Vertices[topology.TopologyTree] = treeRoot
if len(domainMap) != 0 {
root.Vertices[topology.TopologyBlock] = domainMap.ToBlocks()
}
return root, nil

}

func filterAndSort(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, instanceToNodeMap map[string]string) []*core.ComputeBareMetalHostSummary {
var filtered []*core.ComputeBareMetalHostSummary
for _, bmh := range bareMetalHostSummaries {
if bmh.InstanceId == nil {
klog.V(5).Infof("Instance ID is nil for bmhSummary %s", bmh.String())
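// filterAndSort drops hosts with a nil ID or missing topology ancestors
// (counting the latter in the missingAncestor metric), keeps only instances
// we were asked about, and orders the rest by HPC island, network block,
// local block, and instance ID.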
func filterAndSort(hosts []core.ComputeHostSummary, instanceToNodeMap map[string]string) []core.ComputeHostSummary {
var filtered []core.ComputeHostSummary
for _, host := range hosts {
if host.Id == nil {
klog.Warningf("InstanceID is nil for host %s", host.String())
continue
}

if bmh.ComputeLocalBlockId == nil {
klog.Warningf("ComputeLocalBlockId is nil for instance %q", *bmh.InstanceId)
missingAncestor.WithLabelValues("localBlock", *bmh.InstanceId).Add(float64(1))
if host.LocalBlockId == nil {
klog.Warningf("LocalBlockId is nil for instance %q", *host.Id)
missingAncestor.WithLabelValues("LocalBlock", *host.Id).Add(float64(1))
continue
}

if bmh.ComputeNetworkBlockId == nil {
klog.Warningf("ComputeNetworkBlockId is nil for instance %q", *bmh.InstanceId)
missingAncestor.WithLabelValues("networkBlock", *bmh.InstanceId).Add(float64(1))
if host.NetworkBlockId == nil {
klog.Warningf("NetworkBlockId is nil for instance %q", *host.Id)
missingAncestor.WithLabelValues("networkBlock", *host.Id).Add(float64(1))
continue
}

if bmh.ComputeHpcIslandId == nil {
klog.Warningf("ComputeHpcIslandId is nil for instance %q", *bmh.InstanceId)
missingAncestor.WithLabelValues("hpcIsland", *bmh.InstanceId).Add(float64(1))
if host.HpcIslandId == nil {
klog.Warningf("HpcIslandId is nil for instance %q", *host.Id)
missingAncestor.WithLabelValues("hpcIsland", *host.Id).Add(float64(1))
continue
}

if _, ok := instanceToNodeMap[*bmh.InstanceId]; ok {
klog.V(4).Infof("Adding bmhSummary %s", bmh.String())
filtered = append(filtered, bmh)
if _, ok := instanceToNodeMap[*host.Id]; ok {
klog.V(4).Infof("Adding host %s", host.String())
filtered = append(filtered, host)
} else {
klog.V(4).Infof("Skipping bmhSummary %s", bmh.String())
klog.V(4).Infof("Skipping host %s", host.String())
}
}

sort.Slice(filtered, func(i, j int) bool {
if filtered[i].ComputeHpcIslandId != filtered[j].ComputeHpcIslandId {
return *filtered[i].ComputeHpcIslandId < *filtered[j].ComputeHpcIslandId
if filtered[i].HpcIslandId != filtered[j].HpcIslandId {
return *filtered[i].HpcIslandId < *filtered[j].HpcIslandId
}

if filtered[i].ComputeNetworkBlockId != filtered[j].ComputeNetworkBlockId {
return *filtered[i].ComputeNetworkBlockId < *filtered[j].ComputeNetworkBlockId
if filtered[i].NetworkBlockId != filtered[j].NetworkBlockId {
return *filtered[i].NetworkBlockId < *filtered[j].NetworkBlockId
}

if filtered[i].ComputeLocalBlockId != filtered[j].ComputeLocalBlockId {
return *filtered[i].ComputeLocalBlockId < *filtered[j].ComputeLocalBlockId
if filtered[i].LocalBlockId != filtered[j].LocalBlockId {
return *filtered[i].LocalBlockId < *filtered[j].LocalBlockId
}

return *filtered[i].InstanceId < *filtered[j].InstanceId
return *filtered[i].Id < *filtered[j].Id
})
return filtered
}

func generateInstanceTopology(ctx context.Context, factory ClientFactory, ci *topology.ComputeInstances, bareMetalHostSummaries []*core.ComputeBareMetalHostSummary) ([]*core.ComputeBareMetalHostSummary, error) {
client, err := factory(ci.Region)
if err != nil {
return nil, err
}

bmh, err := getBareMetalHostSummaries(ctx, client)
if err != nil {
return nil, fmt.Errorf("unable to populate compute capacity topology: %s", err.Error())
}

for _, bm := range bmh {
bareMetalHostSummaries = append(bareMetalHostSummaries, &bm)
}
return bareMetalHostSummaries, nil
}
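Taken together, the rework swaps the old capacity-topology walk (availability domains, then capacity topologies, then bare metal hosts) for a direct ListComputeHosts listing plus a GPU memory fabric block map. A minimal sketch of how the new entry points compose, assuming a ClientFactory wired to the mock API; buildTopology and the page size value are hypothetical, and the snippet presumes the surrounding package context:

func buildTopology(ctx context.Context, factory ClientFactory, cis []topology.ComputeInstances) (*topology.Vertex, error) {
	pageSize := 500 // hypothetical; presumably surfaced to the list requests via client.Limit()
	hosts, blockMap, err := GenerateInstanceTopology(ctx, factory, &pageSize, cis)
	if err != nil {
		return nil, err
	}
	return toGraph(hosts, blockMap, cis)
}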