Skip to content

Commit 36877bb

Browse files
committed
Chunk on multiple parts of the key
1 parent 90884a3 commit 36877bb

15 files changed

+379
-144
lines changed

pkg/checksum/checker.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ func (c *Checker) ChecksumChunk(trxPool *dbconn.TrxPool, chunk *table.Chunk) err
114114
}
115115
if chunk.LowerBound != nil {
116116
c.Lock()
117-
c.recentValue = chunk.LowerBound.Value
117+
// For the recent value we only use the first part of the key.
118+
c.recentValue = chunk.LowerBound.Value[0]
118119
c.Unlock()
119120
}
120121
c.chunker.Feedback(chunk, time.Since(startTime))

pkg/migration/runner.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -551,19 +551,19 @@ func (r *Runner) postCutoverCheck(ctx context.Context) error {
551551
// Instead of checker.Run we call checker.ChecksumChunk directly
552552
// since we only care about a specifically crafted chunk.
553553
chunk := &table.Chunk{
554-
Key: cutoverTable.KeyColumns[0],
554+
Key: []string{cutoverTable.KeyColumns[0]},
555555
LowerBound: &table.Boundary{
556-
Value: table.Datum{
556+
Value: []table.Datum{{
557557
Tp: cutoverTable.MaxValue().Tp,
558558
Val: lowerBoundKey,
559-
},
559+
}},
560560
Inclusive: true,
561561
},
562562
UpperBound: &table.Boundary{
563-
Value: table.Datum{
563+
Value: []table.Datum{{
564564
Tp: cutoverTable.MaxValue().Tp,
565565
Val: upperBoundKey,
566-
},
566+
}},
567567
Inclusive: true,
568568
},
569569
}

pkg/migration/runner_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ func TestCheckpoint(t *testing.T) {
723723
// gives feedback back to table.
724724
watermark, err := r.copier.GetLowWatermark()
725725
assert.NoError(t, err)
726-
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"1001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"2001\",\"Inclusive\":false}}", watermark)
726+
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"1001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"2001\"],\"Inclusive\":false}}", watermark)
727727
// Dump a checkpoint
728728
assert.NoError(t, r.dumpCheckpoint(context.TODO()))
729729

@@ -750,15 +750,15 @@ func TestCheckpoint(t *testing.T) {
750750

751751
chunk, err := r.copier.Next4Test()
752752
assert.NoError(t, err)
753-
assert.Equal(t, "1001", chunk.LowerBound.Value.String())
753+
assert.Equal(t, "1001", chunk.LowerBound.Value[0].String())
754754
assert.NoError(t, r.copier.CopyChunk(context.TODO(), chunk))
755755

756756
// It's ideally not typical but you can still dump checkpoint from
757757
// a restored checkpoint state. We won't have advanced anywhere from
758758
// the last checkpoint because on restore, the LowerBound is taken.
759759
watermark, err = r.copier.GetLowWatermark()
760760
assert.NoError(t, err)
761-
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"1001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"2001\",\"Inclusive\":false}}", watermark)
761+
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"1001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"2001\"],\"Inclusive\":false}}", watermark)
762762
// Dump a checkpoint
763763
assert.NoError(t, r.dumpCheckpoint(context.TODO()))
764764

@@ -771,7 +771,7 @@ func TestCheckpoint(t *testing.T) {
771771

772772
watermark, err = r.copier.GetLowWatermark()
773773
assert.NoError(t, err)
774-
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"11001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"12001\",\"Inclusive\":false}}", watermark)
774+
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"11001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"12001\"],\"Inclusive\":false}}", watermark)
775775
assert.NoError(t, r.db.Close())
776776
}
777777

