Skip to content

Commit

Permalink
make sharding can working on new partition codes (#21319)
Browse files Browse the repository at this point in the history
make sharding can working on new partition codes

Approved by: @iamlinjunhong
  • Loading branch information
zhangxu19830126 authored Jan 22, 2025
1 parent 42af4bc commit be0edb5
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 131 deletions.
91 changes: 15 additions & 76 deletions pkg/shardservice/storage_mo.go → pkg/shardservice/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ import (
"fmt"
"strings"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/log"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/partitionservice"
pb "github.com/matrixorigin/matrixone/pkg/pb/shard"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/txn/clock"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -66,6 +66,7 @@ type storage struct {
waiter client.TimestampWaiter
handles map[int]ReadFunc
engine engine.Engine
ps partitionservice.PartitionService
}

func NewShardStorage(
Expand All @@ -83,6 +84,7 @@ func NewShardStorage(
handles: handles,
engine: engine,
logger: runtime.ServiceRuntime(sid).Logger().Named("shardservice"),
ps: partitionservice.GetService(sid),
}
}

Expand Down Expand Up @@ -250,21 +252,26 @@ func (s *storage) Create(
// If the current table is a non partition
// table, we should not create sharding metadata
// for the current table.
partitions, err := readPartitionIDs(
partition, ok, err := s.ps.GetStorage().GetMetadata(
ctx,
table,
txn,
txnOp,
)
if err != nil ||
len(partitions) == 0 {
if err != nil || !ok {
return err
}

partitionIDs := make([]uint64, 0, len(partition.Partitions))
for _, p := range partition.Partitions {
partitionIDs = append(partitionIDs, p.PartitionID)
}

created = true
metadata := pb.ShardsMetadata{
Policy: pb.Policy_Partition,
ShardsCount: uint32(len(partitions)),
ShardsCount: uint32(len(partition.Partitions)),
AccountID: uint64(accountID),
ShardIDs: partitions,
ShardIDs: partitionIDs,
Version: 1,
MaxReplicaCount: 1,
}
Expand Down Expand Up @@ -463,49 +470,6 @@ func getTableIDByShardID(
return tableID, nil
}

func readPartitionIDs(
table uint64,
txn executor.TxnExecutor,
) ([]uint64, error) {
res, err := txn.Exec(
getPartitionsSQL(table),
executor.StatementOption{},
)
if err != nil {
return nil, err
}
var names []string
res.ReadRows(
func(rows int, cols []*vector.Vector) bool {
names = append(names, executor.GetStringRows(cols[0])...)
return true
},
)
res.Close()

if len(names) == 0 {
return nil, err
}

res, err = txn.Exec(
getTableIDsSQL(names),
executor.StatementOption{},
)
if err != nil {
return nil, err
}

var ids []uint64
res.ReadRows(
func(rows int, cols []*vector.Vector) bool {
ids = append(ids, executor.GetFixedRows[uint64](cols[0])...)
return true
},
)
res.Close()
return ids, nil
}

func execSQL(
sql []string,
txn executor.TxnExecutor,
Expand Down Expand Up @@ -591,31 +555,6 @@ func getDeleteShardsSQL(
)
}

func getPartitionsSQL(
table uint64,
) string {
return fmt.Sprintf("select partition_table_name from %s.%s where table_id = %d",
catalog.MO_CATALOG,
catalog.MO_TABLE_PARTITIONS,
table,
)
}

func getTableIDsSQL(
names []string,
) string {
values := make([]string, 0, len(names))
for _, name := range names {
values = append(values, fmt.Sprintf("'%s'", name))
}

return fmt.Sprintf("select rel_id from %s.%s where relname in (%s)",
catalog.MO_CATALOG,
catalog.MO_TABLES,
strings.Join(values, ","),
)
}

func getCreateSQLs(
table uint64,
metadata pb.ShardsMetadata,
Expand Down
File renamed without changes.
11 changes: 9 additions & 2 deletions pkg/tests/shard/partition_based_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/cnservice"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
Expand All @@ -34,6 +32,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/tests/testutils"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/stretchr/testify/require"
)

func TestPartitionBasedTableCanBeCreated(
Expand All @@ -42,6 +41,7 @@ func TestPartitionBasedTableCanBeCreated(
t.SkipNow()

runShardClusterTest(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 3)
Expand Down Expand Up @@ -79,6 +79,7 @@ func TestPartitionBasedTableCanBeDeleted(
t.SkipNow()

runShardClusterTest(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 3)
Expand Down Expand Up @@ -106,6 +107,7 @@ func TestInsertIntoWithLocalPartition(
t.SkipNow()

runShardClusterTest(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 3)
Expand Down Expand Up @@ -152,6 +154,7 @@ func TestUpdateWithLocalPartition(
t.SkipNow()

runShardClusterTest(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 3)
Expand Down Expand Up @@ -205,6 +208,7 @@ func TestInsertIntoWithRemotePartition(
t.SkipNow()

runShardClusterTest(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 3)
Expand Down Expand Up @@ -251,6 +255,7 @@ func TestUpdateWithRemotePartition(
t.SkipNow()

runShardClusterTest(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 3)
Expand Down Expand Up @@ -304,6 +309,7 @@ func TestSelectWithMultiPartition(
t.SkipNow()

runShardClusterTest(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 3)
Expand Down Expand Up @@ -360,6 +366,7 @@ func TestUpdateOnNewCN(
t.SkipNow()

runShardClusterTestWithReuse(
t,
func(c embed.Cluster) {
db := testutils.GetDatabaseName(t)
tableID := mustCreatePartitionBasedTable(t, c, db, 12)
Expand Down
1 change: 1 addition & 0 deletions pkg/tests/shard/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
func TestReplicaBalance(t *testing.T) {
t.SkipNow()
runShardClusterTest(
t,
func(c embed.Cluster) {
// 3 replicas must allocated to 3 cn
db := testutils.GetDatabaseName(t)
Expand Down
26 changes: 6 additions & 20 deletions pkg/tests/shard/shard_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

func TestPartitionBasedShardCanBeCreated(t *testing.T) {
t.SkipNow()
embed.RunBaseClusterTests(
runShardClusterTest(
t,
func(c embed.Cluster) {
cn1, err := c.GetCNService(0)
require.NoError(t, err)
Expand All @@ -51,7 +51,6 @@ func TestPartitionBasedShardCanBeCreated(t *testing.T) {
db,
t.Name(),
cn1,
store,
)

checkPartitionBasedShardMetadata(
Expand All @@ -65,13 +64,13 @@ func TestPartitionBasedShardCanBeCreated(t *testing.T) {
}

func TestPartitionBasedShardCanBeDeleted(t *testing.T) {
t.SkipNow()
embed.RunBaseClusterTests(
runShardClusterTest(
t,
func(c embed.Cluster) {
accountID := uint32(0)
ctx, cancel := context.WithTimeout(
defines.AttachAccountId(context.Background(), accountID),
time.Second*10,
time.Second*60,
)
defer cancel()

Expand All @@ -88,7 +87,6 @@ func TestPartitionBasedShardCanBeDeleted(t *testing.T) {
db,
t.Name(),
cn1,
store,
)

exec := testutils.GetSQLExecutor(cn1)
Expand Down Expand Up @@ -186,12 +184,11 @@ func mustCreatePartitionTable(
db string,
table string,
cn embed.ServiceOperator,
store shardservice.ShardStorage,
) uint64 {
accountID := uint32(0)
ctx, cancel := context.WithTimeout(
defines.AttachAccountId(context.Background(), accountID),
time.Second*10,
time.Second*60,
)
defer cancel()

Expand All @@ -202,24 +199,13 @@ func mustCreatePartitionTable(
err := exec.ExecTxn(
ctx,
func(txn executor.TxnExecutor) error {
txnOp := txn.Txn()

res, err := txn.Exec(sql, executor.StatementOption{})
if err != nil {
return err
}
res.Close()

tableID = mustGetTableID(t, db, table, txn)
ok, err := store.Create(
ctx,
tableID,
txnOp,
)
if err != nil {
return err
}
require.True(t, ok)
return nil
},
executor.Options{}.
Expand Down
Loading

0 comments on commit be0edb5

Please sign in to comment.