Skip to content

Commit 399f52c

Browse files
committed
Add a basic WAL
The WAL will serialize upserts and deletions for later replay.
1 parent a2fc06a commit 399f52c

File tree

8 files changed

+701
-0
lines changed

8 files changed

+701
-0
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/spf13/cobra v1.2.1
1313
github.com/spf13/viper v1.9.0
1414
github.com/stretchr/testify v1.7.0
15+
google.golang.org/api v0.56.0
1516
google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12
1617
google.golang.org/grpc v1.42.0
1718
google.golang.org/protobuf v1.27.1

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,7 @@ google.golang.org/api v0.48.0/go.mod h1:71Pr1vy+TAZRPkPs/xlCf5SsU8WjuAWv1Pfjbtuk
630630
google.golang.org/api v0.50.0/go.mod h1:4bNT5pAuq5ji4SRZm+5QIkjny9JAyVD/3gaSihNefaw=
631631
google.golang.org/api v0.51.0/go.mod h1:t4HdrdoNgyN5cbEfm7Lum0lcLDLiise1F8qDKX00sOU=
632632
google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6z3k=
633+
google.golang.org/api v0.56.0 h1:08F9XVYTLOGeSQb3xI9C0gXMuQanhdGed0cWFhDozbI=
633634
google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
634635
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
635636
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=

internal/wal/iterator.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package wal
2+
3+
import (
4+
"bufio"
5+
"encoding/binary"
6+
"errors"
7+
"fmt"
8+
"io"
9+
10+
"google.golang.org/api/iterator"
11+
"google.golang.org/protobuf/proto"
12+
)
13+
14+
var (
15+
ErrInvalidSkip = errors.New("wal: cannot skip, already at a later txid")
16+
ErrNoSuchTxid = errors.New("wal: no such txid")
17+
)
18+
19+
type Iterator struct {
20+
source *bufio.Reader
21+
peeked *Record
22+
started bool
23+
}
24+
25+
// NewIterator creates a new iterator for the given reader.
26+
func NewIterator(source io.Reader) *Iterator {
27+
return &Iterator{source: bufio.NewReader(source)}
28+
}
29+
30+
// Next returns the next record in the WAL and advances the iterator.
31+
func (r *Iterator) Next() (*Record, error) {
32+
if !r.started {
33+
r.started = true
34+
if err := verifySchemaVersion(r.source); err != nil {
35+
return nil, err
36+
}
37+
}
38+
39+
if r.peeked != nil {
40+
rec := r.peeked
41+
r.peeked = nil
42+
return rec, nil
43+
}
44+
45+
rec, err := r.nextRecord()
46+
if err == io.EOF {
47+
return nil, iterator.Done
48+
}
49+
50+
return rec, nil
51+
}
52+
53+
// SkipTo advances the iterator to the first record whose transaction ID is equal to or higher than
54+
// txid.
55+
func (r *Iterator) SkipTo(txid uint64) error {
56+
for {
57+
rec, err := r.Next()
58+
if err != nil {
59+
if errors.Is(err, iterator.Done) {
60+
return ErrNoSuchTxid
61+
}
62+
return err
63+
}
64+
65+
if rec.GetTxid() > txid {
66+
return ErrInvalidSkip
67+
}
68+
69+
if rec.GetTxid() == txid {
70+
r.peeked = rec
71+
return nil
72+
}
73+
}
74+
}
75+
76+
// nextRecord fetches the next record from the source.
77+
func (r *Iterator) nextRecord() (*Record, error) {
78+
var recordLen uint32
79+
if err := binary.Read(r.source, binary.LittleEndian, &recordLen); err != nil {
80+
return nil, err
81+
}
82+
83+
recordBytes := make([]byte, recordLen)
84+
if _, err := r.source.Read(recordBytes); err != nil {
85+
return nil, err
86+
}
87+
88+
var record Record
89+
if err := proto.Unmarshal(recordBytes, &record); err != nil {
90+
return nil, err
91+
}
92+
93+
return &record, nil
94+
}
95+
96+
// verifySchemaVersion checks the schema version of the SSTable index file and returns an error if
97+
// the version is not supported.
98+
func verifySchemaVersion(r io.Reader) error {
99+
var version uint32
100+
101+
if err := binary.Read(r, binary.LittleEndian, &version); err != nil {
102+
if errors.Is(err, io.EOF) {
103+
// Allow completely empty indices to simplify testing.
104+
return nil
105+
}
106+
return err
107+
}
108+
109+
if version != schemaVersion {
110+
return fmt.Errorf("unsupported WAL schema version: %d", version)
111+
}
112+
113+
return nil
114+
}

