Skip to content

Commit

Permalink
Merge pull request #22 from peng225/broadcast-before-get-reponse
Browse files Browse the repository at this point in the history
broadcast before get reponse
  • Loading branch information
peng225 authored Jun 27, 2024
2 parents 492478d + 5e1a65e commit 83e9c50
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
16 changes: 8 additions & 8 deletions internal/agent/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,15 @@ func sendLog(ctx context.Context, destID int32, entryCount int64) error {
return nil
}

func broadcastHeartBeat() {
func BroadcastHeartBeat() error {
errCh := broadcastToLogSenderDaemons(pstore.LogSize())
for i := 0; i < len(grpcServers)/2; i++ {
err := <-errCh
if err != nil {
if errors.Is(err, DemotedToFollower) {
break
}
slog.Error("Unexpected heartbeat error.",
slog.String("err", err.Error()))
os.Exit(1)
return err
}
}
return nil
}

func heartBeatDaemon() {
Expand All @@ -249,6 +245,10 @@ func heartBeatDaemon() {
if vstate.role != Leader {
break
}
broadcastHeartBeat()
err := BroadcastHeartBeat()
if err != nil {
slog.Error("Heartbeat failed.",
slog.String("err", err.Error()))
}
}
}
30 changes: 30 additions & 0 deletions internal/web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,48 @@ func LockHandler(w http.ResponseWriter, r *http.Request) {
for len(lhIDByte) != written {
n, err := w.Write(lhIDByte[written:])
if err != nil {
slog.Error("Write failed.",
slog.String("err", err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
written += n
}
err := agent.BroadcastHeartBeat()
if err != nil {
slog.Error("Heartbeat failed.",
slog.String("err", err.Error()))
w.Header().Add("Retry-After", "1")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
case http.MethodPut:
body, err := io.ReadAll(r.Body)
if err != nil {
slog.Error("Failed to read body.",
slog.String("err", err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
defer r.Body.Close()

lockRequestedID, err := strconv.ParseInt(string(body), 10, 32)
if err != nil {
slog.Error("Failed to parse body.",
slog.String("body", string(body)),
slog.String("err", err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
if lockRequestedID == int64(lockHandlerID) {
// Already has a lock.
slog.Info("Lock already acquired.",
slog.Int64("lockRequestedID", lockRequestedID))
break
}
if 0 <= lockHandlerID {
slog.Debug("Lock conflict.",
slog.Int("lockHandlerID", int(lockHandlerID)))
w.WriteHeader(http.StatusConflict)
return
}
Expand All @@ -92,10 +112,14 @@ func LockHandler(w http.ResponseWriter, r *http.Request) {
err = agent.AppendLog(&logEntry)
if err != nil && errors.Is(err, agent.DemotedToFollower) {
lid := agent.LeaderID()
slog.Debug("I am not a leader.",
slog.Int("leaderID", int(lid)))
http.Redirect(w, r, webServers[lid]+"/lock", http.StatusTemporaryRedirect)
return
}
default:
slog.Error("Method not allowed.",
slog.String("method", r.Method))
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
Expand All @@ -122,6 +146,8 @@ func UnlockHandler(w http.ResponseWriter, r *http.Request) {
}

if r.Method != http.MethodPut {
slog.Error("Method not allowed.",
slog.String("method", r.Method))
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
Expand All @@ -140,6 +166,8 @@ func UnlockHandler(w http.ResponseWriter, r *http.Request) {

body, err := io.ReadAll(r.Body)
if err != nil {
slog.Error("Failed to read body.",
slog.String("err", err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -176,6 +204,8 @@ func UnlockHandler(w http.ResponseWriter, r *http.Request) {
err = agent.AppendLog(&logEntry)
if err != nil && errors.Is(err, agent.DemotedToFollower) {
lid := agent.LeaderID()
slog.Debug("I am not a leader.",
slog.Int("leaderID", int(lid)))
http.Redirect(w, r, webServers[lid]+"/unlock", http.StatusTemporaryRedirect)
return
}
Expand Down

0 comments on commit 83e9c50

Please sign in to comment.