diff --git a/plugin/kaytu/compute_instance.go b/plugin/kaytu/compute_instance.go index 4b3368d..ac59f3b 100644 --- a/plugin/kaytu/compute_instance.go +++ b/plugin/kaytu/compute_instance.go @@ -39,31 +39,39 @@ type GcpComputeInstanceRightsizingRecommendation struct { } type GcpComputeInstanceWastageRequest struct { - RequestId *string `json:"requestId"` - CliVersion *string `json:"cliVersion"` - Identification map[string]string `json:"identification"` - Instance GcpComputeInstance `json:"instance"` - Disks []GcpComputeDisk `json:"disks"` - Metrics map[string][]Datapoint `json:"metrics"` - DiskCapacityUsed map[string]float64 `json:"diskCapacityUsed"` - Region string `json:"region"` - Preferences map[string]*string `json:"preferences"` - Loading bool `json:"loading"` + RequestId *string `json:"requestId"` + CliVersion *string `json:"cliVersion"` + Identification map[string]string `json:"identification"` + Instance GcpComputeInstance `json:"instance"` + Disks []GcpComputeDisk `json:"disks"` + Metrics map[string][]Datapoint `json:"metrics"` + DisksMetrics map[string]map[string][]Datapoint `json:"diskMetrics"` + Region string `json:"region"` + Preferences map[string]*string `json:"preferences"` + Loading bool `json:"loading"` } type RightsizingGcpComputeDisk struct { - Zone string `json:"zone"` - Region string `json:"region"` - DiskType string `json:"diskType"` - DiskSize *int64 `json:"diskSize"` - Cost float64 `json:"cost"` + Zone string `json:"zone"` + Region string `json:"region"` + DiskType string `json:"diskType"` + DiskSize int64 `json:"diskSize"` + ReadIopsLimit int64 `json:"readIopsLimit"` + WriteIopsLimit int64 `json:"writeIopsLimit"` + ReadThroughputLimit float64 `json:"readThroughputLimit"` + WriteThroughputLimit float64 `json:"writeThroughputLimit"` + + Cost float64 `json:"cost"` } type GcpComputeDiskRecommendation struct { Current RightsizingGcpComputeDisk Recommended *RightsizingGcpComputeDisk - UsedCapacity float64 `json:"usedCapacity"` + ReadIops Usage 
`json:"readIops"` + WriteIops Usage `json:"writeIops"` + ReadThroughput Usage `json:"readThroughput"` + WriteThroughput Usage `json:"writeThroughput"` Description string `json:"description"` } diff --git a/plugin/preferences/default.go b/plugin/preferences/default.go index b509f61..e3e3708 100644 --- a/plugin/preferences/default.go +++ b/plugin/preferences/default.go @@ -13,4 +13,8 @@ var DefaultComputeEnginePreferences = []*golang.PreferenceItem{ {Service: "ComputeInstance", Key: "MemoryGB", Alias: "Memory", IsNumber: true, Unit: "GiB"}, {Service: "ComputeInstance", Key: "CPUBreathingRoom", IsNumber: true, Value: wrapperspb.String("10"), PreventPinning: true, Unit: "%"}, {Service: "ComputeInstance", Key: "MemoryBreathingRoom", IsNumber: true, Value: wrapperspb.String("10"), PreventPinning: true, Unit: "%"}, + {Service: "ComputeInstance", Key: "ExcludeUpsizingFeature", Value: wrapperspb.String("Yes"), PreventPinning: true, PossibleValues: []string{"No", "Yes"}}, + + {Service: "ComputeDisk", Key: "DiskType"}, + {Service: "ComputeDisk", Key: "DiskSizeGb", IsNumber: true, Unit: "GiB"}, } diff --git a/plugin/processor/compute_instance/compute_instance.go b/plugin/processor/compute_instance/compute_instance.go index 8449ab3..dc7f196 100644 --- a/plugin/processor/compute_instance/compute_instance.go +++ b/plugin/processor/compute_instance/compute_instance.go @@ -1,7 +1,11 @@ package compute_instance import ( - "log" + "fmt" + "github.com/kaytu-io/kaytu/pkg/utils" + "strconv" + "strings" + "sync/atomic" "github.com/kaytu-io/kaytu/pkg/plugin/proto/src/golang" "github.com/kaytu-io/kaytu/pkg/plugin/sdk" @@ -17,6 +21,7 @@ type ComputeInstanceProcessor struct { publishResultSummary func(summary *golang.ResultSummary) kaytuAcccessToken string jobQueue *sdk.JobQueue + lazyloadCounter atomic.Uint32 } func NewComputeInstanceProcessor( @@ -27,7 +32,6 @@ func NewComputeInstanceProcessor( kaytuAcccessToken string, jobQueue *sdk.JobQueue, ) *ComputeInstanceProcessor { - 
log.Println("creating processor") r := &ComputeInstanceProcessor{ provider: prv, metricProvider: metricPrv, @@ -36,8 +40,7 @@ func NewComputeInstanceProcessor( publishResultSummary: publishResultSummary, kaytuAcccessToken: kaytuAcccessToken, jobQueue: jobQueue, - // configuration: configurations, - // lazyloadCounter: lazyloadCounter, + lazyloadCounter: atomic.Uint32{}, } jobQueue.Push(NewListComputeInstancesJob(r)) @@ -45,13 +48,97 @@ func NewComputeInstanceProcessor( } func (m *ComputeInstanceProcessor) ReEvaluate(id string, items []*golang.PreferenceItem) { - log.Println("Reevaluate unimplemented") - // v, _ := m.items.Get(id) - // v.Preferences = items - // m.items.Set(id, v) - // m.jobQueue.Push(NewOptimizeEC2InstanceJob(m, v)) + v, _ := m.items.Get(id) + v.Preferences = items + m.items.Set(id, v) + fmt.Println("HERE===================") + fmt.Println("Instance Metrics", len(v.Metrics)) + fmt.Println("Disk Metrics", len(v.DisksMetrics)) + fmt.Println("Disks", len(v.Disks)) + v.OptimizationLoading = true + m.publishOptimizationItem(v.ToOptimizationItem()) + m.jobQueue.Push(NewOptimizeComputeInstancesJob(m, v)) } -func (p *ComputeInstanceProcessor) ExportNonInteractive() *golang.NonInteractiveExport { - return nil +func (m *ComputeInstanceProcessor) ExportNonInteractive() *golang.NonInteractiveExport { + return &golang.NonInteractiveExport{ + Csv: m.exportCsv(), + } +} + +func (m *ComputeInstanceProcessor) exportCsv() []*golang.CSVRow { + headers := []string{ + "Project ID", "Region", "Resource Type", "Resource ID", "Resource Name", "Platform", + "Device Runtime (Hrs)", "Current Cost", "Recommendation Cost", "Net Savings", + "Current Spec", "Suggested Spec", "Parent Device", "Justification", "Additional Details", + } + var rows []*golang.CSVRow + rows = append(rows, &golang.CSVRow{Row: headers}) + + m.items.Range(func(key string, value ComputeInstanceItem) bool { + var additionalDetails []string + var rightSizingCost, saving, recSpec string + if 
value.Wastage.RightSizing.Recommended != nil { + rightSizingCost = utils.FormatPriceFloat(value.Wastage.RightSizing.Recommended.Cost) + saving = utils.FormatPriceFloat(value.Wastage.RightSizing.Current.Cost - value.Wastage.RightSizing.Recommended.Cost) + recSpec = value.Wastage.RightSizing.Recommended.MachineType + + additionalDetails = append(additionalDetails, + fmt.Sprintf("Machine Type:: Current: %s - Recommended: %s", value.Wastage.RightSizing.Current.MachineType, + value.Wastage.RightSizing.Recommended.MachineType)) + additionalDetails = append(additionalDetails, + fmt.Sprintf("Region:: Current: %s - Recommended: %s", value.Wastage.RightSizing.Current.Region, + value.Wastage.RightSizing.Recommended.Region)) + additionalDetails = append(additionalDetails, + fmt.Sprintf("CPU:: Current: %d - Recommended: %d", value.Wastage.RightSizing.Current.CPU, + value.Wastage.RightSizing.Recommended.CPU)) + additionalDetails = append(additionalDetails, + fmt.Sprintf("Memory:: Current: %d - Recommended: %d", value.Wastage.RightSizing.Current.MemoryMb, + value.Wastage.RightSizing.Recommended.MemoryMb)) + } + computeRow := []string{ + value.ProjectId, value.Region, "Compute Instance", value.Id, value.Name, value.Platform, + "730 Hrs", utils.FormatPriceFloat(value.Wastage.RightSizing.Current.Cost), rightSizingCost, saving, + value.Wastage.RightSizing.Current.MachineType, recSpec, "None", value.Wastage.RightSizing.Description, strings.Join(additionalDetails, "---")} + + rows = append(rows, &golang.CSVRow{Row: computeRow}) + + for _, d := range value.Disks { + dKey := strconv.FormatUint(d.Id, 10) + disk := value.Wastage.VolumeRightSizing[dKey] + var diskAdditionalDetails []string + var diskRightSizingCost, diskSaving, diskRecSpec string + if disk.Recommended != nil { + diskRightSizingCost = utils.FormatPriceFloat(disk.Recommended.Cost) + diskSaving = utils.FormatPriceFloat(disk.Current.Cost - disk.Recommended.Cost) + diskRecSpec = fmt.Sprintf("%s / %d GB", 
disk.Recommended.DiskType, disk.Recommended.DiskSize) + + diskAdditionalDetails = append(diskAdditionalDetails, + fmt.Sprintf("Region:: Current: %s - Recommended: %s", disk.Current.Region, + disk.Recommended.Region)) + diskAdditionalDetails = append(diskAdditionalDetails, + fmt.Sprintf("ReadIopsExpectation:: Current: %d - Recommended: %d", disk.Current.ReadIopsLimit, + disk.Recommended.ReadIopsLimit)) + diskAdditionalDetails = append(diskAdditionalDetails, + fmt.Sprintf("WriteIopsExpectation:: Current: %d - Recommended: %d", disk.Current.WriteIopsLimit, + disk.Recommended.WriteIopsLimit)) + diskAdditionalDetails = append(diskAdditionalDetails, + fmt.Sprintf("ReadThroughputExpectation:: Current: %.2f - Recommended: %.2f", disk.Current.ReadThroughputLimit, + disk.Recommended.ReadThroughputLimit)) + diskAdditionalDetails = append(diskAdditionalDetails, + fmt.Sprintf("WriteThroughputExpectation:: Current: %.2f - Recommended: %.2f", disk.Current.WriteThroughputLimit, + disk.Recommended.WriteThroughputLimit)) + } + diskRow := []string{ + value.ProjectId, value.Region, "Compute Disk", dKey, d.Name, "N/A", + "730 Hrs", utils.FormatPriceFloat(disk.Current.Cost), diskRightSizingCost, diskSaving, + fmt.Sprintf("%s / %d GB", disk.Current.DiskType, disk.Current.DiskSize), diskRecSpec, + "None", value.Wastage.RightSizing.Description, strings.Join(diskAdditionalDetails, "---")} + + rows = append(rows, &golang.CSVRow{Row: diskRow}) + } + + return true + }) + return rows } diff --git a/plugin/processor/compute_instance/compute_instance_item.go b/plugin/processor/compute_instance/compute_instance_item.go index 67785f9..6b13b44 100644 --- a/plugin/processor/compute_instance/compute_instance_item.go +++ b/plugin/processor/compute_instance/compute_instance_item.go @@ -1,15 +1,15 @@ package compute_instance import ( + "cloud.google.com/go/compute/apiv1/computepb" "fmt" - "google.golang.org/api/compute/v1" - "maps" - "strconv" - "github.com/kaytu-io/kaytu/pkg/plugin/proto/src/golang" 
"github.com/kaytu-io/kaytu/pkg/utils" "github.com/kaytu-io/plugin-gcp/plugin/kaytu" + "google.golang.org/api/compute/v1" "google.golang.org/protobuf/types/known/wrapperspb" + "maps" + "strconv" ) type ComputeInstanceItem struct { @@ -24,8 +24,10 @@ type ComputeInstanceItem struct { Skipped bool LazyLoadingEnabled bool SkipReason string + Instance *computepb.Instance Disks []compute.Disk Metrics map[string][]kaytu.Datapoint + DisksMetrics map[string]map[string][]kaytu.Datapoint Wastage kaytu.GcpComputeInstanceWastageResponse } @@ -50,9 +52,9 @@ func (i ComputeInstanceItem) ComputeInstanceDevice() (*golang.ChartRow, map[stri Value: utils.FormatPriceFloat(i.Wastage.RightSizing.Current.Cost), } - ZoneProperty := &golang.Property{ - Key: "Zone", - Current: i.Wastage.RightSizing.Current.Zone, + RegionProperty := &golang.Property{ + Key: "Region", + Current: i.Wastage.RightSizing.Current.Region, } MachineTypeProperty := &golang.Property{ @@ -66,6 +68,8 @@ func (i ComputeInstanceItem) ComputeInstanceDevice() (*golang.ChartRow, map[stri CPUProperty := &golang.Property{ Key: " CPU", Current: fmt.Sprintf("%d", i.Wastage.RightSizing.Current.CPU), + Average: utils.Percentage(i.Wastage.RightSizing.CPU.Avg), + Max: utils.Percentage(i.Wastage.RightSizing.CPU.Max), } memoryProperty := &golang.Property{ @@ -90,7 +94,7 @@ func (i ComputeInstanceItem) ComputeInstanceDevice() (*golang.ChartRow, map[stri row.Values["savings"] = &golang.ChartRowItem{ Value: utils.FormatPriceFloat(i.Wastage.RightSizing.Current.Cost - i.Wastage.RightSizing.Recommended.Cost), } - ZoneProperty.Recommended = i.Wastage.RightSizing.Recommended.Zone + RegionProperty.Recommended = i.Wastage.RightSizing.Recommended.Region MachineTypeProperty.Recommended = i.Wastage.RightSizing.Recommended.MachineType CPUProperty.Recommended = fmt.Sprintf("%d", i.Wastage.RightSizing.Recommended.CPU) memoryProperty.Recommended = fmt.Sprintf("%d MB", i.Wastage.RightSizing.Recommended.MemoryMb) @@ -99,7 +103,7 @@ func (i 
ComputeInstanceItem) ComputeInstanceDevice() (*golang.ChartRow, map[stri props := make(map[string]*golang.Properties) properties := &golang.Properties{} - properties.Properties = append(properties.Properties, ZoneProperty) + properties.Properties = append(properties.Properties, RegionProperty) properties.Properties = append(properties.Properties, MachineTypeProperty) properties.Properties = append(properties.Properties, MachineFamilyProperty) properties.Properties = append(properties.Properties, &golang.Property{ @@ -155,7 +159,31 @@ func (i ComputeInstanceItem) ComputeDiskDevice() ([]*golang.ChartRow, map[string } DiskSizeProperty := &golang.Property{ Key: "Disk Size", - Current: fmt.Sprintf("%d GB", d.SizeGb), + Current: fmt.Sprintf("%d GB", disk.Current.DiskSize), + } + DiskReadIopsProperty := &golang.Property{ + Key: " Read IOPS Expectation", + Current: fmt.Sprintf("%d", disk.Current.ReadIopsLimit), + Average: utils.PFloat64ToString(disk.ReadIops.Avg), + Max: utils.PFloat64ToString(disk.ReadIops.Max), + } + DiskWriteIopsProperty := &golang.Property{ + Key: " Write IOPS Expectation", + Current: fmt.Sprintf("%d", disk.Current.WriteIopsLimit), + Average: utils.PFloat64ToString(disk.WriteIops.Avg), + Max: utils.PFloat64ToString(disk.WriteIops.Max), + } + DiskReadThroughputProperty := &golang.Property{ + Key: " Read Throughput Expectation", + Current: fmt.Sprintf("%.2f Mb", disk.Current.ReadThroughputLimit), + Average: fmt.Sprintf("%s Mb", utils.PFloat64ToString(disk.ReadThroughput.Avg)), + Max: fmt.Sprintf("%s Mb", utils.PFloat64ToString(disk.ReadThroughput.Max)), + } + DiskWriteThroughputProperty := &golang.Property{ + Key: " Write Throughput Expectation", + Current: fmt.Sprintf("%.2f Mb", disk.Current.WriteThroughputLimit), + Average: fmt.Sprintf("%s Mb", utils.PFloat64ToString(disk.WriteThroughput.Avg)), + Max: fmt.Sprintf("%s Mb", utils.PFloat64ToString(disk.WriteThroughput.Max)), } if disk.Recommended != nil { @@ -167,9 +195,11 @@ func (i ComputeInstanceItem) 
ComputeDiskDevice() ([]*golang.ChartRow, map[string } RegionProperty.Recommended = disk.Recommended.Region DiskTypeProperty.Recommended = disk.Recommended.DiskType - if disk.Recommended.DiskSize != nil { - DiskSizeProperty.Recommended = fmt.Sprintf("%d GB", *disk.Recommended.DiskSize) - } + DiskReadIopsProperty.Recommended = fmt.Sprintf("%d", disk.Recommended.ReadIopsLimit) + DiskWriteIopsProperty.Recommended = fmt.Sprintf("%d", disk.Recommended.WriteIopsLimit) + DiskReadThroughputProperty.Recommended = fmt.Sprintf("%.2f Mb", disk.Recommended.ReadThroughputLimit) + DiskWriteThroughputProperty.Recommended = fmt.Sprintf("%.2f Mb", disk.Recommended.WriteThroughputLimit) + DiskSizeProperty.Recommended = fmt.Sprintf("%d GB", disk.Recommended.DiskSize) } properties := &golang.Properties{} @@ -177,6 +207,16 @@ func (i ComputeInstanceItem) ComputeDiskDevice() ([]*golang.ChartRow, map[string properties.Properties = append(properties.Properties, RegionProperty) properties.Properties = append(properties.Properties, DiskTypeProperty) properties.Properties = append(properties.Properties, DiskSizeProperty) + properties.Properties = append(properties.Properties, &golang.Property{ + Key: "IOPS", + }) + properties.Properties = append(properties.Properties, DiskReadIopsProperty) + properties.Properties = append(properties.Properties, DiskWriteIopsProperty) + properties.Properties = append(properties.Properties, &golang.Property{ + Key: "Throughput", + }) + properties.Properties = append(properties.Properties, DiskReadThroughputProperty) + properties.Properties = append(properties.Properties, DiskWriteThroughputProperty) props[key] = properties rows = append(rows, &row) @@ -193,10 +233,6 @@ func (i ComputeInstanceItem) Devices() ([]*golang.ChartRow, map[string]*golang.P instanceRows, instanceProps := i.ComputeInstanceDevice() diskRows, diskProps := i.ComputeDiskDevice() - fmt.Println("==========") - fmt.Println("disks", diskRows) - fmt.Println("instance", *instanceRows) - deviceRows 
= append(deviceRows, instanceRows) deviceRows = append(deviceRows, diskRows...) maps.Copy(deviceProps, instanceProps) @@ -210,11 +246,21 @@ func (i ComputeInstanceItem) ToOptimizationItem() *golang.ChartOptimizationItem deviceRows, deviceProps := i.Devices() status := "" - if i.Wastage.RightSizing.Recommended != nil { + if i.Skipped { + status = fmt.Sprintf("skipped - %s", i.SkipReason) + } else if i.LazyLoadingEnabled && !i.OptimizationLoading { + status = "press enter to load" + } else if i.OptimizationLoading { + status = "loading" + } else if i.Wastage.RightSizing.Recommended != nil { totalSaving := 0.0 totalCurrentCost := 0.0 totalSaving += i.Wastage.RightSizing.Current.Cost - i.Wastage.RightSizing.Recommended.Cost totalCurrentCost += i.Wastage.RightSizing.Current.Cost + for _, d := range i.Wastage.VolumeRightSizing { + totalSaving += d.Current.Cost - d.Recommended.Cost + totalCurrentCost += d.Current.Cost + } status = fmt.Sprintf("%s (%.2f%%)", utils.FormatPriceFloat(totalSaving), (totalSaving/totalCurrentCost)*100) } @@ -250,7 +296,7 @@ func (i ComputeInstanceItem) ToOptimizationItem() *golang.ChartOptimizationItem DevicesChartRows: deviceRows, DevicesProperties: deviceProps, Preferences: i.Preferences, - Description: "description placeholder", + Description: i.Wastage.RightSizing.Description, Loading: i.OptimizationLoading, Skipped: i.Skipped, SkipReason: wrapperspb.String(i.SkipReason), diff --git a/plugin/processor/compute_instance/job_compute_instance_list.go b/plugin/processor/compute_instance/job_compute_instance_list.go index 65f38d0..f2c41d5 100644 --- a/plugin/processor/compute_instance/job_compute_instance_list.go +++ b/plugin/processor/compute_instance/job_compute_instance_list.go @@ -64,16 +64,23 @@ func (job *ListComputeInstancesJob) Run(ctx context.Context) error { MachineType: util.TrimmedString(*instance.MachineType, "/"), Region: util.TrimmedString(*instance.Zone, "/"), Platform: instance.GetCpuPlatform(), - OptimizationLoading: false, + 
OptimizationLoading: true, Preferences: preferences.DefaultComputeEnginePreferences, Skipped: false, LazyLoadingEnabled: false, SkipReason: "NA", + Instance: instance, Disks: disks, Metrics: nil, + DisksMetrics: nil, } - log.Printf("OI instance: %s", oi.Name) + if !oi.Skipped { + job.processor.lazyloadCounter.Add(1) + if job.processor.lazyloadCounter.Load() > uint32(1) { + oi.LazyLoadingEnabled = true + } + } job.processor.items.Set(oi.Id, oi) job.processor.publishOptimizationItem(oi.ToOptimizationItem()) diff --git a/plugin/processor/compute_instance/job_get_compute_instance_metrics.go b/plugin/processor/compute_instance/job_get_compute_instance_metrics.go index b0f09a9..534a591 100644 --- a/plugin/processor/compute_instance/job_get_compute_instance_metrics.go +++ b/plugin/processor/compute_instance/job_get_compute_instance_metrics.go @@ -46,11 +46,6 @@ func (job *GetComputeInstanceMetricsJob) Run(ctx context.Context) error { endTime := time.Now() // end time of requested time series startTime := endTime.Add(-24 * 1 * time.Hour) // start time of requested time series - // metricName string, - // instanceID string, - // startTime time.Time, - // endTime time.Time, - // periodInSeconds int64, cpuRequest := job.processor.metricProvider.NewTimeSeriesRequest( fmt.Sprintf( `metric.type="%s" AND resource.labels.instance_id="%s"`, @@ -97,6 +92,108 @@ func (job *GetComputeInstanceMetricsJob) Run(ctx context.Context) error { return err } + disksMetrics := make(map[string]map[string][]kaytu.Datapoint) + for _, disk := range job.disks { + id := strconv.FormatUint(disk.Id, 10) + disksMetrics[id] = make(map[string][]kaytu.Datapoint) + + diskReadIopsRequest := job.processor.metricProvider.NewTimeSeriesRequest( + fmt.Sprintf( + `metric.type="%s" AND resource.labels.instance_id="%s" AND metric.labels.device_name="%s"`, + "compute.googleapis.com/instance/disk/read_ops_count", + fmt.Sprint(job.instance.GetId()), disk.Name), + &monitoringpb.TimeInterval{ + EndTime: 
timestamppb.New(endTime), + StartTime: timestamppb.New(startTime), + }, + &monitoringpb.Aggregation{ + AlignmentPeriod: &durationpb.Duration{ + Seconds: 60, + }, + PerSeriesAligner: monitoringpb.Aggregation_ALIGN_MEAN, // will represent all the datapoints in the above period, with a mean + }, + ) + + diskReadIopsMetrics, err := job.processor.metricProvider.GetMetric(diskReadIopsRequest) + if err != nil { + return err + } + + disksMetrics[id]["DiskReadIOPS"] = diskReadIopsMetrics + + diskWriteIopsRequest := job.processor.metricProvider.NewTimeSeriesRequest( + fmt.Sprintf( + `metric.type="%s" AND resource.labels.instance_id="%s" AND metric.labels.device_name="%s"`, + "compute.googleapis.com/instance/disk/write_ops_count", + fmt.Sprint(job.instance.GetId()), disk.Name), + &monitoringpb.TimeInterval{ + EndTime: timestamppb.New(endTime), + StartTime: timestamppb.New(startTime), + }, + &monitoringpb.Aggregation{ + AlignmentPeriod: &durationpb.Duration{ + Seconds: 60, + }, + PerSeriesAligner: monitoringpb.Aggregation_ALIGN_MEAN, // will represent all the datapoints in the above period, with a mean + }, + ) + + diskWriteIopsMetrics, err := job.processor.metricProvider.GetMetric(diskWriteIopsRequest) + if err != nil { + return err + } + + disksMetrics[id]["DiskWriteIOPS"] = diskWriteIopsMetrics + + diskReadThroughputRequest := job.processor.metricProvider.NewTimeSeriesRequest( + fmt.Sprintf( + `metric.type="%s" AND resource.labels.instance_id="%s" AND metric.labels.device_name="%s"`, + "compute.googleapis.com/instance/disk/read_bytes_count", + fmt.Sprint(job.instance.GetId()), disk.Name), + &monitoringpb.TimeInterval{ + EndTime: timestamppb.New(endTime), + StartTime: timestamppb.New(startTime), + }, + &monitoringpb.Aggregation{ + AlignmentPeriod: &durationpb.Duration{ + Seconds: 60, + }, + PerSeriesAligner: monitoringpb.Aggregation_ALIGN_MEAN, // will represent all the datapoints in the above period, with a mean + }, + ) + + diskReadThroughputMetrics, err := 
job.processor.metricProvider.GetMetric(diskReadThroughputRequest) + if err != nil { + return err + } + + disksMetrics[id]["DiskReadThroughput"] = diskReadThroughputMetrics + + diskWriteThroughputRequest := job.processor.metricProvider.NewTimeSeriesRequest( + fmt.Sprintf( + `metric.type="%s" AND resource.labels.instance_id="%s" AND metric.labels.device_name="%s"`, + "compute.googleapis.com/instance/disk/write_bytes_count", + fmt.Sprint(job.instance.GetId()), disk.Name), + &monitoringpb.TimeInterval{ + EndTime: timestamppb.New(endTime), + StartTime: timestamppb.New(startTime), + }, + &monitoringpb.Aggregation{ + AlignmentPeriod: &durationpb.Duration{ + Seconds: 60, + }, + PerSeriesAligner: monitoringpb.Aggregation_ALIGN_MEAN, // will represent all the datapoints in the above period, with a mean + }, + ) + + diskWriteThroughputMetrics, err := job.processor.metricProvider.GetMetric(diskWriteThroughputRequest) + if err != nil { + return err + } + + disksMetrics[id]["DiskWriteThroughput"] = diskWriteThroughputMetrics + } + instanceMetrics := make(map[string][]kaytu.Datapoint) instanceMetrics["cpuUtilization"] = cpumetric @@ -109,17 +206,21 @@ func (job *GetComputeInstanceMetricsJob) Run(ctx context.Context) error { MachineType: util.TrimmedString(*job.instance.MachineType, "/"), Region: util.TrimmedString(*job.instance.Zone, "/"), Platform: job.instance.GetCpuPlatform(), - OptimizationLoading: false, + OptimizationLoading: true, Preferences: preferences.DefaultComputeEnginePreferences, Skipped: false, LazyLoadingEnabled: false, SkipReason: "NA", + Instance: job.instance, Disks: job.disks, Metrics: instanceMetrics, + DisksMetrics: disksMetrics, } - for k, v := range oi.Metrics { - log.Printf("%s : %d", k, len(v)) + for d, v := range oi.DisksMetrics { + for k, v := range v { + log.Printf("%s %s : %d", d, k, len(v)) + } } job.processor.items.Set(oi.Id, oi) diff --git a/plugin/processor/compute_instance/job_optimize_compute_instance.go 
b/plugin/processor/compute_instance/job_optimize_compute_instance.go index 9130012..55e67e4 100644 --- a/plugin/processor/compute_instance/job_optimize_compute_instance.go +++ b/plugin/processor/compute_instance/job_optimize_compute_instance.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "strings" "github.com/google/uuid" "github.com/kaytu-io/kaytu/pkg/utils" @@ -34,6 +35,10 @@ func (job *OptimizeComputeInstancesJob) Description() string { } func (job *OptimizeComputeInstancesJob) Run(ctx context.Context) error { + if job.item.LazyLoadingEnabled { + job.processor.jobQueue.Push(NewGetComputeInstanceMetricsJob(job.processor, job.item.Instance, job.item.Disks)) + return nil + } requestId := uuid.NewString() @@ -41,13 +46,20 @@ func (job *OptimizeComputeInstancesJob) Run(ctx context.Context) error { diskFilled := make(map[string]float64) for _, disk := range job.item.Disks { id := strconv.FormatUint(disk.Id, 10) + typeURLParts := strings.Split(disk.Type, "/") + diskType := typeURLParts[len(typeURLParts)-1] + + zoneURLParts := strings.Split(disk.Zone, "/") + diskZone := zoneURLParts[len(zoneURLParts)-1] + region := strings.Join([]string{strings.Split(diskZone, "-")[0], strings.Split(diskZone, "-")[1]}, "-") + disks = append(disks, kaytu.GcpComputeDisk{ HashedDiskId: id, DiskSize: &disk.SizeGb, - DiskType: disk.Type, - Region: disk.Region, + DiskType: diskType, + Region: region, ProvisionedIops: &disk.ProvisionedIops, - Zone: disk.Zone, + Zone: diskZone, }) diskFilled[id] = 0 } @@ -61,11 +73,12 @@ func (job *OptimizeComputeInstancesJob) Run(ctx context.Context) error { Zone: job.item.Region, MachineType: job.item.MachineType, }, - Disks: disks, - Metrics: job.item.Metrics, - Region: job.item.Region, - Preferences: preferences.Export(job.item.Preferences), - Loading: false, + Disks: disks, + Metrics: job.item.Metrics, + DisksMetrics: job.item.DisksMetrics, + Region: job.item.Region, + Preferences: preferences.Export(job.item.Preferences), + Loading: false, } 
// ConcurrentMap is a typed, goroutine-safe map built on sync.Map. The zero
// value is ready to use. sync.Map fits this workload's niche (entries are
// written once per resource id and read/ranged many times).
type ConcurrentMap[K comparable, V any] struct {
	data sync.Map
}

// NewMap returns an empty ConcurrentMap. The zero value is already usable, so
// no field initialization is required (the original built an explicit
// sync.Map{}, which go vet's copylocks check dislikes).
func NewMap[K comparable, V any]() ConcurrentMap[K, V] {
	return ConcurrentMap[K, V]{}
}

// Set stores value under key, replacing any existing entry.
func (cm *ConcurrentMap[K, V]) Set(key K, value V) {
	cm.data.Store(key, value)
}

// Delete removes key if present; it is a no-op otherwise.
func (cm *ConcurrentMap[K, V]) Delete(key K) {
	cm.data.Delete(key)
}

// Get returns the value stored under key and whether it was present.
// On a miss it returns the zero value of V.
func (cm *ConcurrentMap[K, V]) Get(key K) (V, bool) {
	v, ok := cm.data.Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return v.(V), true
}

// Range calls f sequentially for each entry until f returns false. As with
// sync.Map.Range, it does not correspond to a consistent snapshot of the map.
func (cm *ConcurrentMap[K, V]) Range(f func(key K, value V) bool) {
	cm.data.Range(func(key, value any) bool {
		return f(key.(K), value.(V))
	})
}