Skip to content

Commit

Permalink
implement mock API for OCI
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Shmulevich <[email protected]>
  • Loading branch information
dmitsh committed Dec 20, 2024
1 parent cfe5bfc commit f05d8f1
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 128 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,5 @@ replace (
github.com/aws/aws-sdk-go-v2/credentials v1.17.45 => github.com/pkedy/aws-sdk-go-v2/credentials v0.0.0-20241115203348-0198b6c98cd9
github.com/aws/aws-sdk-go-v2/service/autoscaling v1.48.0 => github.com/pkedy/aws-sdk-go-v2/service/autoscaling v0.0.0-20241115203348-0198b6c98cd9
github.com/aws/aws-sdk-go-v2/service/ec2 v1.187.0 => github.com/pkedy/aws-sdk-go-v2/service/ec2 v0.0.0-20241115203348-0198b6c98cd9
github.com/oracle/oci-go-sdk/v65 v65.78.0 => ../../oracle/oci-go-sdk/v65
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/oracle/oci-go-sdk/v65 v65.78.0 h1:iM7lFFA7cJkUD4tmrlsAHWgL3HuTuF9mdvTAliMkcFA=
github.com/oracle/oci-go-sdk/v65 v65.78.0/go.mod h1:IBEV9l1qBzUpo7zgGaRUhbB05BVfcDGYRFBCPlTcPp0=
github.com/pkedy/aws-sdk-go-v2 v0.0.0-20241115203348-0198b6c98cd9 h1:QhMFD0yJ9nEj4BCX9lREQ7twLM5oEL8y9UwKsRNJamo=
github.com/pkedy/aws-sdk-go-v2 v0.0.0-20241115203348-0198b6c98cd9/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/pkedy/aws-sdk-go-v2/service/ec2 v0.0.0-20241115203348-0198b6c98cd9 h1:wA7yd0OxRH3EWuKaJ7ijRowlWgH2b99nrP+d10+0Sc4=
Expand Down
225 changes: 110 additions & 115 deletions pkg/providers/oci/instance_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package oci
import (
"context"
"fmt"
"net/http"
"sort"
"time"

Expand All @@ -29,6 +28,7 @@ import (

"github.com/NVIDIA/topograph/pkg/metrics"
"github.com/NVIDIA/topograph/pkg/topology"
"github.com/NVIDIA/topograph/pkg/translate"
)

type level int
Expand All @@ -39,109 +39,111 @@ const (
hpcIslandLevel
)

func GenerateInstanceTopology(ctx context.Context, factory ClientFactory, cis []topology.ComputeInstances) ([]*core.ComputeBareMetalHostSummary, error) {
var err error
bareMetalHostSummaries := []*core.ComputeBareMetalHostSummary{}
func GenerateInstanceTopology(ctx context.Context, factory ClientFactory, pageSize *int, cis []topology.ComputeInstances) (hosts []core.ComputeHostSummary, blockMap map[string]string, err error) {
blockMap = make(map[string]string)

for _, ci := range cis {
if bareMetalHostSummaries, err = generateInstanceTopology(ctx, factory, &ci, bareMetalHostSummaries); err != nil {
return nil, err
var client Client
if client, err = factory(ci.Region, pageSize); err != nil {
return
}
if hosts, err = getComputeHostInfo(ctx, client, hosts, blockMap); err != nil {
return
}
}

return bareMetalHostSummaries, nil
return
}

func getComputeCapacityTopologies(ctx context.Context, client Client) (cct []core.ComputeCapacityTopologySummary, err error) {
compartmentId := client.TenancyOCID()

adRequest := identity.ListAvailabilityDomainsRequest{
CompartmentId: &compartmentId,
}
func getComputeHostSummary(ctx context.Context, client Client, availabilityDomain *string) ([]core.ComputeHostSummary, error) {
var hosts []core.ComputeHostSummary

timeStart := time.Now()
ads, err := client.ListAvailabilityDomains(ctx, adRequest)
if err != nil {
return cct, fmt.Errorf("unable to get AD: %v", err)
req := core.ListComputeHostsRequest{
CompartmentId: client.TenancyOCID(),
AvailabilityDomain: availabilityDomain,
Limit: client.Limit(),
}
requestLatency.WithLabelValues("ListAvailabilityDomains", ads.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())

for _, ad := range ads.Items {
cctRequest := core.ListComputeCapacityTopologiesRequest{
CompartmentId: &compartmentId,
AvailabilityDomain: ad.Name,
for {
timeStart := time.Now()
resp, err := client.ListComputeHosts(ctx, req)
requestLatency.WithLabelValues("ListComputeHosts", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
if err != nil {
return nil, err
}

for {
timeStart := time.Now()
resp, err := client.ListComputeCapacityTopologies(ctx, cctRequest)
requestLatency.WithLabelValues("ListComputeCapacityTopologies", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
if err != nil {
if resp.HTTPResponse().StatusCode == http.StatusNotFound {
return cct, fmt.Errorf("%v for getting ComputeCapacityTopology in %s: %v", resp.HTTPResponse().StatusCode, *ad.Name, err)
} else {
return cct, fmt.Errorf("unable to get ComputeCapacity Topologies in %s : %v", *ad.Name, err)
}
}
cct = append(cct, resp.Items...)
klog.V(4).Infof("Received computeCapacityTopology %d groups; processed %d", len(resp.Items), len(cct))
if resp.OpcNextPage != nil {
cctRequest.Page = resp.OpcNextPage
} else {
break
}
hosts = append(hosts, resp.Items...)

if resp.OpcNextPage != nil {
req.Page = resp.OpcNextPage
} else {
break
}
}

return cct, nil
return hosts, nil
}

func getBMHSummaryPerComputeCapacityTopology(ctx context.Context, client Client, topologyID string) (bmhSummary []core.ComputeBareMetalHostSummary, err error) {
compartmentId := client.TenancyOCID()
request := core.ListComputeCapacityTopologyComputeBareMetalHostsRequest{
ComputeCapacityTopologyId: &topologyID,
CompartmentId: &compartmentId,
// getLocalBlockMap returns a map between LocalBlocks and ComputeGpuMemoryFabrics
func getLocalBlockMap(ctx context.Context, client Client, availabilityDomain *string, blockMap map[string]string) error {
req := core.ListComputeGpuMemoryFabricsRequest{
CompartmentId: client.TenancyOCID(),
AvailabilityDomain: availabilityDomain,
Limit: client.Limit(),
}

for {
timeStart := time.Now()
response, err := client.ListComputeCapacityTopologyComputeBareMetalHosts(ctx, request)
requestLatency.WithLabelValues("ListComputeCapacityTopologyComputeBareMetalHosts", response.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
resp, err := client.ListComputeGpuMemoryFabrics(ctx, req)
requestLatency.WithLabelValues("ListComputeGpuMemoryFabrics", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())
if err != nil {
klog.Errorln(err.Error())
break
return err
}

bmhSummary = append(bmhSummary, response.Items...)
for _, fabrics := range resp.Items {
blockMap[*fabrics.ComputeLocalBlockId] = *fabrics.Id
}

if response.OpcNextPage != nil {
request.Page = response.OpcNextPage
if resp.OpcNextPage != nil {
req.Page = resp.OpcNextPage
} else {
break
}
}
return bmhSummary, nil

return nil
}

func getBareMetalHostSummaries(ctx context.Context, client Client) ([]core.ComputeBareMetalHostSummary, error) {
computeCapacityTopology, err := getComputeCapacityTopologies(ctx, client)
func getComputeHostInfo(ctx context.Context, client Client, hosts []core.ComputeHostSummary, blockMap map[string]string) ([]core.ComputeHostSummary, error) {
req := identity.ListAvailabilityDomainsRequest{
CompartmentId: client.TenancyOCID(),
}

timeStart := time.Now()
resp, err := client.ListAvailabilityDomains(ctx, req)
if err != nil {
return nil, fmt.Errorf("unable to get compute capacity topologies: %s", err.Error())
return nil, fmt.Errorf("unable to get availability domains: %v", err)
}
klog.V(4).Infof("Received computeCapacityTopology for %d groups", len(computeCapacityTopology))
requestLatency.WithLabelValues("ListAvailabilityDomains", resp.HTTPResponse().Status).Observe(time.Since(timeStart).Seconds())

var bareMetalHostSummaries []core.ComputeBareMetalHostSummary
for _, cct := range computeCapacityTopology {
bareMetalHostSummary, err := getBMHSummaryPerComputeCapacityTopology(ctx, client, *cct.Id)
for _, ad := range resp.Items {
summary, err := getComputeHostSummary(ctx, client, ad.Name)
if err != nil {
return nil, fmt.Errorf("unable to get bare metal hosts info: %s", err.Error())
return nil, fmt.Errorf("unable to get hosts info: %v", err)
}
hosts = append(hosts, summary...)

if err = getLocalBlockMap(ctx, client, ad.Name, blockMap); err != nil {
return nil, fmt.Errorf("unable to get local block map: %v", err)
}
bareMetalHostSummaries = append(bareMetalHostSummaries, bareMetalHostSummary...)
}
klog.V(4).Infof("Returning bareMetalHostSummaries for %d nodes", len(bareMetalHostSummaries))

return bareMetalHostSummaries, nil
klog.V(4).Infof("Returning host info for %d nodes and %d blocks", len(hosts), len(blockMap))

return hosts, nil
}

func toGraph(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, cis []topology.ComputeInstances) (*topology.Vertex, error) {
func toGraph(hosts []core.ComputeHostSummary, blockMap map[string]string, cis []topology.ComputeInstances) (*topology.Vertex, error) {
instanceToNodeMap := make(map[string]string)
for _, ci := range cis {
for instance, node := range ci.Instances {
Expand All @@ -152,18 +154,25 @@ func toGraph(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, cis []t

nodes := make(map[string]*topology.Vertex)
forest := make(map[string]*topology.Vertex)
domainMap := translate.NewDomainMap()

levelWiseSwitchCount := map[level]int{localBlockLevel: 0, networkBlockLevel: 0, hpcIslandLevel: 0}
bareMetalHostSummaries = filterAndSort(bareMetalHostSummaries, instanceToNodeMap)
for _, bmhSummary := range bareMetalHostSummaries {
nodeName := instanceToNodeMap[*bmhSummary.InstanceId]
delete(instanceToNodeMap, *bmhSummary.InstanceId)
hosts = filterAndSort(hosts, instanceToNodeMap)
for _, host := range hosts {
nodeName := instanceToNodeMap[*host.InstanceId]
delete(instanceToNodeMap, *host.InstanceId)

instance := &topology.Vertex{
Name: nodeName,
ID: *bmhSummary.InstanceId,
ID: *host.InstanceId,
}

localBlockId := *host.LocalBlockId

if blockDomain, ok := blockMap[localBlockId]; ok {
domainMap.AddHost(blockDomain, nodeName)
}

localBlockId := *bmhSummary.ComputeLocalBlockId
localBlock, ok := nodes[localBlockId]
if !ok {
levelWiseSwitchCount[localBlockLevel]++
Expand All @@ -176,7 +185,7 @@ func toGraph(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, cis []t
}
localBlock.Vertices[instance.ID] = instance

networkBlockId := *bmhSummary.ComputeNetworkBlockId
networkBlockId := *host.NetworkBlockId
networkBlock, ok := nodes[networkBlockId]
if !ok {
levelWiseSwitchCount[networkBlockLevel]++
Expand All @@ -189,7 +198,7 @@ func toGraph(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, cis []t
}
networkBlock.Vertices[localBlockId] = localBlock

hpcIslandId := *bmhSummary.ComputeHpcIslandId
hpcIslandId := *host.HpcIslandId
hpcIsland, ok := nodes[hpcIslandId]
if !ok {
levelWiseSwitchCount[hpcIslandLevel]++
Expand Down Expand Up @@ -231,75 +240,61 @@ func toGraph(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, cis []t
Vertices: make(map[string]*topology.Vertex),
}
root.Vertices[topology.TopologyTree] = treeRoot
if len(domainMap) != 0 {
root.Vertices[topology.TopologyBlock] = domainMap.ToBlocks()
}
return root, nil

}

func filterAndSort(bareMetalHostSummaries []*core.ComputeBareMetalHostSummary, instanceToNodeMap map[string]string) []*core.ComputeBareMetalHostSummary {
var filtered []*core.ComputeBareMetalHostSummary
for _, bmh := range bareMetalHostSummaries {
if bmh.InstanceId == nil {
klog.V(5).Infof("Instance ID is nil for bmhSummary %s", bmh.String())
func filterAndSort(hosts []core.ComputeHostSummary, instanceToNodeMap map[string]string) []core.ComputeHostSummary {
var filtered []core.ComputeHostSummary
for _, host := range hosts {
if host.InstanceId == nil {
klog.V(5).Infof("Instance ID is nil for host %s", host.String())
continue
}

if bmh.ComputeLocalBlockId == nil {
klog.Warningf("ComputeLocalBlockId is nil for instance %q", *bmh.InstanceId)
missingAncestor.WithLabelValues("localBlock", *bmh.InstanceId).Add(float64(1))
if host.LocalBlockId == nil {
klog.Warningf("LocalBlockId is nil for instance %q", *host.InstanceId)
missingAncestor.WithLabelValues("LocalBlock", *host.InstanceId).Add(float64(1))
continue
}

if bmh.ComputeNetworkBlockId == nil {
klog.Warningf("ComputeNetworkBlockId is nil for instance %q", *bmh.InstanceId)
missingAncestor.WithLabelValues("networkBlock", *bmh.InstanceId).Add(float64(1))
if host.NetworkBlockId == nil {
klog.Warningf("NetworkBlockId is nil for instance %q", *host.InstanceId)
missingAncestor.WithLabelValues("networkBlock", *host.InstanceId).Add(float64(1))
continue
}

if bmh.ComputeHpcIslandId == nil {
klog.Warningf("ComputeHpcIslandId is nil for instance %q", *bmh.InstanceId)
missingAncestor.WithLabelValues("hpcIsland", *bmh.InstanceId).Add(float64(1))
if host.HpcIslandId == nil {
klog.Warningf("HpcIslandId is nil for instance %q", *host.InstanceId)
missingAncestor.WithLabelValues("hpcIsland", *host.InstanceId).Add(float64(1))
continue
}

if _, ok := instanceToNodeMap[*bmh.InstanceId]; ok {
klog.V(4).Infof("Adding bmhSummary %s", bmh.String())
filtered = append(filtered, bmh)
if _, ok := instanceToNodeMap[*host.InstanceId]; ok {
klog.V(4).Infof("Adding host %s", host.String())
filtered = append(filtered, host)
} else {
klog.V(4).Infof("Skipping bmhSummary %s", bmh.String())
klog.V(4).Infof("Skipping host %s", host.String())
}
}

sort.Slice(filtered, func(i, j int) bool {
if filtered[i].ComputeHpcIslandId != filtered[j].ComputeHpcIslandId {
return *filtered[i].ComputeHpcIslandId < *filtered[j].ComputeHpcIslandId
if filtered[i].HpcIslandId != filtered[j].HpcIslandId {
return *filtered[i].HpcIslandId < *filtered[j].HpcIslandId
}

if filtered[i].ComputeNetworkBlockId != filtered[j].ComputeNetworkBlockId {
return *filtered[i].ComputeNetworkBlockId < *filtered[j].ComputeNetworkBlockId
if filtered[i].NetworkBlockId != filtered[j].NetworkBlockId {
return *filtered[i].NetworkBlockId < *filtered[j].NetworkBlockId
}

if filtered[i].ComputeLocalBlockId != filtered[j].ComputeLocalBlockId {
return *filtered[i].ComputeLocalBlockId < *filtered[j].ComputeLocalBlockId
if filtered[i].LocalBlockId != filtered[j].LocalBlockId {
return *filtered[i].LocalBlockId < *filtered[j].LocalBlockId
}

return *filtered[i].InstanceId < *filtered[j].InstanceId
})
return filtered
}

func generateInstanceTopology(ctx context.Context, factory ClientFactory, ci *topology.ComputeInstances, bareMetalHostSummaries []*core.ComputeBareMetalHostSummary) ([]*core.ComputeBareMetalHostSummary, error) {
client, err := factory(ci.Region)
if err != nil {
return nil, err
}

bmh, err := getBareMetalHostSummaries(ctx, client)
if err != nil {
return nil, fmt.Errorf("unable to populate compute capacity topology: %s", err.Error())
}

for _, bm := range bmh {
bareMetalHostSummaries = append(bareMetalHostSummaries, &bm)
}
return bareMetalHostSummaries, nil
}
Loading

0 comments on commit f05d8f1

Please sign in to comment.