@@ -824,7 +824,7 @@ func TestCheckpointRestore(t *testing.T) {
824824

825825
// Now insert a fake checkpoint, this uses a known bad value
826826
// from issue #125
827-
watermark := "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\":\"53926425\",\"Inclusive\":true},\"UpperBound\":{\"Value\":\"53926425\",\"Inclusive\":false}}"
827+
watermark := "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\":[\"53926425\"],\"Inclusive\":true},\"UpperBound\":{\"Value\":[\"53926425\"],\"Inclusive\":false}}"
828828
binlog := r.replClient.GetBinlogApplyPosition()
829829
query := fmt.Sprintf("INSERT INTO %s (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (?, ?, ?, ?, ?, ?)",
830830
r.checkpointTable.QuotedName)
@@ -944,7 +944,7 @@ func TestCheckpointDifferentRestoreOptions(t *testing.T) {
944944

945945
watermark, err := m.copier.GetLowWatermark()
946946
assert.NoError(t, err)
947-
assert.Equal(t, "{\"Key\":\"id\",\"ChunkSize\":1000,\"LowerBound\":{\"Value\": \"1001\",\"Inclusive\":true},\"UpperBound\":{\"Value\": \"2001\",\"Inclusive\":false}}", watermark)
947+
assert.Equal(t, "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"1001\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"2001\"],\"Inclusive\":false}}", watermark)
948948
// Dump a checkpoint
949949
assert.NoError(t, m.dumpCheckpoint(context.TODO()))
950950

@@ -1126,7 +1126,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
11261126
chunk, err := m.copier.Next4Test()
11271127
assert.NoError(t, err)
11281128
assert.NotNil(t, chunk)
1129-
assert.Equal(t, "`id1` < 1001", chunk.String())
1129+
assert.Equal(t, "((`id1` < 1001)\n OR (`id1` = 1001 AND `id2` < 1))", chunk.String())
11301130
assert.NoError(t, m.copier.CopyChunk(context.TODO(), chunk))
11311131

11321132
// Now insert some data.
@@ -1140,7 +1140,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
11401140
// Second chunk
11411141
chunk, err = m.copier.Next4Test()
11421142
assert.NoError(t, err)
1143-
assert.Equal(t, "`id1` >= 1001", chunk.String())
1143+
assert.Equal(t, "((`id1` > 1001)\n OR (`id1` = 1001 AND `id2` >= 1))", chunk.String())
11441144
assert.NoError(t, m.copier.CopyChunk(context.TODO(), chunk))
11451145

11461146
// Now insert some data.
@@ -1775,7 +1775,7 @@ func TestE2ERogueValues(t *testing.T) {
17751775
// Second chunk
17761776
chunk, err = m.copier.Next4Test()
17771777
assert.NoError(t, err)
1778-
assert.Equal(t, "`datetime` >= \"819 \\\". \"", chunk.String())
1778+
assert.Equal(t, "((`datetime` > \"819 \\\". \")\n OR (`datetime` = \"819 \\\". \" AND `col2` >= 1))", chunk.String())
17791779
assert.NoError(t, m.copier.CopyChunk(context.TODO(), chunk))
17801780

17811781
// Now insert some data.

pkg/row/copier_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ func TestCopierFromCheckpoint(t *testing.T) {
366366
t1new := table.NewTableInfo(db, "test", "_copierchkpt1_new")
367367
assert.NoError(t, t1new.SetInfo(context.TODO()))
368368

369-
lowWatermark := `{"Key":"a","ChunkSize":1,"LowerBound":{"Value":3,"Inclusive":true},"UpperBound":{"Value":4,"Inclusive":false}}`
369+
lowWatermark := `{"Key":["a"],"ChunkSize":1,"LowerBound":{"Value":["3"],"Inclusive":true},"UpperBound":{"Value":["4"],"Inclusive":false}}`
370370
copier, err := NewCopierFromCheckpoint(db, t1, t1new, NewCopierDefaultConfig(), lowWatermark, 3, 3)
371371
assert.NoError(t, err)
372372
assert.NoError(t, copier.Run(context.Background())) // works

pkg/table/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ To deal with large gaps, the optimistic chunker also supports a special "prefetc
4545

4646
The following are known issues:
4747

48-
* The composite chunker was named as such because we realized one of the main deficiencies of the optimistic chunker is _composite PRIMARY KEYs_. Despite its name, the composite chunker only chunks on the first part of the `PRIMARY KEY`. We intend to address this feature in the future.
4948
* Neither of the chunkers support chunking on anything other than the `PRIMARY KEY`. For our purposes, this is usually acceptable but we may have to revisit this in the future.
5049
* Not specifically a limitation of chunking, but some optimizations in spirit require that the `PRIMARY KEY` not have collations. Thus, we explicitly disallow `VARCHAR` primary keys in the chunker. The optimizations are very useful, which makes this a complex problem to fix. But it also appears to be the most common incompatibility issue with spirit, so we may have to reconsider this at some point.
5150

pkg/table/chunk.go

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ import (
99
// Chunk is returned by chunk.Next()
1010
// Applications can use it to iterate over the rows.
1111
type Chunk struct {
12-
Key string
12+
Key []string
1313
ChunkSize uint64
1414
LowerBound *Boundary
1515
UpperBound *Boundary
1616
}
1717

1818
// Boundary is used by chunk for lower or upper boundary
1919
type Boundary struct {
20-
Value Datum
20+
Value []Datum
2121
Inclusive bool
2222
}
2323

@@ -40,14 +40,16 @@ func (c *Chunk) String() string {
4040
if !c.LowerBound.Inclusive {
4141
operator = OpGreaterThan
4242
}
43-
conds = append(conds, fmt.Sprintf("`%s` %s %s", c.Key, operator, c.LowerBound.Value))
43+
str := expandRowConstructorComparison(c.Key, operator, c.LowerBound.Value)
44+
conds = append(conds, str)
4445
}
4546
if c.UpperBound != nil {
4647
operator := OpLessEqual
4748
if !c.UpperBound.Inclusive {
4849
operator = OpLessThan
4950
}
50-
conds = append(conds, fmt.Sprintf("`%s` %s %s", c.Key, operator, c.UpperBound.Value))
51+
str := expandRowConstructorComparison(c.Key, operator, c.UpperBound.Value)
52+
conds = append(conds, str)
5153
}
5254
if c.LowerBound == nil && c.UpperBound == nil {
5355
conds = append(conds, "1=1")
@@ -56,55 +58,87 @@ func (c *Chunk) String() string {
5658
}
5759

5860
func (c *Chunk) JSON() string {
59-
return fmt.Sprintf(`{"Key":"%s","ChunkSize":%d,"LowerBound":%s,"UpperBound":%s}`,
60-
c.Key,
61+
return fmt.Sprintf(`{"Key":["%s"],"ChunkSize":%d,"LowerBound":%s,"UpperBound":%s}`,
62+
strings.Join(c.Key, `","`),
6163
c.ChunkSize,
6264
c.LowerBound.JSON(),
6365
c.UpperBound.JSON(),
6466
)
6567
}
6668

69+
// JSON encodes a boundary as JSON. The values are represented as strings,
70+
// to avoid JSON float behavior. See Issue #125
6771
func (b *Boundary) JSON() string {
68-
// encode values as strings otherwise we get JSON floats
69-
// which can corrupt larger values. Issue #125
70-
return fmt.Sprintf(`{"Value": "%s","Inclusive":%t}`, b.Value, b.Inclusive)
72+
vals := make([]string, len(b.Value))
73+
for i, v := range b.Value {
74+
vals[i] = fmt.Sprintf(`"%s"`, v)
75+
}
76+
return fmt.Sprintf(`{"Value": [%s],"Inclusive":%t}`, strings.Join(vals, ","), b.Inclusive)
77+
}
78+
79+
// comparesTo returns true if the boundaries are the same.
80+
// It is used to compare two boundaries, such as if
81+
// a.Upper == b.Lower. For this reason it does not compare
82+
// the operator (inclusive or not)!
83+
func (b *Boundary) comparesTo(b2 *Boundary) bool {
84+
if len(b.Value) != len(b2.Value) {
85+
return false
86+
}
87+
for i := range b.Value {
88+
if b.Value[i].Tp != b2.Value[i].Tp {
89+
return false
90+
}
91+
if b.Value[i].Val != b2.Value[i].Val {
92+
return false
93+
}
94+
}
95+
return true
7196
}
7297

7398
type JSONChunk struct {
74-
Key string
99+
Key []string
75100
ChunkSize uint64
76101
LowerBound JSONBoundary
77102
UpperBound JSONBoundary
78103
}
79104

80105
type JSONBoundary struct {
81-
Value interface{}
106+
Value []string
82107
Inclusive bool
83108
}
84109

85-
func NewChunkFromJSON(jsonStr string, tp string) (*Chunk, error) {
110+
func jsonStrings2Datums(ti *TableInfo, keys []string, vals []string) ([]Datum, error) {
111+
datums := make([]Datum, len(keys))
112+
for i, str := range vals {
113+
tp := ti.datumTp(keys[i])
114+
datums[i] = newDatum(fmt.Sprint(str), tp)
115+
}
116+
return datums, nil
117+
}
118+
119+
func newChunkFromJSON(ti *TableInfo, jsonStr string) (*Chunk, error) {
86120
var chunk JSONChunk
87121
err := json.Unmarshal([]byte(jsonStr), &chunk)
88122
if err != nil {
89123
return nil, err
90124
}
91-
lowerVal, err := newDatumFromMySQL(fmt.Sprint(chunk.LowerBound.Value), tp)
125+
lowerVals, err := jsonStrings2Datums(ti, chunk.Key, chunk.LowerBound.Value)
92126
if err != nil {
93127
return nil, err
94128
}
95-
upperVal, err := newDatumFromMySQL(fmt.Sprint(chunk.UpperBound.Value), tp)
129+
upperVals, err := jsonStrings2Datums(ti, chunk.Key, chunk.UpperBound.Value)
96130
if err != nil {
97131
return nil, err
98132
}
99133
return &Chunk{
100134
Key: chunk.Key,
101135
ChunkSize: chunk.ChunkSize,
102136
LowerBound: &Boundary{
103-
Value: lowerVal,
137+
Value: lowerVals,
104138
Inclusive: chunk.LowerBound.Inclusive,
105139
},
106140
UpperBound: &Boundary{
107-
Value: upperVal,
141+
Value: upperVals,
108142
Inclusive: chunk.UpperBound.Inclusive,
109143
},
110144
}, nil

pkg/table/chunk_test.go

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,107 @@ import (
88

99
func TestChunk2String(t *testing.T) {
1010
chunk := &Chunk{
11-
Key: "id",
11+
Key: []string{"id"},
1212
LowerBound: &Boundary{
13-
Value: newDatum(100, signedType),
13+
Value: []Datum{newDatum(100, signedType)},
1414
Inclusive: true,
1515
},
1616
UpperBound: &Boundary{
17-
Value: newDatum(200, signedType),
17+
Value: []Datum{newDatum(200, signedType)},
1818
Inclusive: false,
1919
},
2020
}
2121
assert.Equal(t, "`id` >= 100 AND `id` < 200", chunk.String())
2222
chunk = &Chunk{
23-
Key: "id",
23+
Key: []string{"id"},
2424
LowerBound: &Boundary{
25-
Value: newDatum(100, signedType),
25+
Value: []Datum{newDatum(100, signedType)},
2626
Inclusive: false,
2727
},
2828
}
2929
assert.Equal(t, "`id` > 100", chunk.String())
3030
chunk = &Chunk{
31-
Key: "id",
31+
Key: []string{"id"},
3232
UpperBound: &Boundary{
33-
Value: newDatum(200, signedType),
33+
Value: []Datum{newDatum(200, signedType)},
3434
Inclusive: true,
3535
},
3636
}
3737
assert.Equal(t, "`id` <= 200", chunk.String())
3838

39-
// Empty chunks are possible with the trivial chunker.
39+
// Empty chunks are possible with the composite chunker
4040
chunk = &Chunk{
41-
Key: "id",
41+
Key: []string{"id"},
4242
}
4343
assert.Equal(t, "1=1", chunk.String())
4444
}
45+
46+
func TestCompositeChunks(t *testing.T) {
47+
chunk := &Chunk{
48+
Key: []string{"id1", "id2"},
49+
LowerBound: &Boundary{
50+
Value: []Datum{newDatum(100, signedType), newDatum(200, signedType)},
51+
Inclusive: false,
52+
},
53+
UpperBound: &Boundary{
54+
Value: []Datum{newDatum(100, signedType), newDatum(300, signedType)},
55+
Inclusive: false,
56+
},
57+
}
58+
assert.Equal(t, "((`id1` > 100)\n OR (`id1` = 100 AND `id2` > 200)) AND ((`id1` < 100)\n OR (`id1` = 100 AND `id2` < 300))", chunk.String())
59+
// 4 parts to the key - pretty unlikely.
60+
chunk = &Chunk{
61+
Key: []string{"id1", "id2", "id3", "id4"},
62+
LowerBound: &Boundary{
63+
Value: []Datum{newDatum(100, signedType), newDatum(200, signedType), newDatum(200, signedType), newDatum(200, signedType)},
64+
Inclusive: true,
65+
},
66+
UpperBound: &Boundary{
67+
Value: []Datum{newDatum(101, signedType), newDatum(12, signedType), newDatum(123, signedType), newDatum(1, signedType)},
68+
Inclusive: false,
69+
},
70+
}
71+
assert.Equal(t, "((`id1` > 100)\n OR (`id1` = 100 AND `id2` > 200)\n OR (`id1` = 100 AND `id2` = 200 AND `id3` > 200)\n OR (`id1` = 100 AND `id2` = 200 AND `id3` = 200 AND `id4` >= 200)) AND ((`id1` < 101)\n OR (`id1` = 101 AND `id2` < 12)\n OR (`id1` = 101 AND `id2` = 12 AND `id3` < 123)\n OR (`id1` = 101 AND `id2` = 12 AND `id3` = 123 AND `id4` < 1))", chunk.String())
72+
// A possible scenario when chunking on a non-primary key:
73+
chunk = &Chunk{
74+
Key: []string{"status", "id"},
75+
LowerBound: &Boundary{
76+
Value: []Datum{newDatum("ARCHIVED", binaryType), newDatum(1234, signedType)},
77+
Inclusive: true,
78+
},
79+
UpperBound: &Boundary{
80+
Value: []Datum{newDatum("ARCHIVED", binaryType), newDatum(5412, signedType)},
81+
Inclusive: false,
82+
},
83+
}
84+
assert.Equal(t, "((`status` > \"ARCHIVED\")\n OR (`status` = \"ARCHIVED\" AND `id` >= 1234)) AND ((`status` < \"ARCHIVED\")\n OR (`status` = \"ARCHIVED\" AND `id` < 5412))", chunk.String())
85+
}
86+
87+
func TestComparesTo(t *testing.T) {
88+
b1 := &Boundary{
89+
Value: []Datum{newDatum(200, signedType)},
90+
Inclusive: true,
91+
}
92+
b2 := &Boundary{
93+
Value: []Datum{newDatum(200, signedType)},
94+
Inclusive: true,
95+
}
96+
assert.True(t, b1.comparesTo(b2))
97+
b2.Inclusive = false // change operator
98+
assert.True(t, b1.comparesTo(b2)) // still compares
99+
b2.Value = []Datum{newDatum(300, signedType)}
100+
assert.False(t, b1.comparesTo(b2))
101+
102+
// Compound values.
103+
b1 = &Boundary{
104+
Value: []Datum{newDatum(200, signedType), newDatum(300, signedType)},
105+
Inclusive: true,
106+
}
107+
b2 = &Boundary{
108+
Value: []Datum{newDatum(200, signedType), newDatum(300, signedType)},
109+
Inclusive: true,
110+
}
111+
assert.True(t, b1.comparesTo(b2))
112+
b2.Value = []Datum{newDatum(200, signedType), newDatum(400, signedType)}
113+
assert.False(t, b1.comparesTo(b2))
114+
}

0 commit comments

Comments
 (0)