Skip to content

Commit

Permalink
sos: rename DistMem to SoS, or Spillover-Store
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 15, 2024
1 parent 4472aeb commit d2b2e7d
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 257 deletions.
32 changes: 16 additions & 16 deletions cmd/demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func main() {
w.Write([]byte("OK"))
})

mux.HandleFunc("/distmem", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/sos", func(w http.ResponseWriter, r *http.Request) {
defer func(start time.Time) {
slog.Info("distmem:", "duration", time.Since(start))
}(time.Now())
Expand All @@ -309,14 +309,14 @@ func main() {
limit := 14_000 // 4 pods, all
// limit := 2_500

dm := func() *hedge.DistMem {
dm := op.NewDistMem(name, &hedge.DistMemOptions{
sos := func() *hedge.SoS {
sos := op.NewSoS(name, &hedge.SoSOptions{
MemLimit: 150_000,
DiskLimit: 120_000,
Expiration: 30,
})

writer, err := dm.Writer()
writer, err := sos.Writer()
if err != nil {
slog.Error("Writer failed:", "err", err)
return nil
Expand All @@ -331,16 +331,16 @@ func main() {
}

slog.Info("write_dm:", "i", limit, "n", n, "write_err", writer.Err())
return dm
return sos
}()

if dm == nil {
slog.Error("failed in creating DistMem object")
if sos == nil {
slog.Error("failed in creating SoS object")
return
}

func() {
reader, err := dm.Reader()
reader, err := sos.Reader()
if err != nil {
slog.Error(err.Error())
return
Expand Down Expand Up @@ -384,11 +384,11 @@ func main() {
slog.Info("read_dm:", "read_err", reader.Err())
}()

dm.Close()
sos.Close()
w.Write([]byte("OK"))
})

mux.HandleFunc("/dmlocal", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/soslocal", func(w http.ResponseWriter, r *http.Request) {
defer func(start time.Time) {
slog.Info("distmem:", "duration", time.Since(start))
}(time.Now())
Expand Down Expand Up @@ -419,13 +419,13 @@ func main() {

slog.Info("start distmem:", "name", name)

dm := func() *hedge.DistMem {
dm := op.NewDistMem(name, &hedge.DistMemOptions{
sos := func() *hedge.SoS {
sos := op.NewSoS(name, &hedge.SoSOptions{
MemLimit: 100_000,
Expiration: 30,
})

writer, err := dm.Writer()
writer, err := sos.Writer()
if err != nil {
slog.Error("Writer failed:", "err", err)
return nil
Expand Down Expand Up @@ -467,11 +467,11 @@ func main() {
"err", writer.Err(),
)

return dm
return sos
}()

func() {
reader, _ := dm.Reader()
reader, _ := sos.Reader()
out := make(chan []byte)
eg := new(errgroup.Group)
eg.Go(func() error {
Expand Down Expand Up @@ -503,7 +503,7 @@ func main() {
slog.Info("read_dm:", "read_err", reader.Err())
}()

dm.Close()
sos.Close()
w.Write([]byte("OK"))
})

Expand Down
29 changes: 15 additions & 14 deletions hedge.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ type Op struct {
broadcastStreamIn chan *StreamMessage
broadcastStreamOut chan *StreamMessage

dmsLock *sync.Mutex
dms map[string]*DistMem // distributed memory
sosLock *sync.Mutex
soss map[string]*SoS // distributed memory

*spindle.Lock // handles our distributed lock
members map[string]struct{} // key=id
Expand Down Expand Up @@ -551,18 +551,19 @@ func (op *Op) NewSemaphore(ctx context.Context, name string, limit int) (*Semaph
return &Semaphore{name, limit, op}, nil
}

// NewDistMem returns an object for writing data to distributed memory and
// disk across the cluster. The order of writing is local memory, local
// disk, other pod's memory, other pod's disk, etc.
func (op *Op) NewDistMem(name string, opts ...*DistMemOptions) *DistMem {
op.dmsLock.Lock()
defer op.dmsLock.Unlock()
if _, ok := op.dms[name]; ok {
return op.dms[name]
// NewSoS returns an object for writing data to spill-over
// storage across the cluster. The order of writing is local
// memory, local disk, other pod's memory, other pod's disk,
// and so on.
func (op *Op) NewSoS(name string, opts ...*SoSOptions) *SoS {
op.sosLock.Lock()
defer op.sosLock.Unlock()
if _, ok := op.soss[name]; ok {
return op.soss[name]
}

op.dms[name] = newDistMem(name, op, opts...)
return op.dms[name]
op.soss[name] = newSoS(name, op, opts...)
return op.soss[name]
}

// Get reads a key (or keys) from Op.
Expand Down Expand Up @@ -1321,8 +1322,8 @@ func New(client *spanner.Client, hostPort, lockTable, lockName, logTable string,
members: make(map[string]struct{}),
ensureCh: make(chan string),
ensureDone: make(chan struct{}, 1),
dmsLock: &sync.Mutex{},
dms: map[string]*DistMem{},
sosLock: &sync.Mutex{},
soss: map[string]*SoS{},
Lock: &spindle.Lock{}, // init later
}

Expand Down
46 changes: 23 additions & 23 deletions proto/v1/default.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions proto/v1/default.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ option go_package = "github.com/flowerinthenight/hedge/proto/v1";
service Hedge {
rpc Send(stream Payload) returns (stream Payload) {}
rpc Broadcast(stream Payload) returns (stream Payload) {}
rpc DMemWrite(stream Payload) returns (stream Payload) {}
rpc DMemRead(stream Payload) returns (stream Payload) {}
rpc DMemClose(Payload) returns (Payload) {}
rpc SoSWrite(stream Payload) returns (stream Payload) {}
rpc SoSRead(stream Payload) returns (stream Payload) {}
rpc SoSClose(Payload) returns (Payload) {}
}

message Payload {
Expand Down
Loading

0 comments on commit d2b2e7d

Please sign in to comment.