Skip to content

Commit

Permalink
add comments, rename struct
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 6, 2024
1 parent 365a5be commit 24e8017
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 21 deletions.
40 changes: 26 additions & 14 deletions distmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type memT struct {
mlocs []int
}

// DistMem represents an object for distributed memory read/writes.
type DistMem struct {
sync.Mutex

Expand All @@ -65,7 +66,7 @@ type DistMem struct {
mlock *sync.Mutex // local file lock
dlock *sync.Mutex // local file lock
wmtx *sync.Mutex // one active writer only
writer *writerT // writer object
writer *writer // writer object
wrefs int64 // writer reference count
rrefs int64 // reader reference count
on int32
Expand All @@ -74,7 +75,7 @@ type DistMem struct {
start time.Time
}

type writerT struct {
type writer struct {
sync.Mutex
lo bool // local write only
dm *DistMem
Expand All @@ -84,15 +85,18 @@ type writerT struct {
done chan struct{}
}

func (w *writerT) Err() error {
// Err returns the last recorded error during the write operation.
func (w *writer) Err() error {
w.Lock()
defer w.Unlock()
return w.err
}

func (w *writerT) Write(data []byte) { w.ch <- data }
// Write writes data to the underlying storage.
func (w *writer) Write(data []byte) { w.ch <- data }

func (w *writerT) Close() {
// Close closes the writer object.
func (w *writer) Close() {
if atomic.LoadInt32(&w.on) == 0 {
return
}
Expand All @@ -104,7 +108,7 @@ func (w *writerT) Close() {
w.dm.wmtx.Unlock()
}

func (w *writerT) start() {
func (w *writer) start() {
defer func() { w.done <- struct{}{} }()
atomic.StoreInt32(&w.on, 1)
ctx := context.Background()
Expand Down Expand Up @@ -280,14 +284,16 @@ type writerOptions struct {
LocalOnly bool
}

func (dm *DistMem) Writer(opts ...*writerOptions) (*writerT, error) {
// Writer returns a writer object for writing data to DistMem. The
// caller needs to call writer.Close() after use.
func (dm *DistMem) Writer(opts ...*writerOptions) (*writer, error) {
dm.wmtx.Lock()
var localOnly bool
if len(opts) > 0 {
localOnly = opts[0].LocalOnly
}

dm.writer = &writerT{
dm.writer = &writer{
lo: localOnly,
dm: dm,
ch: make(chan []byte),
Expand All @@ -299,7 +305,7 @@ func (dm *DistMem) Writer(opts ...*writerOptions) (*writerT, error) {
return dm.writer, nil
}

type readerT struct {
type reader struct {
sync.Mutex
lo bool // local read only
dm *DistMem
Expand All @@ -308,7 +314,8 @@ type readerT struct {
done chan struct{}
}

func (r *readerT) Read(out chan []byte) {
// Read reads the underlying data and streams them to the `out` channel.
func (r *reader) Read(out chan []byte) {
eg := new(errgroup.Group)
eg.Go(func() error {
atomic.StoreInt32(&r.on, 1)
Expand Down Expand Up @@ -414,13 +421,15 @@ func (r *readerT) Read(out chan []byte) {
r.done <- struct{}{}
}

func (r *readerT) Err() error {
// Err returns the last recorded error, if any, during the read operation.
func (r *reader) Err() error {
r.Lock()
defer r.Unlock()
return r.err
}

func (r *readerT) Close() {
// Close closes the reader object.
func (r *reader) Close() {
if atomic.LoadInt32(&r.on) == 0 {
return
}
Expand All @@ -434,13 +443,15 @@ type readerOptions struct {
LocalOnly bool
}

func (dm *DistMem) Reader(opts ...*readerOptions) (*readerT, error) {
// Reader returns a reader object for reading data from DistMem.
// The caller needs to call reader.Close() after use.
func (dm *DistMem) Reader(opts ...*readerOptions) (*reader, error) {
var localOnly bool
if len(opts) > 0 {
localOnly = opts[0].LocalOnly
}

reader := &readerT{
reader := &reader{
lo: localOnly,
dm: dm,
done: make(chan struct{}, 1),
Expand All @@ -450,6 +461,7 @@ func (dm *DistMem) Reader(opts ...*readerOptions) (*readerT, error) {
return reader, nil
}

// Close closes the DistMem object.
func (dm *DistMem) Close() {
if atomic.LoadInt32(&dm.on) == 0 {
return
Expand Down
17 changes: 11 additions & 6 deletions hedge.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,9 @@ func (op *Op) NewSemaphore(ctx context.Context, name string, limit int) (*Semaph
return &Semaphore{name, limit, op}, nil
}

// NewDistMem returns an object for writing data to distributed memory and
// disk across the cluster. The order of writing is local memory, local
// disk, other pod's memory, other pod's disk, etc.
func (op *Op) NewDistMem(name string, opts ...*DistMemOptions) *DistMem {
op.dmsLock.Lock()
defer op.dmsLock.Unlock()
Expand Down Expand Up @@ -637,14 +640,16 @@ type PutOptions struct {
// If true, do a direct write, no need to fwd to leader.
DirectWrite bool

// If true, don't do an append-write; overwrite the latest. Note that even if you set this
// to true, if you do another Put the next time with this field set as false (default),
// the previous write will now be gone, or will now be part of the history.
// If true, don't do an append-write; overwrite the latest. Note that even
// if you set this to true, if you do another Put the next time with this
// field set as false (default), the previous write will now be gone, or
// will now be part of the history.
NoAppend bool
}

// Put saves a key/value to Op. This call will try to block, at least roughly until spindle's
// timeout, to wait for the leader's availability to do actual writes before returning.
// Put saves a key/value to Op. This call will try to block, at least roughly
// until spindle's timeout, to wait for the leader's availability to do actual
// writes before returning.
func (op *Op) Put(ctx context.Context, kv KeyValue, po ...PutOptions) error {
if op.logTable == "" {
return ErrNotSupported
Expand Down Expand Up @@ -939,7 +944,7 @@ type StreamBroadcastOutput struct {
Outs map[string]chan *StreamMessage
}

// StreamToLeader returns input and output channels for doing streaming broadcasts. Any node can broadcast messages,
// StreamBroadcast returns input and output channels for doing streaming broadcasts. Any node can broadcast messages,
// including the leader itself. Note that this is best-effort basis only; by the time you call this API, the handler
// might not have all the active members in record yet, as is the usual situation with k8s deployments, where pods
// come and go, and our internal heartbeat protocol hasn't been completed yet. This call will also block until it
Expand Down
2 changes: 1 addition & 1 deletion service.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *service) Broadcast(hs pb.Hedge_BroadcastServer) error {
func (s *service) DMemWrite(hs pb.Hedge_DMemWriteServer) error {
var err error
ctx := hs.Context()
var writer *writerT
var writer *writer

var count int

Expand Down

0 comments on commit 24e8017

Please sign in to comment.