Skip to content
This repository has been archived by the owner on Sep 6, 2018. It is now read-only.

Streaming snapshot sends #223

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 37 additions & 42 deletions append_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package raft

import (
"io"
"io/ioutil"

"code.google.com/p/gogoprotobuf/proto"
"github.com/goraft/raft/protobuf"
Expand All @@ -18,13 +17,6 @@ type AppendEntriesRequest struct {
Entries []*protobuf.LogEntry
}

// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
pb *protobuf.AppendEntriesResponse
peer string
append bool
}

// Creates a new AppendEntries request.
func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint64,
commitIndex uint64, leaderName string, entries []*LogEntry) *AppendEntriesRequest {
Expand All @@ -44,9 +36,7 @@ func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint6
}
}

// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
func (req *AppendEntriesRequest) Marshal() ([]byte, error) {
pb := &protobuf.AppendEntriesRequest{
Term: proto.Uint64(req.Term),
PrevLogIndex: proto.Uint64(req.PrevLogIndex),
Expand All @@ -56,26 +46,20 @@ func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
Entries: req.Entries,
}

p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}

return w.Write(p)
return proto.Marshal(pb)
}

// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error) {
return encode(req, w)
}

if err != nil {
return -1, err
}
func (req *AppendEntriesRequest) Unmarshal(data []byte) error {

pb := new(protobuf.AppendEntriesRequest)
if err := proto.Unmarshal(data, pb); err != nil {
return -1, err
return err
}

req.Term = pb.GetTerm()
Expand All @@ -85,7 +69,20 @@ func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
req.LeaderName = pb.GetLeaderName()
req.Entries = pb.GetEntries()

return len(data), nil
return nil
}

// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error) {
return decode(req, r)
}

// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
pb *protobuf.AppendEntriesResponse
peer string
append bool
}

// Creates a new AppendEntries response.
Expand Down Expand Up @@ -118,29 +115,27 @@ func (aer *AppendEntriesResponse) Success() bool {
return aer.pb.GetSuccess()
}

func (resp *AppendEntriesResponse) Marshal() ([]byte, error) {
p, err := proto.Marshal(resp.pb)
return p, err
}

// Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error) {
b, err := proto.Marshal(resp.pb)
if err != nil {
return -1, err
}
return encode(resp, w)
}

return w.Write(b)
func (resp *AppendEntriesResponse) Unmarshal(b []byte) error {
if resp.pb == nil {
resp.pb = &protobuf.AppendEntriesResponse{}
}
err := resp.pb.Unmarshal(b)
return err
}

// Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return -1, err
}

resp.pb = new(protobuf.AppendEntriesResponse)
if err := proto.Unmarshal(data, resp.pb); err != nil {
return -1, err
}

return len(data), nil
return decode(resp, r)
}
42 changes: 31 additions & 11 deletions http_transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,23 +202,40 @@ func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *Sn
}

// Sends a SnapshotRequest RPC to a peer.
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
var b bytes.Buffer
if _, err := req.Encode(&b); err != nil {
traceln("transporter.rv.encoding.error:", err)
return nil
}
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequestHeader, body StateMachineIo) *SnapshotRecoveryResponse {

url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath)
traceln(server.Name(), "POST", url)

httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
// spin off writer function
pRead, pWrite := io.Pipe()
writeErr := make(chan error, 1)
go func() {
defer pWrite.Close()
if _, err := req.Encode(pWrite); err != nil {
writeErr <- fmt.Errorf("transporter.rv.encoding.error:%s", err)
return
}
if _, err := body.WriteSnapshot(pWrite); err != nil {
writeErr <- fmt.Errorf("err opening StateMachine snapshot:%s", err)
return
}
writeErr <- nil
return
}()

// invoke http request and get our response
httpResp, err := t.httpClient.Post(url, "application/protobuf", pRead)
if httpResp == nil || err != nil {
traceln("transporter.rv.response.error:", err)
return nil
}
defer httpResp.Body.Close()

err = <-writeErr
if err != nil {
traceln("transporter.rv.write.err:", err)
return nil
}
resp := &SnapshotRecoveryResponse{}
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
traceln("transporter.rv.decoding.error:", err)
Expand All @@ -242,7 +259,7 @@ func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
http.Error(w, "", http.StatusBadRequest)
return
}

