Skip to content

Commit

Permalink
Merge pull request #16 from peng225/all-agent-failure-test
Browse files Browse the repository at this point in the history
all agent failure test
  • Loading branch information
peng225 authored Jun 22, 2024
2 parents 87b9190 + c3790b9 commit 9647394
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 138 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ proto:

.PHONY: run
run: $(STARFISH)
$(STARFISH) -id 0 -config config.yaml &
$(STARFISH) -id 1 -config config.yaml &
$(STARFISH) -id 2 -config config.yaml &
$(STARFISH) -id 0 -config config.yaml -pstore-dir /tmp &
$(STARFISH) -id 1 -config config.yaml -pstore-dir /tmp &
$(STARFISH) -id 2 -config config.yaml -pstore-dir /tmp &

.PHONY: test
test: $(STARFISH)
Expand Down
8 changes: 8 additions & 0 deletions internal/web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ func LockHandler(w http.ResponseWriter, r *http.Request) {
if !agent.IsLeader() {
lid := agent.LeaderID()
if lid == agent.InvalidAgentID {
slog.Warn("Currently, there is no leader.")
w.Header().Add("Retry-After", "1")
w.WriteHeader(http.StatusServiceUnavailable)
} else {
slog.Warn("I am not a leader.",
slog.Int("leaderID", int(lid)))
http.Redirect(w, r, webEndpoints[lid]+"/lock", http.StatusTemporaryRedirect)
}
return
Expand All @@ -41,6 +44,7 @@ func LockHandler(w http.ResponseWriter, r *http.Request) {
i := 0
for agent.PendingApplyLogExist() {
if i == 3 {
slog.Warn("Pending requests exist.")
w.Header().Add("Retry-After", "1")
w.WriteHeader(http.StatusServiceUnavailable)
return
Expand Down Expand Up @@ -106,9 +110,12 @@ func UnlockHandler(w http.ResponseWriter, r *http.Request) {
if !agent.IsLeader() {
lid := agent.LeaderID()
if lid == agent.InvalidAgentID {
slog.Warn("Currently, there is no leader.")
w.Header().Add("Retry-After", "1")
w.WriteHeader(http.StatusServiceUnavailable)
} else {
slog.Warn("I am not a leader.",
slog.Int("leaderID", int(lid)))
http.Redirect(w, r, webEndpoints[lid]+"/unlock", http.StatusTemporaryRedirect)
}
return
Expand All @@ -122,6 +129,7 @@ func UnlockHandler(w http.ResponseWriter, r *http.Request) {
i := 0
for agent.PendingApplyLogExist() {
if i == 3 {
slog.Warn("Pending requests exist.")
w.Header().Add("Retry-After", "1")
w.WriteHeader(http.StatusServiceUnavailable)
return
Expand Down
43 changes: 43 additions & 0 deletions test/all_agent_failure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package test

import (
"math/rand"
"os/exec"
"sync"
"syscall"
"testing"

"github.com/stretchr/testify/require"
)

// TestAllAgentFailure checks that every web endpoint still reports the same
// lock holder after all agents have been terminated and started again.
//
// The test acquires the lock, sends SIGTERM to every agent process
// concurrently, relaunches the agents with "make run", verifies the lock
// holder on each endpoint, and finally releases the lock.
func TestAllAgentFailure(t *testing.T) {
	c := readConfig(t, "../config.yaml")

	lockHolder := 1
	lockRequest(t, lockHolder, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])
	checkLockHolder(t, lockHolder, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])

	pids := getAgentPIDs(t)

	// Signal all agents at once. Each goroutine only records its own result;
	// the assertions run afterwards on the test goroutine, because require
	// (which calls t.FailNow) must not be invoked from other goroutines.
	killErrs := make([]error, len(pids))
	var wg sync.WaitGroup
	wg.Add(len(pids))
	for i, pid := range pids {
		go func(i, pid int) {
			defer wg.Done()
			killErrs[i] = syscall.Kill(pid, syscall.SIGTERM)
		}(i, pid)
	}
	wg.Wait()
	for i, err := range killErrs {
		require.NoError(t, err, "failed to send SIGTERM to PID %d", pids[i])
	}

	// Restart the agents through the Makefile's run target.
	cmd := exec.Command("make", "-C", "../", "run")
	err := cmd.Run()
	require.NoError(t, err)

	for _, endpoint := range c.WebEndpoints {
		t.Logf("endpoint: %s", endpoint)
		checkLockHolder(t, lockHolder, endpoint)
	}

	unlockRequest(t, lockHolder, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])
}
119 changes: 70 additions & 49 deletions test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/peng225/starfish/internal/agent"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)
Expand All @@ -32,74 +31,96 @@ func readConfig(t *testing.T, fileName string) *config {
return &c
}

// TODO: "Eventually" is needed for PUT requests, too.
func TestLockAndUnlock(t *testing.T) {
c := readConfig(t, "../config.yaml")

// Check the initial status.
func checkLockHolder(t *testing.T, lockHolder int, server string) {
t.Helper()
require.Eventually(t, func() bool {
resp, err := http.Get(c.WebEndpoints[rand.Intn(len(c.WebEndpoints))] + "/lock")
require.NoError(t, err)
if resp.StatusCode != http.StatusOK {
resp, err := http.Get(server + "/lock")
if err != nil {
t.Logf("Failed to get the lock holder. err: %s", err)
return false
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Logf("HTTP status code is not OK. statusCode: %d", resp.StatusCode)
return false
}
data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, strconv.Itoa(int(agent.InvalidLockHolderID)), string(data))
if err != nil {
t.Logf("Failed to read HTTP response body. err: %s", err)
return false
}
if strconv.Itoa(lockHolder) != string(data) {
t.Logf("Unexpected lock holder. expected: %s, actual: %s",
strconv.Itoa(lockHolder), string(data))
return false
}
return true
}, 3*time.Second, 10*time.Microsecond)
}, 20*time.Second, 100*time.Millisecond)
}

// Lock and check.
lockHolder1 := strconv.Itoa(1)
req, err := http.NewRequest(http.MethodPut,
c.WebEndpoints[rand.Intn(len(c.WebEndpoints))]+"/lock",
bytes.NewBuffer([]byte(lockHolder1)))
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
func lockRequest(t *testing.T, lockHolder int, server string) {
t.Helper()
require.Eventually(t, func() bool {
resp, err := http.Get(c.WebEndpoints[rand.Intn(len(c.WebEndpoints))] + "/lock")
t.Logf("lockHolder: %d", lockHolder)
req, err := http.NewRequest(http.MethodPut,
server+"/lock",
bytes.NewBuffer([]byte(strconv.Itoa(lockHolder))))
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Logf("Lock request failed. err: %s", err)
return false
}
if resp.StatusCode != http.StatusOK {
t.Logf("HTTP status code is not OK. statusCode: %d", resp.StatusCode)
return false
}
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
return true
}, 20*time.Second, 500*time.Millisecond)
}

func unlockRequest(t *testing.T, lockHolder int, server string) {
t.Helper()
require.Eventually(t, func() bool {
req, err := http.NewRequest(http.MethodPut,
server+"/unlock",
bytes.NewBuffer([]byte(strconv.Itoa(lockHolder))))
require.NoError(t, err)
assert.Equal(t, lockHolder1, string(data))
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Logf("Unlock request failed. err: %s", err)
return false
}
if resp.StatusCode != http.StatusOK {
t.Logf("HTTP status code is not OK. statusCode: %d", resp.StatusCode)
return false
}
return true
}, 3*time.Second, 10*time.Microsecond)
}, 20*time.Second, 100*time.Millisecond)
}

func TestLockAndUnlock(t *testing.T) {
c := readConfig(t, "../config.yaml")

// Check the initial status.
checkLockHolder(t, int(agent.InvalidAgentID), c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])

// Lock and check.
lockHolder1 := 1
lockRequest(t, lockHolder1, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])
checkLockHolder(t, lockHolder1, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])

