diff --git a/pkg/sql/compile/sql_executor.go b/pkg/sql/compile/sql_executor.go
index f843102d83066..8c5c8e13a4bf5 100644
--- a/pkg/sql/compile/sql_executor.go
+++ b/pkg/sql/compile/sql_executor.go
@@ -190,13 +190,18 @@ func (s *sqlExecutor) getCompileContext(
 	proc *process.Process,
 	db string,
 	lower int64) *compilerContext {
-	return &compilerContext{
+	cc := &compilerContext{
 		ctx:       ctx,
 		defaultDB: db,
 		engine:    s.eng,
 		proc:      proc,
 		lower:     lower,
 	}
+	// For testing: adopt a stats cache injected through the context, if one is present
+	if statsCache, ok := ctx.Value("test_stats_cache").(*plan.StatsCache); ok {
+		cc.statsCache = statsCache
+	}
+	return cc
 }
 
 func (s *sqlExecutor) adjustOptions(
diff --git a/pkg/sql/compile/sql_executor_context.go b/pkg/sql/compile/sql_executor_context.go
index 9cd6aad59ff79..7e24721009080 100644
--- a/pkg/sql/compile/sql_executor_context.go
+++ b/pkg/sql/compile/sql_executor_context.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
 	"github.com/matrixorigin/matrixone/pkg/defines"
+	"github.com/matrixorigin/matrixone/pkg/logutil"
 	planpb "github.com/matrixorigin/matrixone/pkg/pb/plan"
 	pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo"
 	"github.com/matrixorigin/matrixone/pkg/perfcounter"
@@ -137,6 +138,17 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*
 		return nil, moerr.NewNoSuchTable(ctx, dbName, tableName)
 	}
 
+	// For testing only: a stats cache injected through the context under "test_stats_cache"
+	// takes precedence over engine stats. The lookup goes through that injected cache itself
+	// (not c.statsCache), so a compilerContext built without the test hook cannot nil-deref.
+	if testCache, isTestEnv := c.GetContext().Value("test_stats_cache").(*plan.StatsCache); isTestEnv && testCache != nil {
+		tableID := table.GetTableID(ctx)
+		if statsWrapper := testCache.GetStatsInfo(tableID, false); statsWrapper != nil && statsWrapper.Stats != nil {
+			logutil.Infof("use test env cached stats for table %s (tableID=%d)", tableName, tableID)
+			return
statsWrapper.Stats, nil
+		}
+	}
+
 	newCtx := perfcounter.AttachCalcTableStatsKey(ctx)
 	statsInfo, err := table.Stats(newCtx, true)
 	if err != nil {
diff --git a/pkg/sql/plan/associative_law.go b/pkg/sql/plan/associative_law.go
index 7982ad38d4b07..af395e8505858 100644
--- a/pkg/sql/plan/associative_law.go
+++ b/pkg/sql/plan/associative_law.go
@@ -14,7 +14,73 @@
 
 package plan
 
-import "github.com/matrixorigin/matrixone/pkg/pb/plan"
+import (
+	"fmt"
+
+	"github.com/matrixorigin/matrixone/pkg/pb/plan"
+)
+
+// checkExprInvolvesTags reports whether expr references a column whose RelPos is
+// one of the tags in tagsMap. It descends into function arguments and into a
+// window spec's function and ORDER BY expressions; a nil expr or any other
+// expression kind yields false.
+func checkExprInvolvesTags(expr *plan.Expr, tagsMap map[int32]bool) bool {
+	if expr == nil {
+		return false
+	}
+	switch e := expr.Expr.(type) {
+	case *plan.Expr_Col:
+		return tagsMap[e.Col.RelPos]
+	case *plan.Expr_F:
+		for _, arg := range e.F.Args {
+			if checkExprInvolvesTags(arg, tagsMap) {
+				return true
+			}
+		}
+	case *plan.Expr_W:
+		if checkExprInvolvesTags(e.W.WindowFunc, tagsMap) {
+			return true
+		}
+		for _, order := range e.W.OrderBy {
+			if checkExprInvolvesTags(order.Expr, tagsMap) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// migrateOnListConditions moves every src.OnList condition that references one of
+// the tags in tagsMap onto dst.OnList; the remaining conditions stay on src in order.
+func migrateOnListConditions(src *plan.Node, dst *plan.Node, tagsMap map[int32]bool) {
+	var kept []*plan.Expr
+	for _, cond := range src.OnList {
+		if checkExprInvolvesTags(cond, tagsMap) {
+			dst.OnList = append(dst.OnList, cond)
+		} else {
+			kept = append(kept, cond)
+		}
+	}
+	src.OnList = kept
+}
+
+// getTableNameOrLabel returns the table name from the node's TableDef, or the given
+// label (A/B/C) when the node has no TableDef or its name is empty.
+func (builder *QueryBuilder) getTableNameOrLabel(nodeID int32, label string) string {
+	node := builder.qry.Nodes[nodeID]
+	if node.TableDef != nil && node.TableDef.Name != "" {
+		return node.TableDef.Name
+	}
+	return label
+}
+
+// formatStatsInfo formats stats information for logging
+func formatStatsInfo(stats *plan.Stats) string {
+	if stats == nil {
+		return "stats=nil"
+	}
+	return
fmt.Sprintf("sel=%.4f,outcnt=%.2f,tablecnt=%.2f", stats.Selectivity, stats.Outcnt, stats.TableCnt)
+}
 
 // for A*(B*C), if C.sel>0.9 and B 0.5 {
 		return nodeID
 	}
+	NodeA := builder.qry.Nodes[leftChild.Children[0]]
 	NodeB := builder.qry.Nodes[leftChild.Children[1]]
 	node.Children[0] = NodeB.NodeId
 	determineHashOnPK(node.NodeId, builder)
@@ -75,6 +153,24 @@ func (builder *QueryBuilder) applyAssociativeLawRule2(nodeID int32) int32 {
 		node.Children[0] = leftChild.NodeId
 		return node.NodeId
 	}
+
+	// Migrate OnList: conditions involving A must move to outer join
+	tagsAMap := make(map[int32]bool)
+	for _, tag := range builder.enumerateTags(NodeA.NodeId) {
+		tagsAMap[tag] = true
+	}
+	migrateOnListConditions(node, leftChild, tagsAMap)
+
+	// Record table names and stats after migration
+	tableNameA := builder.getTableNameOrLabel(NodeA.NodeId, "A")
+	tableNameB := builder.getTableNameOrLabel(NodeB.NodeId, "B")
+	tableNameC := builder.getTableNameOrLabel(NodeC.NodeId, "C")
+	statsInfo := fmt.Sprintf("rule2: A=%s(stats:%s) B=%s(stats:%s) C=%s(stats:%s)",
+		tableNameA, formatStatsInfo(NodeA.Stats),
+		tableNameB, formatStatsInfo(NodeB.Stats),
+		tableNameC, formatStatsInfo(NodeC.Stats))
+	builder.optimizationHistory = append(builder.optimizationHistory, statsInfo)
+
 	leftChild.Children[1] = node.NodeId
 	ReCalcNodeStats(leftChild.NodeId, builder, true, false, true)
 	return leftChild.NodeId
@@ -110,6 +206,34 @@ func (builder *QueryBuilder) applyAssociativeLawRule3(nodeID int32) int32 {
 		node.Children[0] = leftChild.NodeId
 		return node.NodeId
 	}
+
+	// Migrate OnList:
+	// - leftChild: move conditions involving C to node
+	// - node: move conditions involving B to leftChild
+	// The node -> leftChild pass runs last on purpose: a condition that references
+	// both B and C has to end on leftChild. With the passes in the opposite order it
+	// was moved to leftChild by the B pass and then straight back to node by the C pass.
+	tagsBMap := make(map[int32]bool)
+	for _, tag := range builder.enumerateTags(NodeB.NodeId) {
+		tagsBMap[tag] = true
+	}
+	tagsCMap := make(map[int32]bool)
+	for _, tag := range builder.enumerateTags(NodeC.NodeId) {
+		tagsCMap[tag] = true
+	}
+	migrateOnListConditions(leftChild, node, tagsCMap)
+	migrateOnListConditions(node, leftChild, tagsBMap)
+
+	// Record table names and stats after migration
+	tableNameA := builder.getTableNameOrLabel(NodeA.NodeId, "A")
+	tableNameB := builder.getTableNameOrLabel(NodeB.NodeId, "B")
+	tableNameC := builder.getTableNameOrLabel(NodeC.NodeId, "C")
+	statsInfo := fmt.Sprintf("rule3: A=%s(stats:%s) B=%s(stats:%s) C=%s(stats:%s)",
+		tableNameA, formatStatsInfo(NodeA.Stats),
+		tableNameB, formatStatsInfo(NodeB.Stats),
+		tableNameC, formatStatsInfo(NodeC.Stats))
+	builder.optimizationHistory = append(builder.optimizationHistory, statsInfo)
+
 	leftChild.Children[0] = node.NodeId
 	ReCalcNodeStats(leftChild.NodeId, builder, true, false, true)
 	return leftChild.NodeId
diff --git a/pkg/sql/plan/join_order.go b/pkg/sql/plan/join_order.go
index ce31a0fb08ff7..7385a785496c5 100644
--- a/pkg/sql/plan/join_order.go
+++ b/pkg/sql/plan/join_order.go
@@ -15,6 +15,7 @@
 package plan
 
 import (
+	"fmt"
 	"sort"
 	"strings"
 
@@ -204,6 +205,7 @@ func HasColExpr(expr *plan.Expr, pos int32) int32 {
 }
 
 func (builder *QueryBuilder) determineJoinOrder(nodeID int32) int32 {
+	originalNodeID := nodeID
 	if builder.optimizerHints != nil && builder.optimizerHints.joinOrdering != 0 {
 		return nodeID
 	}
@@ -223,6 +225,9 @@ func (builder *QueryBuilder) determineJoinOrder(nodeID int32) int32 {
 	}
 
 	leaves, conds := builder.gatherJoinLeavesAndConds(node, nil, nil)
+	// Record middle: gathered leaves and conditions
+	builder.optimizationHistory = append(builder.optimizationHistory,
+		fmt.Sprintf("determineJoinOrder:middle (nodeID: %d, leaves: %d, conds: %d)", nodeID, len(leaves), len(conds)))
 	newConds := deduceNewOnList(conds)
 	conds = append(conds, newConds...)
vertices := builder.getJoinGraph(leaves, conds)
@@ -347,6 +352,14 @@ func (builder *QueryBuilder) determineJoinOrder(nodeID int32) int32 {
 			FilterList: conds,
 		}, nil)
 	}
+	// Append the closing history entry; its wording depends on whether the root node changed
+	var afterEntry string
+	if nodeID == originalNodeID {
+		afterEntry = fmt.Sprintf("determineJoinOrder:after (nodeID: %d, no change, remainingConds: %d)", nodeID, len(conds))
+	} else {
+		afterEntry = fmt.Sprintf("determineJoinOrder:after (nodeID: %d -> %d, remainingConds: %d)", originalNodeID, nodeID, len(conds))
+	}
+	builder.optimizationHistory = append(builder.optimizationHistory, afterEntry)
 	return nodeID
 }
 
diff --git a/pkg/sql/plan/opt_misc_test.go b/pkg/sql/plan/opt_misc_test.go
index d17f83a04f9a5..6901b167bd536 100644
--- a/pkg/sql/plan/opt_misc_test.go
+++ b/pkg/sql/plan/opt_misc_test.go
@@ -15,10 +15,12 @@
 package plan
 
 import (
+	"context"
+	"testing"
+
 	"github.com/matrixorigin/matrixone/pkg/container/types"
 	"github.com/matrixorigin/matrixone/pkg/pb/plan"
 	"github.com/stretchr/testify/require"
-	"testing"
 )
 
 func TestRemapWindowClause(t *testing.T) {
@@ -43,7 +45,13 @@ func TestRemapWindowClause(t *testing.T) {
 		Typ: plan.Type{},
 	}
 	colMap := make(map[[2]int32][2]int32)
-	var b *QueryBuilder
+	b := &QueryBuilder{
+		compCtx: &MockCompilerContext{
+			ctx: context.Background(),
+		},
+		optimizationHistory: []string{"test optimization history"},
+	}
 	err := b.remapWindowClause(f, 1, 1, colMap, nil)
+	t.Log(err)
 	require.Error(t, err)
 }
diff --git a/pkg/sql/plan/pushdown.go b/pkg/sql/plan/pushdown.go
index 5b0fdf7f2330c..6cf5919b461e6 100644
--- a/pkg/sql/plan/pushdown.go
+++ b/pkg/sql/plan/pushdown.go
@@ -15,6 +15,8 @@
 package plan
 
 import (
+	"fmt"
+
 	"github.com/matrixorigin/matrixone/pkg/catalog"
 	"github.com/matrixorigin/matrixone/pkg/objectio"
 	"github.com/matrixorigin/matrixone/pkg/pb/plan"
@@ -22,6 +24,10 @@ import (
 )
 
 func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr, separateNonEquiConds bool) (int32,
[]*plan.Expr) {
+	originalNodeID := nodeID
+	// History entry for the state on entry to pushdownFilters
+	histBefore := fmt.Sprintf("pushdownFilters:before (nodeID: %d, nodeType: %s, filters: %d)", nodeID, builder.qry.Nodes[nodeID].NodeType, len(filters))
+	builder.optimizationHistory = append(builder.optimizationHistory, histBefore)
 	node := builder.qry.Nodes[nodeID]
 
 	var canPushdown, cantPushdown []*plan.Expr
@@ -150,6 +156,9 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
 		}
 
 	case plan.Node_JOIN:
+		// History entry for the JOIN branch: incoming filter count and current OnList size
+		histJoin := fmt.Sprintf("pushdownFilters:middle (nodeID: %d, JOIN, filters: %d, onList: %d)", nodeID, len(filters), len(node.OnList))
+		builder.optimizationHistory = append(builder.optimizationHistory, histJoin)
 		leftTags := make(map[int32]bool)
 		for _, tag := range builder.enumerateTags(node.Children[0]) {
 			leftTags[tag] = true
@@ -402,6 +411,9 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
 		node.Children[1] = childID
 
 	case plan.Node_UNION, plan.Node_UNION_ALL, plan.Node_MINUS, plan.Node_MINUS_ALL, plan.Node_INTERSECT, plan.Node_INTERSECT_ALL:
+		// History entry for the UNION/MINUS/INTERSECT branch
+		histSetOp := fmt.Sprintf("pushdownFilters:middle (nodeID: %d, %s, filters: %d)", nodeID, node.NodeType, len(filters))
+		builder.optimizationHistory = append(builder.optimizationHistory, histSetOp)
 		leftChild := builder.qry.Nodes[node.Children[0]]
 		rightChild := builder.qry.Nodes[node.Children[1]]
 		var canPushDownRight []*plan.Expr
@@ -457,6 +469,9 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
 		node.Children[0] = childID
 
 	case plan.Node_TABLE_SCAN, plan.Node_EXTERNAL_SCAN:
+		// History entry for the TABLE_SCAN/EXTERNAL_SCAN branch
+		histScan := fmt.Sprintf("pushdownFilters:middle (nodeID: %d, %s, filters: %d)", nodeID, node.NodeType, len(filters))
+		builder.optimizationHistory = append(builder.optimizationHistory, histScan)
 		for _, filter := range filters {
 			if onlyContainsTag(filter, node.BindingTags[0]) {
 				node.FilterList = append(node.FilterList,
filter)
@@ -505,6 +520,14 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
 		}
 	}
 
+	// Record after pushdownFilters
+	if nodeID != originalNodeID {
+		builder.optimizationHistory = append(builder.optimizationHistory,
+			fmt.Sprintf("pushdownFilters:after (nodeID: %d -> %d, cantPushdown: %d)", originalNodeID, nodeID, len(cantPushdown)))
+	} else {
+		builder.optimizationHistory = append(builder.optimizationHistory,
+			fmt.Sprintf("pushdownFilters:after (nodeID: %d, no change, cantPushdown: %d)", nodeID, len(cantPushdown)))
+	}
 	return nodeID, cantPushdown
 }
 
diff --git a/pkg/sql/plan/query_builder.go b/pkg/sql/plan/query_builder.go
index 55a082cffda7a..a4834313c65f8 100644
--- a/pkg/sql/plan/query_builder.go
+++ b/pkg/sql/plan/query_builder.go
@@ -94,6 +94,7 @@ func NewQueryBuilder(queryType plan.Query_StatementType, ctx CompilerContext, is
 		isPrepareStatement: isPrepareStatement,
 		deleteNode:         make(map[uint64]int32),
 		skipStats:          skipStats,
+		optimizationHistory: make([]string, 0),
 	}
 }
 
@@ -193,6 +194,17 @@ func (builder *QueryBuilder) buildRemapErrorMessage(
 		}
 	}
 
+	// Optimization history: every step the optimizer recorded, in order
+	if len(builder.optimizationHistory) > 0 {
+		sb.WriteString("🔧 Optimization History:\n")
+		for _, hist := range builder.optimizationHistory {
+			sb.WriteString(fmt.Sprintf(" - %s\n", hist))
+		}
+		sb.WriteString("\n")
+	} else {
+		sb.WriteString("🔧 Optimization History: (none recorded)\n\n")
+	}
+
 	// Available columns
 	if len(colMap) > 0 {
 		sb.WriteString("✅ Available Columns in Context:\n")
diff --git a/pkg/sql/plan/types.go b/pkg/sql/plan/types.go
index c6ad923535dbc..073e9d2537c5b 100644
--- a/pkg/sql/plan/types.go
+++ b/pkg/sql/plan/types.go
@@ -193,6 +193,10 @@ type QueryBuilder struct {
 	aggSpillMem int64
 
 	optimizerHints *OptimizerHints
+
+	// optimizationHistory records optimizer steps (filter pushdown, join ordering,
+	// associative-law rewrites) in order; it is printed when a remap error is built
+	optimizationHistory []string
 }
 
 type OptimizerHints struct
{
diff --git a/pkg/vm/engine/test/associative_law_remap_test.go b/pkg/vm/engine/test/associative_law_remap_test.go
new file mode 100644
index 0000000000000..2299d9151b761
--- /dev/null
+++ b/pkg/vm/engine/test/associative_law_remap_test.go
@@ -0,0 +1,166 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/lni/goutils/leaktest"
+	"github.com/stretchr/testify/require"
+
+	"github.com/matrixorigin/matrixone/pkg/catalog"
+	"github.com/matrixorigin/matrixone/pkg/common/runtime"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	planpb "github.com/matrixorigin/matrixone/pkg/pb/plan"
+	"github.com/matrixorigin/matrixone/pkg/sql/plan"
+	"github.com/matrixorigin/matrixone/pkg/sql/plan/explain"
+	"github.com/matrixorigin/matrixone/pkg/util/executor"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine"
+	catalog2 "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils/config"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/test/testutil"
+)
+
+// TestAssociativeLawRemapping tests that associative law transformations
+// correctly migrate OnList conditions to avoid remapping errors
+func TestAssociativeLawRemapping(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	catalog.SetupDefines("")
+
+	opts := testutil.TestOptions{}
+	opts.TaeEngineOptions = config.WithLongScanAndCKPOpts(nil)
+	p := testutil.InitEnginePack(opts, t)
+	defer p.Close()
+
+	dbName := "test_db"
+	ctx := p.Ctx
+
+	// Create database and tables using enginepack style
+	txnop := p.StartCNTxn()
+
+	// Helper function to create schema with primary key constraint
+	createSchemaWithPK := func(name string, pkColName string, pkColType types.Type, otherCols map[string]types.Type) *catalog2.Schema {
+		schema := catalog2.NewEmptySchema(name)
+		schema.AppendPKCol(pkColName, pkColType, 0)
+		for colName, colType := range otherCols {
+			schema.AppendCol(colName, colType)
+		}
+
+		// Create primary key constraint
+		pkDef := &planpb.PrimaryKeyDef{
+			Names:       []string{pkColName},
+			PkeyColName: pkColName,
+		}
+		pkConstraint := &engine.PrimaryKeyDef{
+			Pkey: pkDef,
+		}
+		constraintDef := &engine.ConstraintDef{
+			Cts: []engine.Constraint{pkConstraint},
+		}
+		// Fail the test on a marshal error instead of silently dropping the constraint
+		constraint, err := constraintDef.MarshalBinary()
+		require.NoError(t, err)
+		schema.Constraint = constraint
+		schema.Finalize(false)
+		return schema
+	}
+
+	// Create connector_job table schema
+	jobSchema := createSchemaWithPK("connector_job", "id", types.T_int64.ToType(), map[string]types.Type{
+		"file_id": types.T_int64.ToType(),
+		"task_id": types.T_int64.ToType(),
+		"status":  types.T_int8.ToType(),
+	})
+
+	// Create task table schema
+	taskSchema := createSchemaWithPK("task", "id", types.T_int64.ToType(), map[string]types.Type{
+		"uid": types.New(types.T_varchar, 255, 0),
+	})
+
+	// Create file table schema
+	fileSchema := createSchemaWithPK("file", "id", types.T_int64.ToType(), map[string]types.Type{
+		"job_id":   types.T_int64.ToType(),
+		"task_id":  types.T_int64.ToType(),
+		"table_id": types.T_int64.ToType(),
+	})
+
+	// Create tables
+	_, rels := p.CreateDBAndTables(txnop, dbName, jobSchema, taskSchema, fileSchema)
+	jobRel := rels[0]
+
+	// Get table IDs
+	jobTableID := jobRel.GetTableID(ctx)
+
+	require.NoError(t, txnop.Commit(ctx))
+
+	// Set stats directly by creating a shared stats cache and injecting it
+	// through context. The compiler context will pick it up during query execution.
+	//
+	// Intended scenario, associative law rule 1:
+	// Rule 1: A*(B*C) -> (A*B)*C when C.selectivity >= 0.9 and B.outcnt < C.outcnt
+	// - file (A): large table, 1000 rows
+	// - connector_job (B): small after filter, 9 rows -> outcnt = 7.29 after filter
+	// - task (C): large table, 1000 rows, selectivity = 1.0
+	// NOTE(review): only connector_job's stats are injected below; nothing in this
+	// test sets the file/task figures listed above - confirm that is sufficient.
+
+	// Create a shared stats cache that we can populate
+	sharedStatsCache := plan.NewStatsCache()
+
+	// Connector_job table (B): small after filter
+	jobStats := plan.NewStatsInfo()
+	jobStats.TableCnt = 9
+	sharedStatsCache.SetStatsInfo(jobTableID, jobStats)
+
+	// Store stats cache in context so compiler context can access it
+	ctx = context.WithValue(ctx, "test_stats_cache", sharedStatsCache)
+
+	// Get SQL executor
+	v, ok := runtime.ServiceRuntime("").GetGlobalVariables(runtime.InternalSQLExecutor)
+	require.True(t, ok)
+	exec := v.(executor.SQLExecutor)
+
+	// Test the problematic query
+	// This should not cause remapping error after the fix
+	txnop = p.StartCNTxn()
+	res, err := exec.Exec(ctx, fmt.Sprintf(`
+		SELECT DISTINCT t.id
+		FROM %s.connector_job job, %s.task t, %s.file f
+		WHERE job.task_id = t.id
+		AND f.job_id = job.id
+		AND f.task_id = t.id
+		AND job.status != 4
+		AND job.status != 5
+		AND t.uid = '019ac448-45b7-7545-9762-2a73f9ab129'
+		AND f.table_id in (1)
+	`, dbName, dbName, dbName), executor.Options{}.WithTxn(txnop))
+
+	// Print logical plan in EXPLAIN format (done before asserting so a failing run still logs it)
+	if res.LogicalPlan != nil {
+		planObj := &plan.Plan{
+			Plan: &plan.Plan_Query{
+				Query: res.LogicalPlan,
+			},
+		}
+		explainOutput := explain.DebugPlan(planObj)
+		t.Logf("Logical Plan (EXPLAIN format):\n%s", explainOutput)
+	}
+	// The query should succeed without remapping error
+	require.NoError(t, err, "Query should not cause remapping error")
+	require.NoError(t, txnop.Commit(ctx))
+}