r.Body.Close()
resp := server.AppendEntries(req)
if resp == nil {
http.Error(w, "Failed creating response.", http.StatusInternalServerError)
Expand All @@ -265,6 +282,7 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
http.Error(w, "", http.StatusBadRequest)
return
}
r.Body.Close()

resp := server.RequestVote(req)
if resp == nil {
Expand All @@ -288,6 +306,7 @@ func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
http.Error(w, "", http.StatusBadRequest)
return
}
r.Body.Close()

resp := server.RequestSnapshot(req)
if resp == nil {
Expand All @@ -306,13 +325,14 @@ func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFun
return func(w http.ResponseWriter, r *http.Request) {
traceln(server.Name(), "RECV /snapshotRecovery")

req := &SnapshotRecoveryRequest{}
req := &SnapshotRecoveryRequestHeader{}
if _, err := req.Decode(r.Body); err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
r.Body.Close()

resp := server.SnapshotRecoveryRequest(req)
resp := server.SnapshotRecoveryRequest(req, r.Body)
if resp == nil {
http.Error(w, "Failed creating response.", http.StatusInternalServerError)
return
Expand Down
4 changes: 2 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {

// Sends an Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.snapshot)
req := newSnapshotRecoveryRequestHeader(p.server.name, p.server.snapshot)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req, p.server.StateMachine())

if resp == nil {
debugln("peer.snap.recovery.timeout: ", p.Name)
Expand Down
77 changes: 32 additions & 45 deletions request_vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package raft

import (
"io"
"io/ioutil"

"code.google.com/p/gogoprotobuf/proto"
"github.com/goraft/raft/protobuf"
Expand Down Expand Up @@ -34,45 +33,38 @@ func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint6
}
}

// Encodes the RequestVoteRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *RequestVoteRequest) Encode(w io.Writer) (int, error) {
func (req *RequestVoteRequest) Marshal() ([]byte, error) {
pb := &protobuf.RequestVoteRequest{
Term: proto.Uint64(req.Term),
LastLogIndex: proto.Uint64(req.LastLogIndex),
LastLogTerm: proto.Uint64(req.LastLogTerm),
CandidateName: proto.String(req.CandidateName),
}
p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}

return w.Write(p)
return proto.Marshal(pb)
}

// Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *RequestVoteRequest) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)

if err != nil {
return -1, err
}

totalBytes := len(data)
// Encodes the RequestVoteRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *RequestVoteRequest) Encode(w io.Writer) (int, error) {
return encode(req, w)
}

func (req *RequestVoteRequest) Unmarshal(data []byte) error {
pb := &protobuf.RequestVoteRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
if err := proto.Unmarshal(data, pb); err != nil {
return err
}

req.Term = pb.GetTerm()
req.LastLogIndex = pb.GetLastLogIndex()
req.LastLogTerm = pb.GetLastLogTerm()
req.CandidateName = pb.GetCandidateName()
return nil
}

return totalBytes, nil
// Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and
// any error that occurs.
func (req *RequestVoteRequest) Decode(r io.Reader) (int, error) {
return decode(req, r)
}

// Creates a new RequestVote response.
Expand All @@ -83,40 +75,35 @@ func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse
}
}

// Encodes the RequestVoteResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error) {
func (resp *RequestVoteResponse) Marshal() ([]byte, error) {
pb := &protobuf.RequestVoteResponse{
Term: proto.Uint64(resp.Term),
VoteGranted: proto.Bool(resp.VoteGranted),
}

p, err := proto.Marshal(pb)
if err != nil {
return -1, err
}

return w.Write(p)
return proto.Marshal(pb)
}

// Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)

if err != nil {
return 0, err
}
// Encodes the RequestVoteResponse to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error) {
return encode(resp, w)
}

totalBytes := len(data)
func (resp *RequestVoteResponse) Unmarshal(data []byte) error {

pb := &protobuf.RequestVoteResponse{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
if err := proto.Unmarshal(data, pb); err != nil {
return err
}

resp.Term = pb.GetTerm()
resp.VoteGranted = pb.GetVoteGranted()
return nil
}

return totalBytes, nil
// Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and
// any error that occurs.
func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error) {
return decode(resp, r)
}
Loading