// Another client tries to lock, but fails.
lockHolder2 := strconv.Itoa(2)
req, err = http.NewRequest(http.MethodPut,
lockHolder2 := 2
req, err := http.NewRequest(http.MethodPut,
c.WebEndpoints[rand.Intn(len(c.WebEndpoints))]+"/lock",
bytes.NewBuffer([]byte(lockHolder2)))
bytes.NewBuffer([]byte(strconv.Itoa(lockHolder2))))
require.NoError(t, err)
resp, err = http.DefaultClient.Do(req)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusConflict, resp.StatusCode)

// Unlock and check.
req, err = http.NewRequest(http.MethodPut,
c.WebEndpoints[rand.Intn(len(c.WebEndpoints))]+"/unlock",
bytes.NewBuffer([]byte(lockHolder1)))
require.NoError(t, err)
resp, err = http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Eventually(t, func() bool {
resp, err := http.Get(c.WebEndpoints[rand.Intn(len(c.WebEndpoints))] + "/lock")
require.NoError(t, err)
if resp.StatusCode != http.StatusOK {
return false
}
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, strconv.Itoa(int(agent.InvalidLockHolderID)), string(data))
return true
}, 3*time.Second, 10*time.Microsecond)
unlockRequest(t, lockHolder1, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])
checkLockHolder(t, int(agent.InvalidLockHolderID), c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])
}
62 changes: 19 additions & 43 deletions test/sigstop_test.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,26 @@
package test

