Chunk on all parts of the key #168

Merged
merged 2 commits on Aug 4, 2023
3 changes: 2 additions & 1 deletion pkg/checksum/checker.go
@@ -114,7 +114,8 @@ func (c *Checker) ChecksumChunk(trxPool *dbconn.TrxPool, chunk *table.Chunk) err
}
if chunk.LowerBound != nil {
c.Lock()
c.recentValue = chunk.LowerBound.Value
// For recent value we only use the first part of the key.
c.recentValue = chunk.LowerBound.Value[0]
c.Unlock()
}
c.chunker.Feedback(chunk, time.Since(startTime))
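With this change, the checker records only the first part of a (possibly composite) key as its recentValue watermark. A minimal self-contained sketch of that behavior, using simplified stand-ins for the pkg/table types (local re-definitions for illustration, not spirit's actual ones):

```go
package main

import "fmt"

// Simplified stand-ins for the pkg/table types shown in this diff.
type Datum struct{ Val interface{} }

type Boundary struct {
	Value     []Datum
	Inclusive bool
}

type Chunk struct {
	Key        []string
	LowerBound *Boundary
}

func main() {
	// Composite key (id1, id2): the boundary carries one Datum per key part.
	chunk := &Chunk{
		Key: []string{"id1", "id2"},
		LowerBound: &Boundary{
			Value:     []Datum{{Val: 100}, {Val: 5}},
			Inclusive: true,
		},
	}
	// As in checker.go above: only the first key part is kept as the recent value.
	recentValue := chunk.LowerBound.Value[0]
	fmt.Println(recentValue.Val) // 100
}
```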
10 changes: 5 additions & 5 deletions pkg/migration/runner.go
@@ -551,19 +551,19 @@ func (r *Runner) postCutoverCheck(ctx context.Context) error {
// Instead of checker.Run we call checker.ChecksumChunk directly
// since we only care about a specifically crafted chunk.
chunk := &table.Chunk{
Key: cutoverTable.KeyColumns[0],
Key: []string{cutoverTable.KeyColumns[0]},
LowerBound: &table.Boundary{
Value: table.Datum{
Value: []table.Datum{{
Tp: cutoverTable.MaxValue().Tp,
Val: lowerBoundKey,
},
}},
Inclusive: true,
},
UpperBound: &table.Boundary{
Value: table.Datum{
Value: []table.Datum{{
Tp: cutoverTable.MaxValue().Tp,
Val: upperBoundKey,
},
}},
Inclusive: true,
},
}
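For this single-part key with both bounds inclusive, the crafted chunk should render as `key` >= lowerBoundKey AND `key` <= upperBoundKey (compare the single-column cases in TestChunk2String further down).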
18 changes: 9 additions & 9 deletions pkg/migration/runner_test.go
@@ -723,7 +723,7 @@ func TestCheckpoint(t *testing.T) {
// gives feedback back to table.
watermark, err := r.copier.GetLowWatermark()
assert.NoError(t, err)
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"1001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"2001\",\"Inclusive\":false}}", watermark)
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"1001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"2001\"],\"Inclusive\":false}}", watermark)
// Dump a checkpoint
assert.NoError(t, r.dumpCheckpoint(context.TODO()))

@@ -750,15 +750,15 @@ func TestCheckpoint(t *testing.T) {

chunk, err := r.copier.Next4Test()
assert.NoError(t, err)
assert.Equal(t, "1001", chunk.LowerBound.Value.String())
assert.Equal(t, "1001", chunk.LowerBound.Value[0].String())
assert.NoError(t, r.copier.CopyChunk(context.TODO(), chunk))

// It's not typical, but you can still dump a checkpoint from
// a restored checkpoint state. We won't have advanced anywhere from
// the last checkpoint because on restore, the LowerBound is taken.
watermark, err = r.copier.GetLowWatermark()
assert.NoError(t, err)
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"1001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"2001\",\"Inclusive\":false}}", watermark)
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"1001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"2001\"],\"Inclusive\":false}}", watermark)
// Dump a checkpoint
assert.NoError(t, r.dumpCheckpoint(context.TODO()))

@@ -771,7 +771,7 @@ func TestCheckpoint(t *testing.T) {

watermark, err = r.copier.GetLowWatermark()
assert.NoError(t, err)
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"11001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"12001\",\"Inclusive\":false}}", watermark)
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"11001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"12001\"],\"Inclusive\":false}}", watermark)
assert.NoError(t, r.db.Close())
}

@@ -824,7 +824,7 @@ func TestCheckpointRestore(t *testing.T) {

// Now insert a fake checkpoint; this uses a known bad value
// from issue #125
watermark := "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\":\"53926425\",\"Inclusive\":true},\"UpperBound\":{\"Value\":\"53926425\",\"Inclusive\":false}}"
watermark := "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\":[\"53926425\"],\"Inclusive\":true},\"UpperBound\":{\"Value\":[\"53926425\"],\"Inclusive\":false}}"
binlog := r.replClient.GetBinlogApplyPosition()
query := fmt.Sprintf("INSERT INTO %s (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (?, ?, ?, ?, ?, ?)",
r.checkpointTable.QuotedName)
@@ -944,7 +944,7 @@ func TestCheckpointDifferentRestoreOptions(t *testing.T) {

watermark, err := m.copier.GetLowWatermark()
assert.NoError(t, err)
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"1001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"2001\",\"Inclusive\":false}}", watermark)
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"1001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"2001\"],\"Inclusive\":false}}", watermark)
// Dump a checkpoint
assert.NoError(t, m.dumpCheckpoint(context.TODO()))

@@ -1126,7 +1126,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
chunk, err := m.copier.Next4Test()
assert.NoError(t, err)
assert.NotNil(t, chunk)
assert.Equal(t, "`id1` < 1001", chunk.String())
assert.Equal(t, "((`id1` < 1001)\n OR (`id1` = 1001 AND `id2` < 1))", chunk.String())
assert.NoError(t, m.copier.CopyChunk(context.TODO(), chunk))

// Now insert some data.
@@ -1140,7 +1140,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
// Second chunk
chunk, err = m.copier.Next4Test()
assert.NoError(t, err)
assert.Equal(t, "`id1` >= 1001", chunk.String())
assert.Equal(t, "((`id1` > 1001)\n OR (`id1` = 1001 AND `id2` >= 1))", chunk.String())
assert.NoError(t, m.copier.CopyChunk(context.TODO(), chunk))

// Now insert some data.
@@ -1775,7 +1775,7 @@ func TestE2ERogueValues(t *testing.T) {
// Second chunk
chunk, err = m.copier.Next4Test()
assert.NoError(t, err)
assert.Equal(t, "`datetime` >= \"819 \\\". \"", chunk.String())
assert.Equal(t, "((`datetime` > \"819 \\\". \")\n OR (`datetime` = \"819 \\\". \" AND `col2` >= 1))", chunk.String())
assert.NoError(t, m.copier.CopyChunk(context.TODO(), chunk))

// Now insert some data.
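Note what the new expected strings encode: the second chunk now resumes from the exact composite position. Rows with `id1` = 1001 and `id2` < 1 belong to the first chunk, so the second chunk's `(`id1` > 1001) OR (`id1` = 1001 AND `id2` >= 1)` is the precise complement of the first chunk's upper bound, whereas the old `id1` >= 1001 could only resume at the granularity of the first key part.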
2 changes: 1 addition & 1 deletion pkg/row/copier_test.go
@@ -366,7 +366,7 @@ func TestCopierFromCheckpoint(t *testing.T) {
t1new := table.NewTableInfo(db, "test", "_copierchkpt1_new")
assert.NoError(t, t1new.SetInfo(context.TODO()))

lowWatermark := `{"Key":"a","ChunkSize":1,"LowerBound":{"Value":3,"Inclusive":true},"UpperBound":{"Value":4,"Inclusive":false}}`
lowWatermark := `{"Key":["a"],"ChunkSize":1,"LowerBound":{"Value":["3"],"Inclusive":true},"UpperBound":{"Value":["4"],"Inclusive":false}}`
copier, err := NewCopierFromCheckpoint(db, t1, t1new, NewCopierDefaultConfig(), lowWatermark, 3, 3)
assert.NoError(t, err)
assert.NoError(t, copier.Run(context.Background())) // works
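The watermark format now carries arrays for both the key and the boundary values, with values encoded as strings to avoid JSON float corruption (Issue #125, per the comment in pkg/table/chunk.go below). A quick self-contained round-trip using the JSONChunk/JSONBoundary shapes copied from this PR:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Shapes copied from the JSONChunk/JSONBoundary definitions in pkg/table/chunk.go.
type JSONBoundary struct {
	Value     []string
	Inclusive bool
}

type JSONChunk struct {
	Key        []string
	ChunkSize  uint64
	LowerBound JSONBoundary
	UpperBound JSONBoundary
}

func main() {
	lowWatermark := `{"Key":["a"],"ChunkSize":1,"LowerBound":{"Value":["3"],"Inclusive":true},"UpperBound":{"Value":["4"],"Inclusive":false}}`
	var c JSONChunk
	if err := json.Unmarshal([]byte(lowWatermark), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.Key, c.LowerBound.Value, c.UpperBound.Value) // [a] [3] [4]
}
```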
1 change: 0 additions & 1 deletion pkg/table/README.md
@@ -45,7 +45,6 @@ To deal with large gaps, the optimistic chunker also supports a special "prefetc

The following are known issues:

* The composite chunker was named as such because we realized one of the main deficiencies of the optimistic chunker is _composite PRIMARY KEYs_. Despite its name, the composite chunker only chunks on the first part of the `PRIMARY KEY`. We intend to address this feature in the future.
* Neither of the chunkers support chunking on anything other than the `PRIMARY KEY`. For our purposes, this is usually acceptable but we may have to revisit this in the future.
* Not specifically a limitation of chunking, but some optimizations in spirit require that the `PRIMARY KEY` not have collations. Thus, we explicitly disallow `VARCHAR` primary keys in the chunker. The optimizations are very useful, which makes this a complex problem to fix. But it also appears to be the most common incompatibility issue with spirit, so we may have to reconsider this at some point.

66 changes: 50 additions & 16 deletions pkg/table/chunk.go
@@ -9,15 +9,15 @@ import (
// Chunk is returned by chunk.Next()
// Applications can use it to iterate over the rows.
type Chunk struct {
Key string
Key []string
ChunkSize uint64
LowerBound *Boundary
UpperBound *Boundary
}

// Boundary is used by chunk for lower or upper boundary
type Boundary struct {
Value Datum
Value []Datum
Inclusive bool
}

@@ -40,14 +40,16 @@ func (c *Chunk) String() string {
if !c.LowerBound.Inclusive {
operator = OpGreaterThan
}
conds = append(conds, fmt.Sprintf("`%s` %s %s", c.Key, operator, c.LowerBound.Value))
str := expandRowConstructorComparison(c.Key, operator, c.LowerBound.Value)
conds = append(conds, str)
}
if c.UpperBound != nil {
operator := OpLessEqual
if !c.UpperBound.Inclusive {
operator = OpLessThan
}
conds = append(conds, fmt.Sprintf("`%s` %s %s", c.Key, operator, c.UpperBound.Value))
str := expandRowConstructorComparison(c.Key, operator, c.UpperBound.Value)
conds = append(conds, str)
}
if c.LowerBound == nil && c.UpperBound == nil {
conds = append(conds, "1=1")
@@ -56,55 +58,87 @@ }
}

func (c *Chunk) JSON() string {
return fmt.Sprintf(`{"Key":"%s","ChunkSize":%d,"LowerBound":%s,"UpperBound":%s}`,
c.Key,
return fmt.Sprintf(`{"Key":["%s"],"ChunkSize":%d,"LowerBound":%s,"UpperBound":%s}`,
strings.Join(c.Key, `","`),
c.ChunkSize,
c.LowerBound.JSON(),
c.UpperBound.JSON(),
)
}

// JSON encodes a boundary as JSON. The values are represented as strings,
// to avoid JSON float behavior. See Issue #125
func (b *Boundary) JSON() string {
// encode values as strings otherwise we get JSON floats
// which can corrupt larger values. Issue #125
return fmt.Sprintf(`{"Value": "%s","Inclusive":%t}`, b.Value, b.Inclusive)
vals := make([]string, len(b.Value))
for i, v := range b.Value {
vals[i] = fmt.Sprintf(`"%s"`, v)
}
return fmt.Sprintf(`{"Value": [%s],"Inclusive":%t}`, strings.Join(vals, ","), b.Inclusive)
}

// comparesTo returns true if the boundaries are the same.
// It is used to compare two boundaries, such as if
// a.Upper == b.Lower. For this reason it does not compare
// the operator (inclusive or not)!
func (b *Boundary) comparesTo(b2 *Boundary) bool {
if len(b.Value) != len(b2.Value) {
return false
}
for i := range b.Value {
if b.Value[i].Tp != b2.Value[i].Tp {
return false
}
if b.Value[i].Val != b2.Value[i].Val {
return false
}
}
return true
}

type JSONChunk struct {
Key string
Key []string
ChunkSize uint64
LowerBound JSONBoundary
UpperBound JSONBoundary
}

type JSONBoundary struct {
Value interface{}
Value []string
Inclusive bool
}

func NewChunkFromJSON(jsonStr string, tp string) (*Chunk, error) {
func jsonStrings2Datums(ti *TableInfo, keys []string, vals []string) ([]Datum, error) {
datums := make([]Datum, len(keys))
for i, str := range vals {
tp := ti.datumTp(keys[i])
datums[i] = newDatum(fmt.Sprint(str), tp)
}
return datums, nil
}

func newChunkFromJSON(ti *TableInfo, jsonStr string) (*Chunk, error) {
var chunk JSONChunk
err := json.Unmarshal([]byte(jsonStr), &chunk)
if err != nil {
return nil, err
}
lowerVal, err := newDatumFromMySQL(fmt.Sprint(chunk.LowerBound.Value), tp)
lowerVals, err := jsonStrings2Datums(ti, chunk.Key, chunk.LowerBound.Value)
if err != nil {
return nil, err
}
upperVal, err := newDatumFromMySQL(fmt.Sprint(chunk.UpperBound.Value), tp)
upperVals, err := jsonStrings2Datums(ti, chunk.Key, chunk.UpperBound.Value)
if err != nil {
return nil, err
}
return &Chunk{
Key: chunk.Key,
ChunkSize: chunk.ChunkSize,
LowerBound: &Boundary{
Value: lowerVal,
Value: lowerVals,
Inclusive: chunk.LowerBound.Inclusive,
},
UpperBound: &Boundary{
Value: upperVal,
Value: upperVals,
Inclusive: chunk.UpperBound.Inclusive,
},
}, nil
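chunk.String() now delegates to expandRowConstructorComparison, whose implementation is not part of this diff. A rough sketch of what such an expansion has to produce (an assumed re-implementation for illustration, not spirit's actual helper), matching the expected strings in chunk_test.go below:

```go
package main

import (
	"fmt"
	"strings"
)

// expandComparison rewrites a row-constructor comparison such as
// (id1, id2) >= (100, 200) into an OR expansion that MySQL's range
// optimizer handles well. Assumed illustration only.
func expandComparison(cols []string, op string, vals []string) string {
	if len(cols) == 1 {
		// Single-part keys keep the plain form, e.g. `id` >= 100.
		return fmt.Sprintf("`%s` %s %s", cols[0], op, vals[0])
	}
	strictOp := strings.TrimSuffix(op, "=") // ">=" -> ">", "<=" -> "<"
	branches := make([]string, 0, len(cols))
	for i := range cols {
		terms := make([]string, 0, i+1)
		// All earlier key parts are pinned to equality.
		for j := 0; j < i; j++ {
			terms = append(terms, fmt.Sprintf("`%s` = %s", cols[j], vals[j]))
		}
		// Only the last key part keeps any inclusive operator.
		branchOp := strictOp
		if i == len(cols)-1 {
			branchOp = op
		}
		terms = append(terms, fmt.Sprintf("`%s` %s %s", cols[i], branchOp, vals[i]))
		branches = append(branches, "("+strings.Join(terms, " AND ")+")")
	}
	return "(" + strings.Join(branches, "\n OR ") + ")"
}

func main() {
	fmt.Println(expandComparison([]string{"id1", "id2"}, ">", []string{"100", "200"}))
	// Matches the expectation in TestCompositeChunks below:
	// ((`id1` > 100)
	//  OR (`id1` = 100 AND `id2` > 200))
}
```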
88 changes: 79 additions & 9 deletions pkg/table/chunk_test.go
@@ -8,37 +8,107 @@ import (

func TestChunk2String(t *testing.T) {
chunk := &Chunk{
Key: "id",
Key: []string{"id"},
LowerBound: &Boundary{
Value: newDatum(100, signedType),
Value: []Datum{newDatum(100, signedType)},
Inclusive: true,
},
UpperBound: &Boundary{
Value: newDatum(200, signedType),
Value: []Datum{newDatum(200, signedType)},
Inclusive: false,
},
}
assert.Equal(t, "`id` >= 100 AND `id` < 200", chunk.String())
chunk = &Chunk{
Key: "id",
Key: []string{"id"},
LowerBound: &Boundary{
Value: newDatum(100, signedType),
Value: []Datum{newDatum(100, signedType)},
Inclusive: false,
},
}
assert.Equal(t, "`id` > 100", chunk.String())
chunk = &Chunk{
Key: "id",
Key: []string{"id"},
UpperBound: &Boundary{
Value: newDatum(200, signedType),
Value: []Datum{newDatum(200, signedType)},
Inclusive: true,
},
}
assert.Equal(t, "`id` <= 200", chunk.String())

// Empty chunks are possible with the trivial chunker.
// Empty chunks are possible with the composite chunker
chunk = &Chunk{
Key: "id",
Key: []string{"id"},
}
assert.Equal(t, "1=1", chunk.String())
}

func TestCompositeChunks(t *testing.T) {
chunk := &Chunk{
Key: []string{"id1", "id2"},
LowerBound: &Boundary{
Value: []Datum{newDatum(100, signedType), newDatum(200, signedType)},
Inclusive: false,
},
UpperBound: &Boundary{
Value: []Datum{newDatum(100, signedType), newDatum(300, signedType)},
Inclusive: false,
},
}
assert.Equal(t, "((`id1` > 100)\n OR (`id1` = 100 AND `id2` > 200)) AND ((`id1` < 100)\n OR (`id1` = 100 AND `id2` < 300))", chunk.String())
// 4 parts to the key - pretty unlikely.
chunk = &Chunk{
Key: []string{"id1", "id2", "id3", "id4"},
LowerBound: &Boundary{
Value: []Datum{newDatum(100, signedType), newDatum(200, signedType), newDatum(200, signedType), newDatum(200, signedType)},
Inclusive: true,
},
UpperBound: &Boundary{
Value: []Datum{newDatum(101, signedType), newDatum(12, signedType), newDatum(123, signedType), newDatum(1, signedType)},
Inclusive: false,
},
}
assert.Equal(t, "((`id1` > 100)\n OR (`id1` = 100 AND `id2` > 200)\n OR (`id1` = 100 AND `id2` = 200 AND `id3` > 200)\n OR (`id1` = 100 AND `id2` = 200 AND `id3` = 200 AND `id4` >= 200)) AND ((`id1` < 101)\n OR (`id1` = 101 AND `id2` < 12)\n OR (`id1` = 101 AND `id2` = 12 AND `id3` < 123)\n OR (`id1` = 101 AND `id2` = 12 AND `id3` = 123 AND `id4` < 1))", chunk.String())
// A possible scenario when chunking on a non-primary key:
chunk = &Chunk{
Key: []string{"status", "id"},
LowerBound: &Boundary{
Value: []Datum{newDatum("ARCHIVED", binaryType), newDatum(1234, signedType)},
Inclusive: true,
},
UpperBound: &Boundary{
Value: []Datum{newDatum("ARCHIVED", binaryType), newDatum(5412, signedType)},
Inclusive: false,
},
}
assert.Equal(t, "((`status` > \"ARCHIVED\")\n OR (`status` = \"ARCHIVED\" AND `id` >= 1234)) AND ((`status` < \"ARCHIVED\")\n OR (`status` = \"ARCHIVED\" AND `id` < 5412))", chunk.String())
}

func TestComparesTo(t *testing.T) {
b1 := &Boundary{
Value: []Datum{newDatum(200, signedType)},
Inclusive: true,
}
b2 := &Boundary{
Value: []Datum{newDatum(200, signedType)},
Inclusive: true,
}
assert.True(t, b1.comparesTo(b2))
b2.Inclusive = false // change operator
assert.True(t, b1.comparesTo(b2)) // still compares
b2.Value = []Datum{newDatum(300, signedType)}
assert.False(t, b1.comparesTo(b2))

// Compound values.
b1 = &Boundary{
Value: []Datum{newDatum(200, signedType), newDatum(300, signedType)},
Inclusive: true,
}
b2 = &Boundary{
Value: []Datum{newDatum(200, signedType), newDatum(300, signedType)},
Inclusive: true,
}
assert.True(t, b1.comparesTo(b2))
b2.Value = []Datum{newDatum(200, signedType), newDatum(400, signedType)}
assert.False(t, b1.comparesTo(b2))
}
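One subtlety in the non-primary-key case of TestCompositeChunks above: both bounds share the prefix `status` = "ARCHIVED", so the strict branches (`status` > "ARCHIVED" and `status` < "ARCHIVED") can never match, and the predicate effectively reduces to `status` = "ARCHIVED" AND `id` >= 1234 AND `id` < 5412 — which MySQL can still evaluate as a range over (status, id).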
5 changes: 4 additions & 1 deletion pkg/table/chunker.go
@@ -43,7 +43,10 @@ func NewChunker(t *TableInfo, chunkerTarget time.Duration, logger loggers.Advanc
if chunkerTarget == 0 {
chunkerTarget = ChunkerDefaultTarget
}
if err := t.isCompatibleWithChunker(); err != nil {
// Currently we always check if the key is memory comparable.
// This is required for spirit's Delta Map, but
// not specifically required for the chunker.
if err := t.isMemoryComparable(); err != nil {
return nil, err
}
// Use the optimistic chunker for auto_increment
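The check is renamed from isCompatibleWithChunker to isMemoryComparable, and the new comment pins the requirement on spirit's Delta Map rather than the chunker. A hedged sketch of the property being enforced (assumed illustration only; the real check inspects TableInfo, and the exact type list here is a guess):

```go
package main

import (
	"fmt"
	"strings"
)

// isMemoryComparable (assumed illustration, not spirit's implementation):
// a key type is memory comparable when byte-wise comparison of its encoded
// form agrees with MySQL's ordering. Collated string types such as VARCHAR
// do not qualify, which is why the chunker disallows VARCHAR primary keys
// (see pkg/table/README.md above).
func isMemoryComparable(mysqlTp string) bool {
	switch strings.ToLower(mysqlTp) {
	case "tinyint", "smallint", "mediumint", "int", "bigint",
		"binary", "varbinary", "datetime", "timestamp":
		return true
	default:
		return false // varchar, char, text, enum, ... order by collation
	}
}

func main() {
	fmt.Println(isMemoryComparable("bigint"))  // true
	fmt.Println(isMemoryComparable("varchar")) // false
}
```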