Skip to content

Commit

Permalink
sos: add ref count on create
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Jul 23, 2024
1 parent 0cfc4de commit 7d199c4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
12 changes: 8 additions & 4 deletions cmd/demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func main() {
sos := op.NewSoS(name, &hedge.SoSOptions{
MemLimit: 150_000,
DiskLimit: 120_000,
Expiration: 30,
Expiration: 5,
})

writer, err := sos.Writer()
Expand Down Expand Up @@ -448,9 +448,11 @@ func main() {
Usage float64 `json:"usage"`
}

// See $HOME/tmp/
locs, _ := os.ReadFile("readlocs")
ss := strings.Split(string(locs), " ")

// See $HOME/tmp/
ra, err := mmap.Open("readdata")
if err != nil {
slog.Error(err.Error())
Expand All @@ -469,8 +471,9 @@ func main() {

sos := func() *hedge.SoS {
sos := op.NewSoS(name, &hedge.SoSOptions{
MemLimit: 100_000,
Expiration: 30,
MemLimit: 10_000_000,
DiskLimit: 10_000_000,
Expiration: 5,
})

writer, err := sos.Writer()
Expand Down Expand Up @@ -511,6 +514,7 @@ func main() {

writer.Close()
slog.Info("total_write:",
"count", i,
"val", wt,
"err", writer.Err(),
)
Expand Down Expand Up @@ -541,7 +545,7 @@ func main() {
rt += len(d)
}

slog.Info("total_read:", "val", rt)
slog.Info("total_read:", "count", i, "val", rt)
return nil
})

Expand Down
13 changes: 10 additions & 3 deletions sos.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type SoS struct {
dlock *sync.Mutex // local file lock
wmtx *sync.Mutex // one active writer only
writer *Writer // writer object
refs atomic.Int64 // self reference count
wrefs atomic.Int64 // writer reference count
rrefs atomic.Int64 // reader reference count
on atomic.Int32
Expand Down Expand Up @@ -288,6 +289,8 @@ func (w *Writer) start() {
if w.sos.data[n].bb != nil {
w.sos.data[n].bufs = w.sos.data[n].bb.NewBinaryArray()
w.sos.data[n].bb.Release()
w.sos.data[n].bb = nil
// slog.Info("arrow: release(bb):", "node", n)
}
}

Expand Down Expand Up @@ -519,6 +522,7 @@ func (sos *SoS) Close() {
}
}

sos.refs.Add(-1)
sos.on.Store(0)
}

Expand Down Expand Up @@ -558,10 +562,11 @@ func (sos *SoS) cleaner() {
eg.Go(func() error {
started := sos.start
for {
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 1)
refs := sos.refs.Load()
wrefs := sos.wrefs.Load()
rrefs := sos.rrefs.Load()
if (wrefs + rrefs) > 0 {
if (refs + wrefs + rrefs) > 0 {
started = time.Now()
continue
}
Expand All @@ -574,7 +579,8 @@ func (sos *SoS) cleaner() {
for _, node := range sos.op.soss[sos.Name].nodes {
if sos.data[node].bufs != nil {
sos.data[node].bufs.Release()
// slog.Info("arrow: release:", "node", node)
sos.data[node].bufs = nil
// slog.Info("arrow: release(buf):", "node", node)
}
}
}()
Expand Down Expand Up @@ -636,6 +642,7 @@ func newSoS(name string, op *Op, opts ...*SoSOptions) *SoS {
sos.age = time.Hour * 1
}

sos.refs.Add(1)
sos.start = time.Now()
go sos.cleaner()
return sos
Expand Down

0 comments on commit 7d199c4

Please sign in to comment.