internal/wal/iterator_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package wal
2+
3+
import (
4+
"os"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
"google.golang.org/api/iterator"
9+
)
10+
11+
func TestIterator_Next(t *testing.T) {
12+
w, teardown := createNewWAL(t, "wal_iterator_next")
13+
defer teardown()
14+
15+
err := w.Upsert(1, "foo", []byte("foo value"))
16+
require.NoError(t, err)
17+
18+
err = w.Delete(2, "bar")
19+
require.NoError(t, err)
20+
21+
err = w.Upsert(3, "bar", []byte("bar value"))
22+
require.NoError(t, err)
23+
24+
require.NoError(t, w.Close())
25+
26+
it := createIterator(t, w.file.Name())
27+
28+
rec, err := it.Next()
29+
require.NoError(t, err)
30+
require.Equal(t, uint64(1), rec.GetTxid())
31+
require.Equal(t, "foo", rec.GetUpsert().GetKey())
32+
require.Equal(t, "foo value", string(rec.GetUpsert().GetValue()))
33+
34+
rec, err = it.Next()
35+
require.NoError(t, err)
36+
require.Equal(t, uint64(2), rec.GetTxid())
37+
require.Equal(t, "bar", rec.GetDelete().GetKey())
38+
39+
rec, err = it.Next()
40+
require.NoError(t, err)
41+
require.Equal(t, uint64(3), rec.GetTxid())
42+
require.Equal(t, "bar", rec.GetUpsert().GetKey())
43+
require.Equal(t, "bar value", string(rec.GetUpsert().GetValue()))
44+
45+
_, err = it.Next()
46+
require.ErrorIs(t, iterator.Done, err)
47+
}
48+
49+
func TestIterator_SkipTo(t *testing.T) {
50+
w, teardown := createNewWAL(t, "wal_iterator_peek")
51+
defer teardown()
52+
53+
err := w.Upsert(1, "foo", []byte("foo value"))
54+
require.NoError(t, err)
55+
56+
err = w.Delete(2, "bar")
57+
require.NoError(t, err)
58+
59+
err = w.Upsert(3, "bar", []byte("bar value"))
60+
require.NoError(t, err)
61+
62+
require.NoError(t, w.Close())
63+
64+
it := createIterator(t, w.file.Name())
65+
66+
require.NoError(t, it.SkipTo(2))
67+
68+
rec, err := it.Next()
69+
require.NoError(t, err)
70+
require.Equal(t, uint64(2), rec.GetTxid())
71+
require.Equal(t, "bar", rec.GetDelete().GetKey())
72+
73+
// Can't skip backwards.
74+
err = it.SkipTo(1)
75+
require.ErrorIs(t, ErrInvalidSkip, err)
76+
77+
// Can't skip to missing ID.
78+
err = it.SkipTo(4)
79+
require.ErrorIs(t, ErrNoSuchTxid, err)
80+
}
81+
82+
// -------------------------------------------------------------------------------------------------
83+
84+
func createIterator(t *testing.T, path string) *Iterator {
85+
f, err := os.Open(path)
86+
require.NoError(t, err)
87+
88+
return NewIterator(f)
89+
}

0 commit comments

Comments
 (0)