Skip to content

Commit c68835c

Browse files
authored
Merge pull request #188 from squareup/prudhvi.remove_embedded_type
Revert "Merge pull request #183 from squareup/prudhvi.watermark_prior…
2 parents 39f29df + 0ecee86 commit c68835c

File tree

6 files changed

+256
-205
lines changed

6 files changed

+256
-205
lines changed

pkg/table/chunker.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Chunker interface {
3434
IsRead() bool
3535
Close() error
3636
Next() (*Chunk, error)
37-
Feedback(chunk *Chunk, d time.Duration)
37+
Feedback(*Chunk, time.Duration)
3838
GetLowWatermark() (string, error)
3939
KeyAboveHighWatermark(interface{}) bool
4040
}
@@ -47,12 +47,10 @@ func NewChunker(t *TableInfo, chunkerTarget time.Duration, logger loggers.Advanc
4747
// tables with a single column key.
4848
if len(t.KeyColumns) == 1 && t.KeyIsAutoInc {
4949
return &chunkerOptimistic{
50-
coreChunker: &coreChunker{
51-
Ti: t,
52-
ChunkerTarget: chunkerTarget,
53-
lowerBoundWatermarkMap: make(map[string]*Chunk, 0),
54-
logger: logger,
55-
},
50+
Ti: t,
51+
ChunkerTarget: chunkerTarget,
52+
lowerBoundWatermarkMap: make(map[string]*Chunk, 0),
53+
logger: logger,
5654
}, nil
5755
}
5856
return NewCompositeChunker(t, chunkerTarget, logger, "", "")
@@ -62,12 +60,10 @@ func NewChunker(t *TableInfo, chunkerTarget time.Duration, logger loggers.Advanc
6260
// setting its Key if keyName and where conditions are provided
6361
func NewCompositeChunker(t *TableInfo, chunkerTarget time.Duration, logger loggers.Advanced, keyName string, whereCondition string) (Chunker, error) {
6462
c := chunkerComposite{
65-
coreChunker: &coreChunker{
66-
Ti: t,
67-
ChunkerTarget: chunkerTarget,
68-
lowerBoundWatermarkMap: make(map[string]*Chunk, 0),
69-
logger: logger,
70-
},
63+
Ti: t,
64+
ChunkerTarget: chunkerTarget,
65+
lowerBoundWatermarkMap: make(map[string]*Chunk, 0),
66+
logger: logger,
7167
}
7268
var err error
7369
if keyName != "" && whereCondition != "" {

pkg/table/chunker_composite.go

Lines changed: 109 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,37 @@ import (
66
"fmt"
77
"reflect"
88
"strings"
9+
"sync"
910
"time"
1011

12+
"github.com/siddontang/loggers"
1113
"golang.org/x/exp/slices"
1214
)
1315

1416
type chunkerComposite struct {
15-
*coreChunker
17+
sync.Mutex
18+
Ti *TableInfo
19+
chunkSize uint64
20+
chunkPtrs []Datum // a list of Ptrs for each of the keys.
21+
chunkKeys []string // all the keys to chunk on (usually all the col names of the PK)
22+
keyName string // the name of the key we are chunking on
23+
where string // any additional WHERE conditions.
24+
finalChunkSent bool
25+
isOpen bool
1626

17-
chunkSize uint64
18-
chunkPtrs []Datum // a list of Ptrs for each of the keys.
19-
chunkKeys []string // all the keys to chunk on (usually all the col names of the PK)
20-
keyName string // the name of the key we are chunking on
21-
where string // any additional WHERE conditions.
27+
// Dynamic Chunking is time based instead of row based.
28+
// It uses *time* to determine the target chunk size.
29+
chunkTimingInfo []time.Duration
30+
ChunkerTarget time.Duration // i.e. 500ms for target
31+
32+
// This is used for restore.
33+
watermark *Chunk
34+
// Map from lowerbound value of a chunk -> chunk,
35+
// Used to update the watermark by applying stored chunks,
36+
// by comparing their lowerBound with current watermark upperBound.
37+
lowerBoundWatermarkMap map[string]*Chunk
38+
39+
logger loggers.Advanced
2240
}
2341

2442
var _ Chunker = &chunkerComposite{}
@@ -227,6 +245,91 @@ func (t *chunkerComposite) Feedback(chunk *Chunk, d time.Duration) {
227245
}
228246
}
229247

248+
// GetLowWatermark returns the highest known value that has been safely copied,
249+
// which (due to parallelism) could be significantly behind the high watermark.
250+
// The value is discovered via ChunkerFeedback(), and when retrieved from this func
251+
// can be used to write a checkpoint for restoration.
252+
func (t *chunkerComposite) GetLowWatermark() (string, error) {
253+
t.Lock()
254+
defer t.Unlock()
255+
if t.watermark == nil || t.watermark.UpperBound == nil || t.watermark.LowerBound == nil {
256+
return "", errors.New("watermark not yet ready")
257+
}
258+
259+
return t.watermark.JSON(), nil
260+
}
261+
262+
// isSpecialRestoredChunk is used to test for the first chunk after restore-from-checkpoint.
263+
// The restored chunk is a really special beast because the lowerbound
264+
// will be repeated by the first chunk that is applied post restore.
265+
// This is called under a mutex.
266+
func (t *chunkerComposite) isSpecialRestoredChunk(chunk *Chunk) bool {
267+
if chunk.LowerBound == nil || chunk.UpperBound == nil || t.watermark == nil || t.watermark.LowerBound == nil || t.watermark.UpperBound == nil {
268+
return false // restored checkpoints always have both.
269+
}
270+
return chunk.LowerBound.comparesTo(t.watermark.LowerBound)
271+
}
272+
273+
// bumpWatermark updates the minimum value that is known to be safely copied,
274+
// and is called under a mutex.
275+
// Because of parallelism, it is possible that a chunk is copied out of order,
276+
// so this func needs to account for that.
277+
// Basically:
278+
// - If the chunk does not "align" to the current low watermark, it's stored in a map keyed by its lowerBound valuesString() value.
279+
// - If it does align, the watermark is bumped to the chunk's max value. Then
280+
// stored chunk map is checked to see if an existing chunk lowerBound aligns with the new watermark.
281+
// - If any stored chunk aligns, it is deleted off the map and the watermark is bumped.
282+
// - This process repeats until there is no more alignment from the stored map *or* the map is empty.
283+
func (t *chunkerComposite) bumpWatermark(chunk *Chunk) {
284+
if chunk.UpperBound == nil {
285+
return
286+
}
287+
// Check if this is the first chunk or it's the special restored chunk.
288+
// If so, set the watermark and then go on to applying any stored chunks.
289+
if (t.watermark == nil && chunk.LowerBound == nil) || t.isSpecialRestoredChunk(chunk) {
290+
t.watermark = chunk
291+
goto applyStoredChunks
292+
}
293+
294+
// Validate that chunk has lower bound before moving on
295+
if chunk.LowerBound == nil {
296+
errMsg := fmt.Sprintf("coreChunker.bumpWatermark: nil lowerBound value encountered more than once: %v", chunk)
297+
t.logger.Fatal(errMsg)
298+
}
299+
300+
// We haven't set the first chunk yet, or it's not aligned with the
301+
// previous watermark. Store it in the map keyed by its lowerBound, and move on.
302+
303+
// We only need to store by lowerBound because, when updating watermark
304+
// we always compare the upperBound of current watermark to lowerBound of stored chunks.
305+
// Key can never be nil, because first chunk will not hit this code path and all remaining chunks will have lowerBound.
306+
if t.watermark == nil || !t.watermark.UpperBound.comparesTo(chunk.LowerBound) {
307+
t.lowerBoundWatermarkMap[chunk.LowerBound.valuesString()] = chunk
308+
return
309+
}
310+
311+
// The remaining case is:
312+
// t.watermark.UpperBound.Value == chunk.LowerBound.Value
313+
// Replace the current watermark with the chunk.
314+
t.watermark = chunk
315+
316+
applyStoredChunks:
317+
318+
// Check the waterMarkMap for any chunks that align with the new watermark.
319+
// If there are any, bump the watermark and delete from the map.
320+
// If there are none, we're done.
321+
for t.waterMarkMapNotEmpty() && t.watermark.UpperBound != nil && t.lowerBoundWatermarkMap[t.watermark.UpperBound.valuesString()] != nil {
322+
key := t.watermark.UpperBound.valuesString()
323+
nextWatermark := t.lowerBoundWatermarkMap[key]
324+
t.watermark = nextWatermark
325+
delete(t.lowerBoundWatermarkMap, key)
326+
}
327+
}
328+
329+
func (t *chunkerComposite) waterMarkMapNotEmpty() bool {
330+
return t.lowerBoundWatermarkMap != nil && len(t.lowerBoundWatermarkMap) != 0
331+
}
332+
230333
func (t *chunkerComposite) open() (err error) {
231334
if t.isOpen {
232335
// This prevents an error where open is re-called
@@ -316,17 +419,3 @@ func (t *chunkerComposite) SetKey(keyName string, where string) error {
316419
t.where = where
317420
return nil
318421
}
319-
320-
// GetLowWatermark returns the highest known value that has been safely copied,
321-
// which (due to parallelism) could be significantly behind the high watermark.
322-
// The value is discovered via Chunker Feedback(), and when retrieved from this func
323-
// can be used to write a checkpoint for restoration.
324-
func (t *chunkerComposite) GetLowWatermark() (string, error) {
325-
t.Lock()
326-
defer t.Unlock()
327-
if t.watermark == nil || t.watermark.UpperBound == nil || t.watermark.LowerBound == nil {
328-
return "", errors.New("watermark not yet ready")
329-
}
330-
331-
return t.watermark.JSON(), nil
332-
}

pkg/table/chunker_composite_test.go

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -270,12 +270,10 @@ func TestCompositeLowWatermark(t *testing.T) {
270270
assert.NoError(t, t1.SetInfo(context.Background()))
271271

272272
chunker := &chunkerComposite{
273-
coreChunker: &coreChunker{
274-
Ti: t1,
275-
ChunkerTarget: ChunkerDefaultTarget,
276-
lowerBoundWatermarkMap: make(map[string]*Chunk, 0),
277-
logger: logrus.New(),
278-
},
273+
Ti: t1,
274+
ChunkerTarget: ChunkerDefaultTarget,
275+
lowerBoundWatermarkMap: make(map[string]*Chunk, 0),
276+
logger: logrus.New(),
279277
}
280278
_, err = chunker.Next()
281279
assert.Error(t, err) // not open yet
@@ -430,11 +428,9 @@ func TestSetKey(t *testing.T) {
430428
t1 := NewTableInfo(db, "test", "setkey_t1")
431429
assert.NoError(t, t1.SetInfo(context.Background()))
432430
chunker := &chunkerComposite{
433-
coreChunker: &coreChunker{
434-
Ti: t1,
435-
ChunkerTarget: 100 * time.Millisecond,
436-
logger: logrus.New(),
437-
},
431+
Ti: t1,
432+
ChunkerTarget: 100 * time.Millisecond,
433+
logger: logrus.New(),
438434
}
439435
err = chunker.SetKey("s", "status = 'ARCHIVED' AND updated_at < NOW() - INTERVAL 1 DAY")
440436
assert.NoError(t, err)
@@ -449,11 +445,9 @@ func TestSetKey(t *testing.T) {
449445

450446
// If I reset again with a different condition it should range as chunks.
451447
chunker = &chunkerComposite{
452-
coreChunker: &coreChunker{
453-
logger: logrus.New(),
454-
ChunkerTarget: 100 * time.Millisecond,
455-
Ti: t1,
456-
},
448+
Ti: t1,
449+
ChunkerTarget: 100 * time.Millisecond,
450+
logger: logrus.New(),
457451
}
458452
err = chunker.SetKey("s", "status = 'PENDING' AND updated_at > NOW() - INTERVAL 1 DAY")
459453
assert.NoError(t, err)
@@ -484,11 +478,9 @@ func TestSetKey(t *testing.T) {
484478
// Test other index types.
485479
for _, index := range []string{"u", "su", "ui"} {
486480
chunker = &chunkerComposite{
487-
coreChunker: &coreChunker{
488-
Ti: t1,
489-
ChunkerTarget: 100 * time.Millisecond,
490-
logger: logrus.New(),
491-
},
481+
Ti: t1,
482+
ChunkerTarget: 100 * time.Millisecond,
483+
logger: logrus.New(),
492484
}
493485
err = chunker.SetKey(index, "updated_at < NOW() - INTERVAL 1 DAY")
494486
assert.NoError(t, err)
@@ -554,11 +546,9 @@ func TestSetKeyCompositeKeyMerge(t *testing.T) {
554546
t1 := NewTableInfo(db, "test", "setkeycomposite_t1")
555547
assert.NoError(t, t1.SetInfo(context.Background()))
556548
chunker := &chunkerComposite{
557-
coreChunker: &coreChunker{
558-
logger: logrus.New(),
559-
ChunkerTarget: 100 * time.Millisecond,
560-
Ti: t1,
561-
},
549+
Ti: t1,
550+
ChunkerTarget: 100 * time.Millisecond,
551+
logger: logrus.New(),
562552
}
563553
err = chunker.SetKey("dnc", "")
564554
assert.NoError(t, err)

0 commit comments

Comments
 (0)