Skip to content
This repository was archived by the owner on Sep 6, 2018. It is now read-only.

Streaming snapshot sends #223

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
snapshots now stream, rather than a big buffer
jbooth committed May 7, 2014
commit fb835800a57bfd0c4f0246fb5a5de1d71d83d4a7
37 changes: 27 additions & 10 deletions http_transporter.go
Original file line number Diff line number Diff line change
@@ -202,23 +202,40 @@ func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *Sn
}

// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
return nil
}
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequestHeader, body StateMachineIo) *SnapshotRecoveryResponse {

url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath)
traceln(server.Name(), "POST", url)

httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
// spin off writer function
pRead, pWrite := io.Pipe()
writeErr := make(chan error, 1)
go func() {
defer pWrite.Close()
if _, err := req.Encode(pWrite); err != nil {
writeErr <- fmt.Errorf("transporter.rv.encoding.error:%s", err)
return
}
if _, err := body.WriteSnapshot(pWrite); err != nil {
writeErr <- fmt.Errorf("err opening StateMachine snapshot:%s", err)
return
}
writeErr <- nil
return
}()

// invoke http request and get our response
httpResp, err := t.httpClient.Post(url, "application/protobuf", pRead)
if httpResp == nil || err != nil {
traceln("transporter.rv.response.error:", err)
return nil
}
defer httpResp.Body.Close()

err = <-writeErr
if err != nil {
traceln("transporter.rv.write.err:", err)
return nil
}
resp := &SnapshotRecoveryResponse{}
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
traceln("transporter.rv.decoding.error:", err)
@@ -306,13 +323,13 @@ func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFun
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /snapshotRecovery")

req := &SnapshotRecoveryRequest{}
req := &SnapshotRecoveryRequestHeader{}
if _, err := req.Decode(r.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}

