feat: Monitoring metrics for sharded and non-sharded migration (#682)
* fix: allow json download after migration has started

* initial changes for monitoring

* metric changes

* metric changes

* changes

* changes

* changes

* changes

* final changes

* go mod change

* Update config.json

* Update fetch.service.ts

* non-sharded migration flow

* package json changes reversed

* adding comments and making monitoring as a separate section

* error changes

* changes from comments

* comment changes

* UI changes
asthamohta authored Nov 14, 2023
1 parent a7e4682 commit 752ae41
Showing 14 changed files with 452 additions and 56 deletions.
227 changes: 227 additions & 0 deletions common/metrics/metrics.go
@@ -1,14 +1,51 @@
package metrics

import (
"context"
"fmt"
"math"
"strings"
"sync"

dashboard "cloud.google.com/go/monitoring/dashboard/apiv1"
"cloud.google.com/go/monitoring/dashboard/apiv1/dashboardpb"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal/reports"
"github.com/GoogleCloudPlatform/spanner-migration-tool/proto/migration"
)

// Defines dimensions for Monitoring Dashboard Metrics
const (
// Default height of a tile in the monitoring dashboard
defaultMonitoringMetricHeight int32 = 16
// Default width of a tile in the monitoring dashboard
defaultMonitoringMetricWidth int32 = 16
// Default number of tile columns per row in the monitoring dashboard
defaultColumns int32 = 3
// Total number of columns in the mosaic layout (defaultColumns * defaultMonitoringMetricWidth)
defaultMosaicColumns int32 = 48
)

var once sync.Once
var dashboardClient *dashboard.DashboardsClient

// MonitoringMetricsResources contains information required to create the monitoring dashboard
type MonitoringMetricsResources struct {
ProjectId string
DataflowJobId string
DatastreamId string
GcsBucketId string
PubsubSubscriptionId string
SpannerInstanceId string
SpannerDatabaseId string
ShardId string
}

// TileInfo contains the title and time series queries for a single dashboard tile
type TileInfo struct {
Title string
TimeSeriesQueries map[string]string // Map of legend templates to their corresponding time series queries
}

// GetMigrationData returns migration data comprising source schema details,
// request id, target dialect, connection mechanism etc based on
// the conv object, source driver and target db
@@ -102,3 +139,193 @@ func getMigrationDataSourceDetails(driver string, migrationData *migration.Migra
return migration.MigrationData_SOURCE_CONNECTION_MECHANISM_UNSPECIFIED.Enum(), migration.MigrationData_SOURCE_UNSPECIFIED.Enum()
}
}

// getDashboardClient lazily initializes and returns a singleton dashboard client.
// The error from NewDashboardsClient is discarded, so a failed initialization
// leaves the client nil.
func getDashboardClient(ctx context.Context) *dashboard.DashboardsClient {
once.Do(func() {
dashboardClient, _ = dashboard.NewDashboardsClient(ctx)
})
return dashboardClient
}

// CreateDataflowShardMonitoringDashboard creates a Cloud Monitoring dashboard for a single shard's migration resources and returns it
func (resourceIds MonitoringMetricsResources) CreateDataflowShardMonitoringDashboard(ctx context.Context) (*dashboardpb.Dashboard, error) {
var mosaicLayoutTiles []*dashboardpb.MosaicLayout_Tile
var heightOffset int32 = 0

// create independent metrics tiles
independentMetricsTiles := createShardIndependentMetrics(resourceIds)
heightOffset += setWidgetPositions(independentMetricsTiles, heightOffset)
mosaicLayoutTiles = append(mosaicLayoutTiles, independentMetricsTiles...)

var mosaicGroups = []struct {
groupTitle string
groupCreateTileFunction func(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile
}{
{groupTitle: fmt.Sprintf("Dataflow Job: %s", resourceIds.DataflowJobId), groupCreateTileFunction: createShardDataflowMetrics},
{groupTitle: fmt.Sprintf("Datastream: %s", resourceIds.DatastreamId), groupCreateTileFunction: createShardDatastreamMetrics},
{groupTitle: fmt.Sprintf("GCS Bucket: %s", strings.Split(resourceIds.GcsBucketId, "/")[2]), groupCreateTileFunction: createShardGcsMetrics},
{groupTitle: fmt.Sprintf("Pubsub: %s", resourceIds.PubsubSubscriptionId), groupCreateTileFunction: createShardPubsubMetrics},
{groupTitle: fmt.Sprintf("Spanner: instances/%s/databases/%s", resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId), groupCreateTileFunction: createShardSpannerMetrics},
}
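// Each resource above gets its own collapsible group of tiles, stacked below the independent metrics.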

for _, mosaicGroup := range mosaicGroups {
metricTiles := mosaicGroup.groupCreateTileFunction(resourceIds)
var groupTile *dashboardpb.MosaicLayout_Tile
groupTile, heightOffset = createCollapsibleGroupTile(TileInfo{Title: mosaicGroup.groupTitle}, metricTiles, heightOffset)
mosaicLayoutTiles = append(append(mosaicLayoutTiles, metricTiles...), groupTile)
}

mosaicLayout := dashboardpb.MosaicLayout{
Columns: defaultMosaicColumns,
Tiles: mosaicLayoutTiles,
}
layout := dashboardpb.Dashboard_MosaicLayout{
MosaicLayout: &mosaicLayout,
}

dashboardDisplayName := "Migration Dashboard"
if resourceIds.ShardId != "" {
dashboardDisplayName = fmt.Sprintf("Shard Migration Dashboard %s", resourceIds.ShardId)
}
db := dashboardpb.Dashboard{
DisplayName: dashboardDisplayName,
Layout: &layout,
}
req := &dashboardpb.CreateDashboardRequest{
Parent: "projects/" + resourceIds.ProjectId,
Dashboard: &db,
}
client := getDashboardClient(ctx)
resp, err := client.CreateDashboard(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}

func createShardDataflowMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
dataflowTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"Dataflow Workers CPU Utilization", map[string]string{"": fmt.Sprintf(dataflowCpuUtilQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Dataflow Workers Memory Utilization", map[string]string{"": fmt.Sprintf(dataflowMemoryUtilQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Dataflow Workers Backlog Time Seconds", map[string]string{"": fmt.Sprintf(dataflowBacklogTimeQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
}
return dataflowTiles
}

func createShardDatastreamMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
datastreamTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{
"Datastream Total Latency",
map[string]string{"p50 " + resourceIds.DatastreamId: fmt.Sprintf(datastreamTotalLatencyQuery, resourceIds.DatastreamId, resourceIds.ProjectId, "50"), "p90 " + resourceIds.DatastreamId: fmt.Sprintf(datastreamTotalLatencyQuery, resourceIds.DatastreamId, resourceIds.ProjectId, "90")}}),
createXYChartTile(TileInfo{"Datastream Throughput", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamThroughputQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Datastream Unsupported Events", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamUnsupportedEventsQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
}
return datastreamTiles
}

func createShardGcsMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
gcsBucketTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"GCS Bucket Total Bytes", map[string]string{resourceIds.GcsBucketId: fmt.Sprintf(gcsTotalBytesQuery, strings.Split(resourceIds.GcsBucketId, "/")[2], resourceIds.ProjectId)}}),
}
return gcsBucketTiles
}

func createShardSpannerMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
spannerTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(
TileInfo{"Spanner CPU Utilisation",
map[string]string{"Database CPU Utilisation": fmt.Sprintf(spannerCpuUtilDbQuery, resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId, resourceIds.ProjectId), "Instance CPU Utilisation": fmt.Sprintf(spannerCpuUtilInstanceQuery, resourceIds.SpannerInstanceId, resourceIds.ProjectId)}}),
createXYChartTile(
TileInfo{"Spanner Storage",
map[string]string{"Database Storage": fmt.Sprintf(spannerStorageUtilDbQuery, resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId, resourceIds.ProjectId), "Instance Storage": fmt.Sprintf(spannerStorageUtilInstanceQuery, resourceIds.SpannerInstanceId, resourceIds.ProjectId)}}),
}
return spannerTiles
}

func createShardPubsubMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
pubsubTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"Pubsub Subscription Sent Message Count", map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubSubscriptionSentMessageCountQuery, resourceIds.PubsubSubscriptionId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Pubsub Age of Oldest Unacknowledged Message", map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubOldestUnackedMessageAgeQuery, resourceIds.PubsubSubscriptionId, resourceIds.ProjectId)}}),
}
return pubsubTiles
}

func createShardIndependentMetrics(resourceIds MonitoringMetricsResources) []*dashboardpb.MosaicLayout_Tile {
independentMetricsTiles := []*dashboardpb.MosaicLayout_Tile{
createXYChartTile(TileInfo{"Dataflow Workers CPU Utilization", map[string]string{"": fmt.Sprintf(dataflowCpuUtilQuery, resourceIds.DataflowJobId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Datastream Throughput", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamThroughputQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Datastream Unsupported Events", map[string]string{resourceIds.DatastreamId: fmt.Sprintf(datastreamUnsupportedEventsQuery, resourceIds.DatastreamId, resourceIds.ProjectId)}}),
createXYChartTile(TileInfo{"Pubsub Age of Oldest Unacknowledged Message", map[string]string{resourceIds.PubsubSubscriptionId: fmt.Sprintf(pubsubOldestUnackedMessageAgeQuery, resourceIds.PubsubSubscriptionId, resourceIds.ProjectId)}}), createXYChartTile(
TileInfo{"Spanner CPU Utilisation",
map[string]string{"Database CPU Utilisation": fmt.Sprintf(spannerCpuUtilDbQuery, resourceIds.SpannerInstanceId, resourceIds.SpannerDatabaseId, resourceIds.ProjectId), "Instance CPU Utilisation": fmt.Sprintf(spannerCpuUtilInstanceQuery, resourceIds.SpannerInstanceId, resourceIds.ProjectId)}}),
}
return independentMetricsTiles
}

// createXYChartTile returns a single XY chart tile for a mosaic layout dashboard, with one line-chart data set per query in tileInfo
func createXYChartTile(tileInfo TileInfo) *dashboardpb.MosaicLayout_Tile {
var dataSets []*dashboardpb.XyChart_DataSet
for legendTemplate, query := range tileInfo.TimeSeriesQueries {
ds := &dashboardpb.XyChart_DataSet{
PlotType: dashboardpb.XyChart_DataSet_LINE,
TargetAxis: dashboardpb.XyChart_DataSet_Y1,
TimeSeriesQuery: &dashboardpb.TimeSeriesQuery{
Source: &dashboardpb.TimeSeriesQuery_TimeSeriesQueryLanguage{
TimeSeriesQueryLanguage: query,
},
},
}
if legendTemplate != "" {
ds.LegendTemplate = legendTemplate
}
dataSets = append(dataSets, ds)
}
tile := dashboardpb.MosaicLayout_Tile{
Widget: &dashboardpb.Widget{
Title: tileInfo.Title,
Content: &dashboardpb.Widget_XyChart{
XyChart: &dashboardpb.XyChart{
ChartOptions: &dashboardpb.ChartOptions{
Mode: dashboardpb.ChartOptions_COLOR,
},
DataSets: dataSets,
},
},
},
}
return &tile
}

// createCollapsibleGroupTile positions the given tiles, wraps them in a collapsible group tile and returns the group tile along with the updated height offset
func createCollapsibleGroupTile(tileInfo TileInfo, tiles []*dashboardpb.MosaicLayout_Tile, heightOffset int32) (*dashboardpb.MosaicLayout_Tile, int32) {
groupTileHeight := setWidgetPositions(tiles, heightOffset)
groupTile := dashboardpb.MosaicLayout_Tile{
XPos: 0,
YPos: heightOffset,
Width: defaultMonitoringMetricWidth * defaultColumns,
Height: groupTileHeight,
Widget: &dashboardpb.Widget{
Title: tileInfo.Title,
Content: &dashboardpb.Widget_CollapsibleGroup{
CollapsibleGroup: &dashboardpb.CollapsibleGroup{
Collapsed: true,
},
},
},
}
return &groupTile, heightOffset + groupTileHeight
}

// setWidgetPositions lays the tiles out left to right in a grid of defaultColumns columns starting at heightOffset and returns the total height the tiles occupy
func setWidgetPositions(tiles []*dashboardpb.MosaicLayout_Tile, heightOffset int32) int32 {
for tilePosition, tile := range tiles {
tile.XPos = (int32(tilePosition) % defaultColumns) * defaultMonitoringMetricWidth
tile.YPos = heightOffset + (int32(tilePosition)/defaultColumns)*defaultMonitoringMetricHeight
tile.Width = defaultMonitoringMetricWidth
tile.Height = defaultMonitoringMetricHeight
}
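// Return the grid height consumed: ceil(len(tiles)/defaultColumns) rows, each defaultMonitoringMetricHeight tall (assumes a non-empty tiles slice).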
return ((int32(len(tiles)-1) / defaultColumns) + 1) * defaultMonitoringMetricHeight
}
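
A hypothetical usage sketch (not part of this commit) of how a caller might create a per-shard dashboard with the new API; every ID below is a placeholder:

package main

import (
	"context"
	"fmt"

	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics"
)

func main() {
	ctx := context.Background()
	// All IDs are placeholders; substitute the resources created by the migration.
	resourceIds := metrics.MonitoringMetricsResources{
		ProjectId:            "my-project",
		DataflowJobId:        "2023-11-14_00_00_00-1234567890",
		DatastreamId:         "my-stream",
		GcsBucketId:          "gs://my-bucket/data",
		PubsubSubscriptionId: "my-subscription",
		SpannerInstanceId:    "my-instance",
		SpannerDatabaseId:    "my-database",
		ShardId:              "shard-1",
	}
	// Create the monitoring dashboard for this shard.
	dash, err := resourceIds.CreateDataflowShardMonitoringDashboard(ctx)
	if err != nil {
		fmt.Printf("failed to create monitoring dashboard: %v\n", err)
		return
	}
	fmt.Printf("created dashboard: %s\n", dash.GetName())
}
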
49 changes: 49 additions & 0 deletions common/metrics/queries.go
@@ -0,0 +1,49 @@
package metrics

// Defines queries for Monitoring Dashboard Metrics
const (
dataflowCpuUtilQuery = "fetch gce_instance | metric 'compute.googleapis.com/instance/cpu/utilization' | filter " +
"(metadata.user_labels.dataflow_job_id == '%s') && resource.project_id == '%s' | group_by 1m, " +
"[value_utilization_mean: mean(value.utilization)] | every 1m | group_by [metric.instance_name], " +
"[value_utilization_mean_percentile: percentile(value_utilization_mean, 90)]"
dataflowMemoryUtilQuery = "fetch gce_instance | metric 'compute.googleapis.com/guest/memory/bytes_used' | filter (metadata.user_labels.dataflow_job_id == '%s') " +
"&& resource.project_id == '%s' && (metric.state == 'used') | align next_older(1m) | every 1m | " +
"group_by [metric.instance_name], [value_bytes_used_percentile: percentile(value.bytes_used, 90 )]"
dataflowBacklogTimeQuery = "fetch dataflow_job | metric 'dataflow.googleapis.com/job/estimated_backlog_processing_time' | " +
"filter (metric.job_id == '%s') && resource.project_id == '%s' | group_by 1m, " +
"[value_estimated_backlog_processing_time_mean: mean(value.estimated_backlog_processing_time)] | every 1m"
datastreamTotalLatencyQuery = "fetch datastream.googleapis.com/Stream | metric 'datastream.googleapis.com/stream/total_latencies' " +
"| filter (resource.stream_id == '%s') | filter resource.resource_container == '%s' | " +
"align delta(1m) | every 1m | group_by [], [value_total_latencies_percentile: percentile(value.total_latencies, %s)]"
datastreamUnsupportedEventsQuery = "fetch datastream.googleapis.com/Stream| metric 'datastream.googleapis.com/stream/unsupported_event_count'| " +
"filter (resource.stream_id == '%s') | filter (resource.resource_container == '%s') | align delta(10m)| every 10m| group_by [], " +
"[value_unsupported_event_count_sum: sum(value.unsupported_event_count)]"
datastreamThroughputQuery = "fetch datastream.googleapis.com/Stream| metric 'datastream.googleapis.com/stream/event_count'" +
"| filter (resource.stream_id == '%s') | filter (resource.resource_container == '%s')| align rate(1m)| group_by [], " +
"[value_event_count_sum: mean(value.event_count)]| every 1m"
gcsTotalBytesQuery = "fetch gcs_bucket | metric 'storage.googleapis.com/storage/total_bytes' | filter " +
"(resource.bucket_name == '%s') && resource.project_id == '%s' | group_by 1m, [value_total_bytes_mean: mean(value.total_bytes)] | every 1m | " +
"group_by [], [value_total_bytes_mean_aggregate: aggregate(value_total_bytes_mean)]"
pubsubSubscriptionSentMessageCountQuery = "fetch pubsub_subscription | metric 'pubsub.googleapis.com/subscription/sent_message_count' | " +
"filter (resource.subscription_id == '%s') && resource.project_id == '%s' | align rate(1m) | every 1m | group_by [], " +
"[value_sent_message_count_aggregate: aggregate(value.sent_message_count)]"
pubsubOldestUnackedMessageAgeQuery = "fetch pubsub_subscription | metric 'pubsub.googleapis.com/subscription/oldest_unacked_message_age' | " +
"filter (resource.subscription_id == '%s') && resource.project_id == '%s' | group_by 1m, " +
"[value_oldest_unacked_message_age_mean: mean(value.oldest_unacked_message_age)] | every 1m | group_by [], " +
"[value_oldest_unacked_message_age_mean_max: max(value_oldest_unacked_message_age_mean)]"
spannerCpuUtilDbQuery = "fetch spanner_instance | metric 'spanner.googleapis.com/instance/cpu/utilization' | " +
"filter (resource.instance_id == '%s') && (metric.database == '%s') && resource.project_id == '%s' " +
"| group_by 1m, [value_utilization_mean: mean(value.utilization)] | every 1m | group_by [], " +
"[value_utilization_mean_percentile: percentile(value_utilization_mean, 90)]"
spannerCpuUtilInstanceQuery = "fetch spanner_instance | metric 'spanner.googleapis.com/instance/cpu/utilization' | filter (resource.instance_id == '%s') && resource.project_id == '%s' |" +
" group_by 1m, [value_utilization_mean: mean(value.utilization)] | every 1m | group_by [], " +
"[value_utilization_mean_percentile: percentile(value_utilization_mean, 90)]"
spannerStorageUtilDbQuery = "fetch spanner_instance | metric 'spanner.googleapis.com/instance/storage/used_bytes' | " +
"filter (resource.instance_id == '%s') && (metric.database == '%s') && resource.project_id == '%s' | " +
"group_by 1m, [value_used_bytes_mean: mean(value.used_bytes)] | every 1m | group_by [], " +
"[value_used_bytes_mean_aggregate: aggregate(value_used_bytes_mean)]"
spannerStorageUtilInstanceQuery = "fetch spanner_instance | metric 'spanner.googleapis.com/instance/storage/used_bytes' | filter " +
"(resource.instance_id == '%s') && resource.project_id == '%s' | group_by 1m, " +
"[value_used_bytes_mean: mean(value.used_bytes)] | every 1m | group_by [], " +
"[value_used_bytes_mean_aggregate: aggregate(value_used_bytes_mean)]"
)
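
These MQL templates are expanded with fmt.Sprintf in metrics.go; a minimal, hypothetical sketch of that expansion for the Dataflow CPU query, using placeholder job and project IDs:

package metrics

import "fmt"

// exampleDataflowCpuQuery is illustrative only: it fills the Dataflow CPU
// utilization template the same way createShardDataflowMetrics does, with
// the Dataflow job ID first and the project ID second.
func exampleDataflowCpuQuery() string {
	return fmt.Sprintf(dataflowCpuUtilQuery, "2023-11-14_00_00_00-1234567890", "my-project")
}
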