Merge PR: workload(lastRun+persist) statistic (#2785)
* apply block workload statistic

* complete workload statistic

* fix bug & change format

* remove BlockExecutor field

* fix atomic on lower go version

* remove assert

* make it more accurate if the process started less than 1 hour ago

* make code easier to understand

* fix code style

Co-authored-by: Zhong Qiu <[email protected]>
Co-authored-by: yangzhe <[email protected]>
3 people authored Nov 23, 2022
1 parent 64d75c3 commit 27cbd9c
Showing 6 changed files with 291 additions and 7 deletions.
6 changes: 4 additions & 2 deletions app/elapse_info.go
@@ -2,11 +2,12 @@ package app

import (
"fmt"
"github.com/okex/exchain/libs/system/trace"
"github.com/okex/exchain/libs/tendermint/libs/log"
"strings"
"sync"

"github.com/okex/exchain/libs/system/trace"
"github.com/okex/exchain/libs/tendermint/libs/log"

"github.com/spf13/viper"
)

@@ -54,6 +55,7 @@ var (
trace.RunTx,
trace.Prerun,
trace.MempoolTxsCnt,
trace.Workload,
}

DefaultElapsedSchemas string
3 changes: 2 additions & 1 deletion libs/system/trace/schema.go
@@ -30,7 +30,6 @@ const (
HandlerDefer = "handler_defer"
)


const (
GasUsed = "GasUsed"
Produce = "Produce"
@@ -78,6 +77,8 @@ const (
IavlRuntime = "IavlRuntime"

BlockPartsP2P = "BlockPartsP2P"

Workload = "Workload"
)

const (
18 changes: 16 additions & 2 deletions libs/system/trace/trace.go
@@ -33,8 +33,10 @@ type Tracer struct {
intervals []time.Duration
elapsedTime time.Duration

pinMap map[string]time.Duration
enableSummary bool
pinMap map[string]time.Duration
enableSummary bool

wls *WorkloadStatistic
}

func NewTracer(name string) *Tracer {
@@ -50,6 +52,10 @@ func (t *Tracer) EnableSummary() {
t.enableSummary = true
}

func (t *Tracer) SetWorkloadStatistic(wls *WorkloadStatistic) {
t.wls = wls
}

func (t *Tracer) Pin(format string, args ...interface{}) {
t.pinByFormat(fmt.Sprintf(format, args...))
}
@@ -67,13 +73,21 @@ func (t *Tracer) pinByFormat(tag string) {

now := time.Now()

if t.wls != nil {
t.wls.begin(tag, now)
}

if len(t.lastPin) > 0 {
t.pins = append(t.pins, t.lastPin)
duration := now.Sub(t.lastPinStartTime)
t.intervals = append(t.intervals, duration)
if t.enableSummary {
insertElapse(t.lastPin, duration.Milliseconds())
}

if t.wls != nil {
t.wls.end(t.lastPin, now)
}
}
t.lastPinStartTime = now
t.lastPin = tag
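For orientation (this sketch is not part of the diff; the main function, timings and sleeps are illustrative), the hunk above is the whole hook: each Pin begins workload accounting for its own tag and ends accounting for the previous pin, and only tags the statistic is concerned with (LastRun and Persist for the apply-block statistic) are counted.

package main

import (
	"time"

	"github.com/okex/exchain/libs/system/trace"
)

func main() {
	trc := trace.NewTracer(trace.ApplyBlock)
	trc.SetWorkloadStatistic(trace.GetApplyBlockWorkloadSttistic())

	trc.Pin(trace.LastRun)            // begin(LastRun) on the attached statistic
	time.Sleep(20 * time.Millisecond) // stands in for running the block's txs

	trc.Pin(trace.Persist)            // end(LastRun) + begin(Persist)
	time.Sleep(10 * time.Millisecond) // stands in for persisting state

	trc.Pin(trace.Abci) // end(Persist); Abci is not a concerned tag, so its begin is ignored
}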
228 changes: 228 additions & 0 deletions libs/system/trace/workload_statistic.go
@@ -0,0 +1,228 @@
package trace

import (
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/okex/exchain/libs/tendermint/libs/log"
)

var (
startupTime = time.Now()

applyBlockWorkloadStatistic = newWorkloadStatistic(
[]time.Duration{time.Hour, 2 * time.Hour, 4 * time.Hour, 8 * time.Hour}, []string{LastRun, Persist})
)

// TODO: think about a very long work which lasts longer than a statistic period.

// WorkloadStatistic accumulates the workload of specific trace tags over a set of statistic periods.
// Every time the `Add` or `end` method is called, it records the workload in the corresponding
// `summaries` fields and sends the workload info to `shrinkLoop`, which subtracts it from
// `summaries` once the workload falls outside the statistic period. To do that, `shrinkLoop`
// records each workload together with its expiry timestamp; it also runs a ticker that advances
// the current time once a second. When the current time reaches or passes a recorded expiry
// timestamp, the corresponding workload is removed and its value subtracted from `summaries`.
//
// NOTE: `WorkloadStatistic` MUST NOT be used concurrently, because:
//   - almost all fields are read and written without synchronization
//   - the workload summary may be wrong if a work is still running (latestBegin.IsZero() is false)
//
// Calling sequence:
//  1. newWorkloadStatistic
//  2. call begin/end before and after doing some work
//  3. call summary to get a summary statistic
type WorkloadStatistic struct {
concernedTags map[string]struct{}
summaries []workloadSummary

latestTag string
latestBegin time.Time
logger log.Logger

workCh chan singleWorkInfo
}

type workloadSummary struct {
period time.Duration
workload int64
}

type singleWorkInfo struct {
duration int64
endTime time.Time
}

func GetApplyBlockWorkloadSttistic() *WorkloadStatistic {
return applyBlockWorkloadStatistic
}

func newWorkloadStatistic(periods []time.Duration, tags []string) *WorkloadStatistic {
concernedTags := toTagsMap(tags)

workloads := make([]workloadSummary, 0, len(periods))
for _, period := range periods {
workloads = append(workloads, workloadSummary{period, 0})
}

wls := &WorkloadStatistic{concernedTags: concernedTags, summaries: workloads, workCh: make(chan singleWorkInfo, 1000)}
go wls.shrinkLoop()

return wls
}
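A minimal sketch of the calling sequence described in the comment above; because begin, end and summary are unexported, such code would have to live inside package trace (for example in a test), and the windows, tags and durations here are illustrative, not taken from the diff.

package trace

import (
	"fmt"
	"time"
)

// workloadStatisticSketch follows the documented calling sequence.
func workloadStatisticSketch() {
	ws := newWorkloadStatistic(
		[]time.Duration{time.Hour, 4 * time.Hour}, // statistic windows
		[]string{LastRun, Persist},                // concerned tags
	)

	start := time.Now()
	ws.begin(Persist, start)                        // start of a concerned work
	ws.end(Persist, start.Add(50*time.Millisecond)) // its end; 50ms is accumulated

	ws.Add(LastRun, 120*time.Millisecond) // durations measured elsewhere can be added directly

	for _, s := range ws.summary() { // per-window totals, with periods capped by process uptime
		fmt.Println(s.period, s.workload)
	}
}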

func (ws *WorkloadStatistic) SetLogger(logger log.Logger) {
ws.logger = logger
}

func (ws *WorkloadStatistic) Add(tag string, dur time.Duration) {
if _, ok := ws.concernedTags[tag]; !ok {
return
}

now := time.Now()
for i := range ws.summaries {
atomic.AddInt64(&ws.summaries[i].workload, int64(dur))
}

ws.workCh <- singleWorkInfo{int64(dur), now}
}

func (ws *WorkloadStatistic) Format() string {
var sumItem []string
for _, summary := range ws.summary() {
sumItem = append(sumItem, fmt.Sprintf("%.2f", float64(summary.workload)/float64(summary.period)))
}

return strings.Join(sumItem, "|")
}
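Each '|'-separated field produced by Format is the accumulated workload divided by its (uptime-capped) period, i.e. the fraction of that window spent in the concerned stages. A standalone sketch with made-up numbers shows the arithmetic.

package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	// The same arithmetic Format performs, with made-up numbers:
	// 24 minutes of accumulated workload across the default windows.
	windows := []time.Duration{time.Hour, 2 * time.Hour, 4 * time.Hour, 8 * time.Hour}
	work := 24 * time.Minute

	fields := make([]string, 0, len(windows))
	for _, w := range windows {
		fields = append(fields, fmt.Sprintf("%.2f", float64(work)/float64(w)))
	}
	fmt.Println(strings.Join(fields, "|")) // 0.40|0.20|0.10|0.05
}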

func (ws *WorkloadStatistic) begin(tag string, t time.Time) {
if _, ok := ws.concernedTags[tag]; !ok {
return
}

ws.latestTag = tag
ws.latestBegin = t
}

func (ws *WorkloadStatistic) end(tag string, t time.Time) {
if _, ok := ws.concernedTags[tag]; !ok {
return
}

if ws.latestTag != tag {
ws.logger.Error("WorkloadStatistic", ": begin tag", ws.latestTag, "end tag", tag)
return
}
if ws.latestBegin.IsZero() {
ws.logger.Error("WorkloadStatistic", "begin is not called before end")
return
}

dur := t.Sub(ws.latestBegin)
for i := range ws.summaries {
atomic.AddInt64(&ws.summaries[i].workload, int64(dur))
}

ws.workCh <- singleWorkInfo{int64(dur), t}
ws.latestBegin = time.Time{}
}

type summaryInfo struct {
period time.Duration
workload time.Duration
}

func (ws *WorkloadStatistic) summary() []summaryInfo {
if !ws.latestBegin.IsZero() {
ws.logger.Error("WorkloadStatistic", ": some work is still running when calling summary")
return nil
}

startupDuration := time.Now().Sub(startupTime)
result := make([]summaryInfo, 0, len(ws.summaries))

for _, summary := range ws.summaries {
period := minDuration(startupDuration, summary.period)
result = append(result, summaryInfo{period, time.Duration(atomic.LoadInt64(&summary.workload))})
}
return result
}
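The minDuration cap is what the commit note "make it more accurate if the process started less than 1 hour ago" refers to: shortly after startup, dividing by the full window would understate utilization. A standalone sketch with made-up numbers:

package main

import (
	"fmt"
	"time"
)

func main() {
	// 10 minutes of workload, but the process has only been up for 30 minutes.
	uptime := 30 * time.Minute
	window := time.Hour
	work := 10 * time.Minute

	period := window
	if uptime < period { // the same capping summary applies via minDuration
		period = uptime
	}

	fmt.Printf("%.2f\n", float64(work)/float64(period)) // 0.33 rather than a misleading 0.17
}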

func (ws *WorkloadStatistic) shrinkLoop() {
shrinkInfos := make([]map[int64]int64, 0, len(ws.summaries))
for i := 0; i < len(ws.summaries); i++ {
shrinkInfos = append(shrinkInfos, make(map[int64]int64))
}

var latest int64
ticker := time.NewTicker(time.Second)

for {
select {
case singleWork := <-ws.workCh:
// `earliest` records the minimum expiry timestamp among all windows.
// It is only used to initialize `latest`.
earliest := int64(^uint64(0) >> 1)

for sumIndex, summary := range ws.summaries {
expiredTS := singleWork.endTime.Add(summary.period).Unix()
if expiredTS < earliest {
earliest = expiredTS
}

info := shrinkInfos[sumIndex]
// TODO: this makes the recorded workload larger than the actual value
// if a work begins before this period and ends during it
if _, ok := info[expiredTS]; !ok {
info[expiredTS] = singleWork.duration
} else {
info[expiredTS] += singleWork.duration
}
}

if latest == 0 {
latest = earliest
}
case t := <-ticker.C:
current := t.Unix()
if latest == 0 {
latest = current
}

// try to remove the workload of every expired work.
// `latest` makes sure that even if the ticker is not accurate,
// expired workloads are still removed correctly.
for index, info := range shrinkInfos {
for i := latest; i < current+1; i++ {
w, ok := info[i]
if ok {
atomic.AddInt64(&ws.summaries[index].workload, -w)
delete(info, i)
}
}
}

latest = current
}
}

}

func toTagsMap(keys []string) map[string]struct{} {
tags := make(map[string]struct{})
for _, tag := range keys {
tags[tag] = struct{}{}
}
return tags
}

func minDuration(d1 time.Duration, d2 time.Duration) time.Duration {
if d1 < d2 {
return d1
}
return d2
}
38 changes: 38 additions & 0 deletions libs/system/trace/workload_statistic_test.go
@@ -0,0 +1,38 @@
package trace

import (
"testing"
"time"
)

func TestWorkload(t *testing.T) {
abciWorkload := time.Second
lastRunWorkload := 2 * time.Minute
persistWorkload := time.Second
expectWorkload := int64((lastRunWorkload + persistWorkload).Seconds())

trc := NewTracer(ApplyBlock)
trc.EnableSummary()
trc.SetWorkloadStatistic(GetApplyBlockWorkloadSttistic())

defer func() {
GetElapsedInfo().AddInfo(RunTx, trc.Format())

time.Sleep(time.Second)
summary := GetApplyBlockWorkloadSttistic().summary()
for _, sum := range summary {
workload := int64(sum.workload.Seconds())
if workload != expectWorkload {
t.Errorf("period %d: expect workload %v but got %v\n", sum.period, expectWorkload, workload)
}
}
}()

trc.Pin(Abci)
time.Sleep(abciWorkload)
GetApplyBlockWorkloadSttistic().Add(LastRun, lastRunWorkload)

trc.Pin(Persist)
time.Sleep(persistWorkload)

}
5 changes: 3 additions & 2 deletions libs/tendermint/state/execution.go
@@ -208,13 +208,15 @@ func (blockExec *BlockExecutor) ApplyBlock(
}
trc := trace.NewTracer(trace.ApplyBlock)
trc.EnableSummary()
trc.SetWorkloadStatistic(trace.GetApplyBlockWorkloadSttistic())
dc := blockExec.deltaContext

defer func() {
trace.GetElapsedInfo().AddInfo(trace.Height, strconv.FormatInt(block.Height, 10))
trace.GetElapsedInfo().AddInfo(trace.Tx, strconv.Itoa(len(block.Data.Txs)))
trace.GetElapsedInfo().AddInfo(trace.BlockSize, strconv.Itoa(block.FastSize()))
trace.GetElapsedInfo().AddInfo(trace.RunTx, trc.Format())
trace.GetElapsedInfo().AddInfo(trace.Workload, trace.GetApplyBlockWorkloadSttistic().Format())
trace.GetElapsedInfo().SetElapsedTime(trc.GetElapsedTime())

now := time.Now().UnixNano()
@@ -238,14 +240,14 @@ func (blockExec *BlockExecutor) ApplyBlock(
abciResponses, duration, err := blockExec.runAbci(block, deltaInfo)

trace.GetElapsedInfo().AddInfo(trace.LastRun, fmt.Sprintf("%dms", duration.Milliseconds()))
trace.GetApplyBlockWorkloadSttistic().Add(trace.LastRun, duration)

if err != nil {
return state, 0, ErrProxyAppConn(err)
}

fail.Fail() // XXX


// Save the results before we commit.
blockExec.trySaveABCIResponsesAsync(block.Height, abciResponses)

@@ -291,7 +293,6 @@ func (blockExec *BlockExecutor) ApplyBlock(

fail.Fail() // XXX


// Update the app hash and save the state.
state.AppHash = commitResp.Data
blockExec.trySaveStateAsync(state)