resp := server.SnapshotRecoveryRequest(req)
resp := server.SnapshotRecoveryRequest(req, r.Body)
if resp == nil {
http.Error(w, "Failed creating response.", http.StatusInternalServerError)
return
4 changes: 2 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
@@ -277,9 +277,9 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {

// Sends an Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.snapshot)
req := newSnapshotRecoveryRequestHeader(p.server.name, p.server.snapshot)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req, p.server.StateMachine())

if resp == nil {
debugln("peer.snap.recovery.timeout: ", p.Name)
126 changes: 61 additions & 65 deletions server.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path"
@@ -68,7 +68,7 @@ var StopError = errors.New("raft: Has been stopped")
type Server interface {
Name() string
Context() interface{}
StateMachine() StateMachine
StateMachine() StateMachineIo
Leader() string
State() string
Path() string
@@ -92,7 +92,7 @@ type Server interface {
AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
RequestVote(req *RequestVoteRequest) *RequestVoteResponse
RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
SnapshotRecoveryRequest(req *SnapshotRecoveryRequestHeader, body io.ReadCloser) *SnapshotRecoveryResponse
AddPeer(name string, connectiongString string) error
RemovePeer(name string) error
Peers() map[string]*Peer
@@ -137,7 +137,7 @@ type server struct {
// set to nil.
pendingSnapshot *Snapshot

stateMachine StateMachine
stateMachine StateMachineIo
maxLogEntriesPerRequest uint64

connectionString string
@@ -163,7 +163,7 @@ type ev struct {
// compaction is to be disabled. context can be anything (including nil)
// and is not used by the raft package except returned by
// Server.Context(). connectionString can be anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachineIo, ctx interface{}, connectionString string) (Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
@@ -268,7 +268,7 @@ func (s *server) Context() interface{} {
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
func (s *server) StateMachine() StateMachineIo {
return s.stateMachine
}

@@ -877,7 +877,7 @@ func (s *server) snapshotLoop() {
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
case *SnapshotRecoveryRequest:
case *snapRecovReqAndBody:
e.returnValue = s.processSnapshotRecoveryRequest(req)
}
// Callback to event.
@@ -1199,12 +1199,7 @@ func (s *server) TakeSnapshot() error {

path := s.SnapshotPath(lastIndex, lastTerm)
// Attach snapshot to pending snapshot and save it to disk.
s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}

state, err := s.stateMachine.Save()
if err != nil {
return err
}
s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, path}

// Clone the list of peers.
peers := make([]*Peer, 0, len(s.peers)+1)
@@ -1215,9 +1210,23 @@ func (s *server) TakeSnapshot() error {

// Attach snapshot to pending snapshot and save it to disk.
s.pendingSnapshot.Peers = peers
s.pendingSnapshot.State = state
s.saveSnapshot()

// pipe state machine to snapshot file
snapOut, err := s.pendingSnapshot.writeState()
if err != nil {
return err
}
_, err = s.StateMachine().WriteSnapshot(snapOut)
if err != nil {
return err
}
err = snapOut.Close()
if err != nil {
return err
}
err = s.savePendingSnapshot()
if err != nil {
return err
}
// We keep some log entries after the snapshot.
// We do not want to send the whole snapshot to the slightly slow machines
if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
@@ -1229,17 +1238,12 @@ func (s *server) TakeSnapshot() error {
return nil
}

// Retrieves the log path for the server.
func (s *server) saveSnapshot() error {
// moves pending snapshot into position and removes previous snap
func (s *server) savePendingSnapshot() error {
if s.pendingSnapshot == nil {
return errors.New("pendingSnapshot.is.nil")
}

// Write snapshot to disk.
if err := s.pendingSnapshot.save(); err != nil {
return err
}

// Swap the current and last snapshots.
tmp := s.snapshot
s.snapshot = s.pendingSnapshot
@@ -1280,32 +1284,51 @@ func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse
return newSnapshotResponse(true)
}

func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
ret, _ := s.send(req)
type snapRecovReqAndBody struct {
req *SnapshotRecoveryRequestHeader
body io.ReadCloser
}

func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequestHeader, body io.ReadCloser) *SnapshotRecoveryResponse {
ret, _ := s.send(&snapRecovReqAndBody{req, body})
resp, _ := ret.(*SnapshotRecoveryResponse)
return resp
}

func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
// Recover state sent from request.
if err := s.stateMachine.Recovery(req.State); err != nil {
panic("cannot recover from previous state")
func (s *server) processSnapshotRecoveryRequest(r *snapRecovReqAndBody) *SnapshotRecoveryResponse {
req := r.req
// Create local snapshot.
s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, s.SnapshotPath(req.LastIndex, req.LastTerm)}

// we want to tee the state from req.body into both our state machine and a backup snapshot
snapOut, err := s.pendingSnapshot.writeState()
if err != nil {
panic("Couldn't create new snapshot file!")
}
tee := io.TeeReader(r.body, snapOut)
err = s.stateMachine.RecoverSnapshot(tee)
if err != nil {
panic("Couldn't recover state machine from snapshot!")
}
err = snapOut.Close()
if err != nil {
panic("Couldn't write snapshot file!")
}

err = s.savePendingSnapshot()
if err != nil {
panic(fmt.Errorf("Couldn't save pending snapshot: %s", err))
}

// Recover the cluster configuration.
s.peers = make(map[string]*Peer)
for _, peer := range req.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}

// Update log state.
s.currentTerm = req.LastTerm
s.log.updateCommitIndex(req.LastIndex)

// Create local snapshot.
s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
s.saveSnapshot()

// Clear the previous log entries.
s.log.compact(req.LastIndex, req.LastTerm)

@@ -1339,42 +1362,15 @@ func (s *server) LoadSnapshot() error {
snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

// Read snapshot data.
file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
var state io.ReadCloser
s.snapshot, state, err = readSnapState(snapshotPath)
if err != nil {
return err
}
defer file.Close()

// Check checksum.
var checksum uint32
n, err := fmt.Fscanf(file, "%08x\n", &checksum)
if err != nil {
return err
} else if n != 1 {
return errors.New("checksum.err: bad.snapshot.file")
}

// Load remaining snapshot contents.
b, err := ioutil.ReadAll(file)
if err != nil {
return err
}

// Generate checksum.
byteChecksum := crc32.ChecksumIEEE(b)
if uint32(checksum) != byteChecksum {
s.debugln(checksum, " ", byteChecksum)
return errors.New("bad snapshot file")
}

// Decode snapshot.
if err = json.Unmarshal(b, &s.snapshot); err != nil {
s.debugln("unmarshal.snapshot.error: ", err)
return err
}
defer state.Close()

// Recover snapshot into state machine.
if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
if err = s.stateMachine.RecoverSnapshot(state); err != nil {
s.debugln("recovery.snapshot.error: ", err)
return err
}
224 changes: 179 additions & 45 deletions snapshot.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package raft

import (
"bufio"
"code.google.com/p/gogoprotobuf/proto"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/goraft/raft/protobuf"
"hash"
"hash/crc32"
"io"
"io/ioutil"
"os"

"code.google.com/p/gogoprotobuf/proto"
"github.com/goraft/raft/protobuf"
)

// Snapshot represents an in-memory representation of the current state of the system.
@@ -19,17 +21,16 @@ type Snapshot struct {

// Cluster configuration.
Peers []*Peer `json:"peers"`
State []byte `json:"state"`
Path string `json:"path"`
}

// The request sent to a server to start from the snapshot.
type SnapshotRecoveryRequest struct {
type SnapshotRecoveryRequestHeader struct {
LeaderName string
LastIndex uint64
LastTerm uint64
Peers []*Peer
State []byte
// bytes of state stream here, see StateMachineIo.WriteSnapshot, StateMachineIo.RecoverSnapshot
}

// The response returned from a server appending entries to the log.
@@ -51,38 +52,135 @@ type SnapshotResponse struct {
Success bool `json:"success"`
}

// save writes the snapshot to file.
func (ss *Snapshot) save() error {
// returns the snapshot metadata and a readcloser of snapshot state
func readSnapState(path string) (*Snapshot, io.ReadCloser, error) {
f, err := os.Open(path)
if err != nil {
f.Close()
return nil, nil, err
}
in := bufio.NewReaderSize(f, 64*1024)
var checksum uint32
var headerLen int
_, err = fmt.Fscanf(in, "%08x\n%08x\n", &checksum, &headerLen)
if err != nil {
f.Close()
return nil, nil, err
}
header := make([]byte, headerLen, headerLen)
nRead := 0
for nRead < headerLen {
n, err := in.Read(header[nRead:])
if err != nil {
f.Close()
return nil, nil, err
}
nRead += n
}
snap := &Snapshot{}
err = json.Unmarshal(header, snap)
if err != nil {
f.Close()
return nil, nil, err
}
return snap, &sumReader{f, in, crc32.NewIEEE(), checksum}, nil
}

// pulls from the reader and adds up a sum, throws error at EOF or Close if sum not expected
type sumReader struct {
f *os.File
in io.Reader
sum hash.Hash32
expected uint32
}

func (s *sumReader) Read(b []byte) (n int, err error) {
n, err = s.in.Read(b)
if n > 0 {
s.sum.Write(b[:n])
}
if err == io.EOF {
if s.sum.Sum32() != s.expected {
return n, fmt.Errorf("Bad checksum! Got %d expected %d", s.sum.Sum32(), s.expected)
}
}
return n, err
}

func (s *sumReader) Close() (err error) {
err = s.f.Close()
if s.sum.Sum32() != s.expected {
return fmt.Errorf("Bad checksum! Got %d expected %d", s.sum.Sum32(), s.expected)
}
return err
}

// returns a writer which can be used to save a snapshot.
// upon close(), this snapshot has been fsync'd with a crc32 checksum in the first bytes of the file
func (ss *Snapshot) writeState() (io.WriteCloser, error) {
// Open the file for writing.
file, err := os.OpenFile(ss.Path, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
return nil, err
}
defer file.Close()

// Serialize to JSON.
// serialize header json
b, err := json.Marshal(ss)
if err != nil {
return err
return nil, err
}
// sum&size, then header bytes
// checksum initially zero, we overwrite later
bufOut := bufio.NewWriterSize(file, 64*1024)
_, err = fmt.Fprintf(bufOut, "%08x\n%08x\n", 0, len(b))
if err != nil {
file.Close()
return nil, fmt.Errorf("Error writing header: %s", err)
}
if _, err := bufOut.Write(b); err != nil {
file.Close()
return nil, fmt.Errorf("Error writing header: %s", err)
}
// state streams after the checksum and json header
return &sumWriter{
file,
bufOut,
crc32.NewIEEE(),
}, nil
}

// Generate checksum and write it to disk.
checksum := crc32.ChecksumIEEE(b)
if _, err = fmt.Fprintf(file, "%08x\n", checksum); err != nil {
return err
type sumWriter struct {
f *os.File
bufout *bufio.Writer // buffered writer
sum hash.Hash32
}

func (s *sumWriter) Write(b []byte) (n int, err error) {
n, err = s.bufout.Write(b)
if n > 0 {
s.sum.Write(b[:n])
}
return n, err
}

// Write the snapshot to disk.
if _, err = file.Write(b); err != nil {
func (s *sumWriter) Close() (err error) {
// flush rest of data from bufout
if err = s.bufout.Flush(); err != nil {
return err
}

// Ensure that the snapshot has been flushed to disk before continuing.
if err := file.Sync(); err != nil {
// seek to beginning to overwrite checksum
if _, err = s.f.Seek(0, os.SEEK_SET); err != nil {
return err
}

return nil
// write sum
if _, err = fmt.Fprintf(s.f, "%08x\n", s.sum.Sum32()); err != nil {
return err
}
// fsync
if err = s.f.Sync(); err != nil {
return fmt.Errorf("Error fsyncing snapshot: %s", err)
}
return s.f.Close()
}

// remove deletes the snapshot file.
@@ -94,19 +192,21 @@ func (ss *Snapshot) remove() error {
}

// Creates a new Snapshot request.
func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *SnapshotRecoveryRequest {
return &SnapshotRecoveryRequest{

func newSnapshotRecoveryRequestHeader(leaderName string, snapshot *Snapshot) *SnapshotRecoveryRequestHeader {
return &SnapshotRecoveryRequestHeader{
LeaderName: leaderName,
LastIndex: snapshot.LastIndex,
LastTerm: snapshot.LastTerm,
Peers: snapshot.Peers,
State: snapshot.State,
}
}

var empty = make([]byte, 0, 0)

// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
func (req *SnapshotRecoveryRequestHeader) Encode(w io.Writer) (int, error) {

protoPeers := make([]*protobuf.SnapshotRecoveryRequest_Peer, len(req.Peers))

@@ -122,36 +222,25 @@ func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error) {
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
Peers: protoPeers,
State: req.State,
State: empty,
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}

return w.Write(p)
return encodeProto(w, pb)
}

// Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)

if err != nil {
return 0, err
}

totalBytes := len(data)
func (req *SnapshotRecoveryRequestHeader) Decode(r io.Reader) (int, error) {

pb := &protobuf.SnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {

n, err := decodeProto(r, pb)
if err != nil {
return -1, err
}

req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
req.State = pb.GetState()

req.Peers = make([]*Peer, len(pb.Peers))

@@ -162,7 +251,52 @@ func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
}
}

return totalBytes, nil
return n, nil
}

func decodeProto(r io.Reader, p proto.Unmarshaler) (n int, err error) {
lenBuff := make([]byte, 4, 4)
n, err = r.Read(lenBuff)
if err != nil {
return n, err
}
length := int(binary.LittleEndian.Uint32(lenBuff))
body := make([]byte, length, length)
nRead := 0
for nRead < length {
n, err = r.Read(body[nRead:])
if n > 0 {
nRead += n
}
if err != nil {
return nRead + 4, err
}
}
return nRead + 4, nil
}

func encodeProto(w io.Writer, p proto.Marshaler) (n int, err error) {
buff, err := p.Marshal()
if err != nil {
return 0, err
}
lenBuff := make([]byte, 4, 4)
binary.LittleEndian.PutUint32(lenBuff, uint32(len(buff)))
n, err = w.Write(lenBuff)
if err != nil {
return n, err
}
nWritten := 0
for nWritten < len(buff) {
n, err = w.Write(buff[nWritten:])
if n > 0 {
nWritten += n
}
if err != nil {
return nWritten + 4, err
}
}
return nWritten + 4, nil
}

// Creates a new Snapshot response.
18 changes: 11 additions & 7 deletions snapshot_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package raft

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"io"
"testing"
"time"
)

// Ensure that a snapshot occurs when there are existing logs.
@@ -77,22 +77,26 @@ func TestSnapshotRequest(t *testing.T) {
assert.Equal(t, resp.Success, true)
assert.Equal(t, s.State(), Snapshotting)

stateToSend := &StateMachineIoWrapper{&DefaultStateMachine{[]byte("bar")}}
r, w := io.Pipe()
go func() {
stateToSend.WriteSnapshot(w)
}()
// Send recovery request.
resp2 := s.SnapshotRecoveryRequest(&SnapshotRecoveryRequest{
resp2 := s.SnapshotRecoveryRequest(&SnapshotRecoveryRequestHeader{
LeaderName: "1",
LastIndex: 5,
LastTerm: 2,
Peers: make([]*Peer, 0),
State: []byte("bar"),
})
}, r)
assert.Equal(t, resp2.Success, true)
})
}

func runServerWithMockStateMachine(state string, fn func(s Server, m *mock.Mock)) {
var m mockStateMachine
s := newTestServer("1", &testTransporter{})
s.(*server).stateMachine = &m
s.(*server).stateMachine = &StateMachineIoWrapper{&m}
if err := s.Start(); err != nil {
panic("server start error: " + err.Error())
}
81 changes: 81 additions & 0 deletions statemachine.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,90 @@
package raft

import (
"encoding/binary"
"io"
)

// StateMachine is the interface for allowing the host application to save and
// recovery the state machine. This makes it possible to make snapshots
// and compact the log.
type StateMachine interface {
Save() ([]byte, error)
Recovery([]byte) error
}

type DefaultStateMachine struct {
b []byte
}

func (d *DefaultStateMachine) Save() ([]byte, error) {
return d.b, nil
}

func (d *DefaultStateMachine) Recovery(b []byte) error {
d.b = b
return nil
}

type StateMachineIo interface {
WriteSnapshot(w io.Writer) (int, error)
RecoverSnapshot(r io.Reader) error
}

type StateMachineIoWrapper struct {
s StateMachine
}

func (s *StateMachineIoWrapper) WriteSnapshot(w io.Writer) (int, error) {
b, err := s.s.Save()
if err != nil {
return 0, err
}
length := len(b)
sizeBuff := make([]byte, 8, 8)
binary.LittleEndian.PutUint64(sizeBuff, uint64(length))
written := 0
for written < 8 {
n, err := w.Write(sizeBuff[written:])
if err != nil {
return 0, err
}
written += n
}
written = 0
for written < length {
n, err := w.Write(b[written:])
if n > 0 {
written += n
}
if err != nil {
return written, err
}
}
return written + 8, nil
}

func (s *StateMachineIoWrapper) RecoverSnapshot(r io.Reader) error {
sizeBuff := make([]byte, 8, 8)
nRead := 0
for nRead < 8 {
n, err := r.Read(sizeBuff[nRead:])
if n > 0 {
nRead += n
}
if err != nil {
return err
}
}
length := int(binary.LittleEndian.Uint64(sizeBuff))
snapshot := make([]byte, length, length)
nRead = 0
for nRead < length {
n, err := r.Read(snapshot[nRead:])
if err != nil {
return err
}
nRead += n
}
return s.s.Recovery(snapshot)
}
4 changes: 2 additions & 2 deletions test.go
Original file line number Diff line number Diff line change
@@ -146,8 +146,8 @@ func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *Sn
return t.sendSnapshotRequestFunc(server, peer, req)
}

func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
return t.SendSnapshotRecoveryRequest(server, peer, req)
func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequestHeader, body StateMachineIo) *SnapshotRecoveryResponse {
return t.SendSnapshotRecoveryRequest(server, peer, req, body)
}

type testStateMachine struct {
2 changes: 1 addition & 1 deletion transporter.go
Original file line number Diff line number Diff line change
@@ -12,5 +12,5 @@ type Transporter interface {
SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
SendSnapshotRecoveryRequest(server Server, peer *Peer, header *SnapshotRecoveryRequestHeader, body StateMachineIo) *SnapshotRecoveryResponse
}