diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go
index 8d5630a5dd8..b91f0c39c80 100644
--- a/go/vt/vttablet/endtoend/misc_test.go
+++ b/go/vt/vttablet/endtoend/misc_test.go
@@ -1090,6 +1090,30 @@ func TestEngineReload(t *testing.T) {
 	})
 }
 
+func TestUpdateTableIndexMetrics(t *testing.T) {
+	ctx := context.Background()
+	conn, err := mysql.Connect(ctx, &connParams)
+	require.NoError(t, err)
+	defer conn.Close()
+
+	if query := conn.BaseShowInnodbTableSizes(); query == "" {
+		t.Skip("additional table/index metrics not updated in this version of MySQL")
+	}
+	vars := framework.DebugVars()
+
+	require.NotNil(t, framework.FetchVal(vars, "TableRows/vitess_a"))
+	require.NotNil(t, framework.FetchVal(vars, "TableRows/vitess_part"))
+
+	require.NotNil(t, framework.FetchVal(vars, "TableClusteredIndexSize/vitess_a"))
+	require.NotNil(t, framework.FetchVal(vars, "TableClusteredIndexSize/vitess_part"))
+
+	require.NotNil(t, framework.FetchVal(vars, "IndexCardinality/vitess_a.PRIMARY"))
+	require.NotNil(t, framework.FetchVal(vars, "IndexCardinality/vitess_part.PRIMARY"))
+
+	require.NotNil(t, framework.FetchVal(vars, "IndexBytes/vitess_a.PRIMARY"))
+	require.NotNil(t, framework.FetchVal(vars, "IndexBytes/vitess_part.PRIMARY"))
+}
+
 // TestTuple tests that bind variables having tuple values work with vttablet.
 func TestTuple(t *testing.T) {
 	client := framework.NewClient()
diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go
index fb4a9d9367a..54803af9589 100644
--- a/go/vt/vttablet/tabletserver/schema/engine.go
+++ b/go/vt/vttablet/tabletserver/schema/engine.go
@@ -58,6 +58,8 @@ import (
 )
 
 const maxTableCount = 10000
+const maxPartitionsPerTable = 8192
+const maxIndexesPerTable = 64
 
 type notifier func(full map[string]*Table, created, altered, dropped []*Table, udfsChanged bool)
 
@@ -95,10 +97,16 @@ type Engine struct {
 	// dbCreationFailed is for preventing log spam.
 	dbCreationFailed bool
 
-	tableFileSizeGauge      *stats.GaugesWithSingleLabel
-	tableAllocatedSizeGauge *stats.GaugesWithSingleLabel
-	innoDbReadRowsCounter   *stats.Counter
-	SchemaReloadTimings     *servenv.TimingsWrapper
+	tableFileSizeGauge           *stats.GaugesWithSingleLabel
+	tableAllocatedSizeGauge      *stats.GaugesWithSingleLabel
+	tableRowsGauge               *stats.GaugesWithSingleLabel
+	tableClusteredIndexSizeGauge *stats.GaugesWithSingleLabel
+
+	indexCardinalityGauge *stats.GaugesWithMultiLabels
+	indexBytesGauge       *stats.GaugesWithMultiLabels
+
+	innoDbReadRowsCounter *stats.Counter
+	SchemaReloadTimings   *servenv.TimingsWrapper
 }
 
 // NewEngine creates a new Engine.
@@ -119,6 +127,10 @@ func NewEngine(env tabletenv.Env) *Engine {
 	_ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval)
 	se.tableFileSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableFileSize", "tracks table file size", "Table")
 	se.tableAllocatedSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableAllocatedSize", "tracks table allocated size", "Table")
+	se.tableRowsGauge = env.Exporter().NewGaugesWithSingleLabel("TableRows", "the estimated number of rows in the table", "Table")
+	se.tableClusteredIndexSizeGauge = env.Exporter().NewGaugesWithSingleLabel("TableClusteredIndexSize", "the byte size of the clustered index (i.e. row data)", "Table")
row data)", "Table") + se.indexCardinalityGauge = env.Exporter().NewGaugesWithMultiLabels("IndexCardinality", "estimated number of unique values in the index", []string{"Table", "Index"}) + se.indexBytesGauge = env.Exporter().NewGaugesWithMultiLabels("IndexBytes", "byte size of the the index", []string{"Table", "Index"}) se.innoDbReadRowsCounter = env.Exporter().NewCounter("InnodbRowsRead", "number of rows read by mysql") se.SchemaReloadTimings = env.Exporter().NewTimings("SchemaReload", "time taken to reload the schema", "type") se.reloadTimeout = env.Config().SchemaChangeReloadTimeout @@ -432,7 +444,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error { // We therefore don't want to query for table sizes in getTableData() includeStats = false - innodbResults, err := conn.Conn.Exec(ctx, innodbTableSizesQuery, maxTableCount, false) + innodbResults, err := conn.Conn.Exec(ctx, innodbTableSizesQuery, maxTableCount*maxPartitionsPerTable, false) if err != nil { return vterrors.Wrapf(err, "in Engine.reload(), reading innodb tables") } @@ -466,8 +478,12 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error { } } } + if err := se.updateTableIndexMetrics(ctx, conn.Conn); err != nil { + log.Errorf("Updating index/table statistics failed, error: %v", err) + } // See testing in TestEngineReload } + } tableData, err := getTableData(ctx, conn.Conn, includeStats) if err != nil { @@ -689,6 +705,154 @@ func (se *Engine) updateInnoDBRowsRead(ctx context.Context, conn *connpool.Conn) return nil } +func (se *Engine) updateTableIndexMetrics(ctx context.Context, conn *connpool.Conn) error { + // Load all partitions so that we can extract the base table name from tables given as "TABLE#p#PARTITION" + type partition struct { + table string + partition string + } + + partitionsQuery := ` + select table_name, partition_name + from information_schema.partitions + where table_schema = database() and partition_name is not null + ` + partitionsResults, err := conn.Exec(ctx, partitionsQuery, 8192*maxTableCount, false) + if err != nil { + return err + } + partitions := make(map[string]partition) + for _, row := range partitionsResults.Rows { + p := partition{ + table: row[0].ToString(), + partition: row[1].ToString(), + } + key := p.table + "#p#" + p.partition + partitions[key] = p + } + + // Load table row counts and clustered index sizes. Results contain one row for every partition + type table struct { + table string + rows int64 + rowBytes int64 + } + tables := make(map[string]table) + tableStatsQuery := ` + select table_name, n_rows, clustered_index_size * @@innodb_page_size + from mysql.innodb_table_stats where database_name = database() + ` + tableStatsResults, err := conn.Exec(ctx, tableStatsQuery, maxTableCount*maxPartitionsPerTable, false) + if err != nil { + return err + } + for _, row := range tableStatsResults.Rows { + tableName := row[0].ToString() + rowCount, _ := row[1].ToInt64() + rowsBytes, _ := row[2].ToInt64() + + if strings.Contains(tableName, "#p#") { + if partition, ok := partitions[tableName]; ok { + tableName = partition.table + } + } + + t, ok := tables[tableName] + if !ok { + t = table{table: tableName} + } + t.rows += rowCount + t.rowBytes += rowsBytes + tables[tableName] = t + } + + type index struct { + table string + index string + bytes int64 + cardinality int64 + } + indexes := make(map[[2]string]index) + + // Load the byte sizes of all indexes. Results contain one row for every index/partition combination. 
+	bytesQuery := `
+		select table_name, index_name, stat_value * @@innodb_page_size
+		from mysql.innodb_index_stats
+		where database_name = database() and stat_name = 'size'
+	`
+	bytesResults, err := conn.Exec(ctx, bytesQuery, maxTableCount*maxIndexesPerTable, false)
+	if err != nil {
+		return err
+	}
+	for _, row := range bytesResults.Rows {
+		tableName := row[0].ToString()
+		indexName := row[1].ToString()
+		indexBytes, _ := row[2].ToInt64()
+
+		if strings.Contains(tableName, "#p#") {
+			if partition, ok := partitions[tableName]; ok {
+				tableName = partition.table
+			}
+		}
+
+		key := [2]string{tableName, indexName}
+		idx, ok := indexes[key]
+		if !ok {
+			idx = index{
+				table: tableName,
+				index: indexName,
+			}
+		}
+		idx.bytes += indexBytes
+		indexes[key] = idx
+	}
+
+	// Load index cardinalities. Results contain one row for every index (pre-aggregated across partitions).
+	cardinalityQuery := `
+		select table_name, index_name, max(cardinality)
+		from information_schema.statistics s
+		where table_schema = database()
+		group by s.table_name, s.index_name
+	`
+	cardinalityResults, err := conn.Exec(ctx, cardinalityQuery, maxTableCount*maxPartitionsPerTable, false)
+	if err != nil {
+		return err
+	}
+	for _, row := range cardinalityResults.Rows {
+		tableName := row[0].ToString()
+		indexName := row[1].ToString()
+		cardinality, _ := row[2].ToInt64()
+
+		key := [2]string{tableName, indexName}
+		idx, ok := indexes[key]
+		if !ok {
+			idx = index{
+				table: tableName,
+				index: indexName,
+			}
+		}
+		idx.cardinality = cardinality
+		indexes[key] = idx
+	}
+
+	se.indexBytesGauge.ResetAll()
+	se.indexCardinalityGauge.ResetAll()
+	for _, idx := range indexes {
+		key := []string{idx.table, idx.index}
+		se.indexBytesGauge.Set(key, idx.bytes)
+		se.indexCardinalityGauge.Set(key, idx.cardinality)
+	}
+
+	se.tableRowsGauge.ResetAll()
+	se.tableClusteredIndexSizeGauge.ResetAll()
+	for _, tbl := range tables {
+		se.tableRowsGauge.Set(tbl.table, tbl.rows)
+		se.tableClusteredIndexSizeGauge.Set(tbl.table, tbl.rowBytes)
+	}
+
+	return nil
+}
+
 func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.Conn) (int64, error) {
 	// Keep `SELECT UNIX_TIMESTAMP` is in uppercase because binlog server queries are case sensitive and expect it to be so.
 	tm, err := conn.Exec(ctx, "SELECT UNIX_TIMESTAMP()", 1, false)
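
The new gauges surface through vttablet's /debug/vars endpoint, which is what TestUpdateTableIndexMetrics asserts against via framework.DebugVars(). Below is a minimal sketch of reading them outside the test framework, assuming a vttablet serving debug vars on localhost:15100; the address and the vitess_a/PRIMARY names are illustrative assumptions, not part of this change.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical vttablet debug address; adjust to your deployment.
	resp, err := http.Get("http://localhost:15100/debug/vars")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var vars map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&vars); err != nil {
		panic(err)
	}

	// Single-label gauges export as an object keyed by the label value
	// (e.g. TableRows -> {"vitess_a": ...}); multi-label gauges join their
	// label values with ".", matching the "Table.Index" paths the test
	// fetches (e.g. IndexBytes -> {"vitess_a.PRIMARY": ...}).
	if rows, ok := vars["TableRows"].(map[string]any); ok {
		fmt.Println("TableRows[vitess_a] =", rows["vitess_a"])
	}
	if idx, ok := vars["IndexBytes"].(map[string]any); ok {
		fmt.Println("IndexBytes[vitess_a.PRIMARY] =", idx["vitess_a.PRIMARY"])
	}
}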