import (
"bytes"
"io"
"log"
"math/rand"
"net/http"
"os/exec"
"strconv"
"strings"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// sendSignal delivers sig to the process identified by pid and fails the
// test immediately if the signal cannot be sent.
func sendSignal(t *testing.T, sig syscall.Signal, pid int) {
	t.Helper()
	require.NoError(t, syscall.Kill(pid, sig))
}

func TestSigStop(t *testing.T) {
c := readConfig(t, "../config.yaml")

lockHolder := strconv.Itoa(1)
require.Eventually(t, func() bool {
req, err := http.NewRequest(http.MethodPut,
c.WebEndpoints[rand.Intn(len(c.WebEndpoints))]+"/lock",
bytes.NewBuffer([]byte(lockHolder)))
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
return resp.StatusCode == http.StatusOK
}, 2*time.Second, 10*time.Microsecond)

require.Eventually(t, func() bool {
resp, err := http.Get(c.WebEndpoints[rand.Intn(len(c.WebEndpoints))] + "/lock")
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, lockHolder, string(data))
return true
}, 2*time.Second, 10*time.Microsecond)

func getAgentPIDs(t *testing.T) []int {
t.Helper()
cmd := exec.Command("pidof", "starfish")
var out strings.Builder
cmd.Stdout = &out
Expand All @@ -62,26 +36,28 @@ func TestSigStop(t *testing.T) {
require.NoError(t, err)
pids = append(pids, pid)
}
return pids
}

func TestSigStop(t *testing.T) {
c := readConfig(t, "../config.yaml")

lockHolder := 1
lockRequest(t, lockHolder, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])
checkLockHolder(t, lockHolder, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])

pids := getAgentPIDs(t)

for _, pid := range pids {
sendSignal(t, syscall.SIGSTOP, pid)
time.Sleep(10 * time.Second)
sendSignal(t, syscall.SIGCONT, pid)
for _, endpoint := range c.WebEndpoints {
require.Eventually(t, func() bool {
t.Logf("Stopped PID: %d", pid)
t.Logf("endpoint: %s", endpoint)
resp, err := http.Get(endpoint + "/lock")
require.NoError(t, err)
defer resp.Body.Close()
t.Logf("statusCode: %d", resp.StatusCode)
require.Equal(t, http.StatusOK, resp.StatusCode)
data, err := io.ReadAll(resp.Body)
require.NoError(t, err)
t.Logf("lockHolder: %s", string(data))
require.Equal(t, lockHolder, string(data))
return true
}, 2*time.Second, 20*time.Microsecond)
t.Logf("Stopped PID: %d", pid)
t.Logf("endpoint: %s", endpoint)
checkLockHolder(t, lockHolder, endpoint)
}
}

unlockRequest(t, lockHolder, c.WebEndpoints[rand.Intn(len(c.WebEndpoints))])
}
Loading

0 comments on commit 9647394

Please sign in to comment.