diff --git a/app/elapse_info.go b/app/elapse_info.go index 8febf5eec3..e3758ee722 100644 --- a/app/elapse_info.go +++ b/app/elapse_info.go @@ -2,11 +2,12 @@ package app import ( "fmt" - "github.com/okex/exchain/libs/system/trace" - "github.com/okex/exchain/libs/tendermint/libs/log" "strings" "sync" + "github.com/okex/exchain/libs/system/trace" + "github.com/okex/exchain/libs/tendermint/libs/log" + "github.com/spf13/viper" ) @@ -54,6 +55,7 @@ var ( trace.RunTx, trace.Prerun, trace.MempoolTxsCnt, + trace.Workload, } DefaultElapsedSchemas string diff --git a/libs/system/trace/schema.go b/libs/system/trace/schema.go index add536aadb..345cabca22 100644 --- a/libs/system/trace/schema.go +++ b/libs/system/trace/schema.go @@ -30,7 +30,6 @@ const ( HandlerDefer = "handler_defer" ) - const ( GasUsed = "GasUsed" Produce = "Produce" @@ -78,6 +77,8 @@ const ( IavlRuntime = "IavlRuntime" BlockPartsP2P = "BlockPartsP2P" + + Workload = "Workload" ) const ( diff --git a/libs/system/trace/trace.go b/libs/system/trace/trace.go index 060cc2ccaf..c87e682212 100644 --- a/libs/system/trace/trace.go +++ b/libs/system/trace/trace.go @@ -33,8 +33,10 @@ type Tracer struct { intervals []time.Duration elapsedTime time.Duration - pinMap map[string]time.Duration - enableSummary bool + pinMap map[string]time.Duration + enableSummary bool + + wls *WorkloadStatistic } func NewTracer(name string) *Tracer { @@ -50,6 +52,10 @@ func (t *Tracer) EnableSummary() { t.enableSummary = true } +func (t *Tracer) SetWorkloadStatistic(wls *WorkloadStatistic) { + t.wls = wls +} + func (t *Tracer) Pin(format string, args ...interface{}) { t.pinByFormat(fmt.Sprintf(format, args...)) } @@ -67,6 +73,10 @@ func (t *Tracer) pinByFormat(tag string) { now := time.Now() + if t.wls != nil { + t.wls.begin(tag, now) + } + if len(t.lastPin) > 0 { t.pins = append(t.pins, t.lastPin) duration := now.Sub(t.lastPinStartTime) @@ -74,6 +84,10 @@ func (t *Tracer) pinByFormat(tag string) { if t.enableSummary { insertElapse(t.lastPin, 
duration.Milliseconds()) } + + if t.wls != nil { + t.wls.end(t.lastPin, now) + } } t.lastPinStartTime = now t.lastPin = tag diff --git a/libs/system/trace/workload_statistic.go b/libs/system/trace/workload_statistic.go new file mode 100644 index 0000000000..29f8bf3da1 --- /dev/null +++ b/libs/system/trace/workload_statistic.go @@ -0,0 +1,228 @@ +package trace + +import ( + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/okex/exchain/libs/tendermint/libs/log" +) + +var ( + startupTime = time.Now() + + applyBlockWorkloadStatistic = newWorkloadStatistic( + []time.Duration{time.Hour, 2 * time.Hour, 4 * time.Hour, 8 * time.Hour}, []string{LastRun, Persist}) +) + +// TODO: think about a very long work which is longer than a statistic period. + +// WorkloadStatistic accumulates workload for specific trace tags during some specific period. +// Every time the `Add` or `end` method is called, it records workload on the corresponding `summaries` fields, +// and sends this workload info to `shrinkLoop`, which will subtract this workload from `summaries` +// when the workload falls out of the statistic period. To do that, `shrinkLoop` will record the workload and its +// out-of-date timestamp; `shrinkLoop` also has a ticker that promotes the current time once a second. +// If the current time is later than or equal to a recorded timestamp, it removes that workload and subtracts +// its value from `summaries`. +// +// NOTE: CAN NOT use `WorkloadStatistic` concurrently for those reasons: +// read/write almost all fields +// workload summary may be wrong if a work is still running(latestBegin.IsZero isn't true) +// +// calling sequence: +// 1. newWorkloadStatistic +// 2. calling begin/end before and after doing some work +// 3. 
calling summary to get a summary statistic +type WorkloadStatistic struct { + concernedTags map[string]struct{} + summaries []workloadSummary + + latestTag string + latestBegin time.Time + logger log.Logger + + workCh chan singleWorkInfo +} + +type workloadSummary struct { + period time.Duration + workload int64 +} + +type singleWorkInfo struct { + duration int64 + endTime time.Time +} + +func GetApplyBlockWorkloadSttistic() *WorkloadStatistic { + return applyBlockWorkloadStatistic +} + +func newWorkloadStatistic(periods []time.Duration, tags []string) *WorkloadStatistic { + concernedTags := toTagsMap(tags) + + workloads := make([]workloadSummary, 0, len(periods)) + for _, period := range periods { + workloads = append(workloads, workloadSummary{period, 0}) + } + + wls := &WorkloadStatistic{concernedTags: concernedTags, summaries: workloads, workCh: make(chan singleWorkInfo, 1000)} + go wls.shrinkLoop() + + return wls +} + +func (ws *WorkloadStatistic) SetLogger(logger log.Logger) { + ws.logger = logger +} + +func (ws *WorkloadStatistic) Add(tag string, dur time.Duration) { + if _, ok := ws.concernedTags[tag]; !ok { + return + } + + now := time.Now() + for i := range ws.summaries { + atomic.AddInt64(&ws.summaries[i].workload, int64(dur)) + } + + ws.workCh <- singleWorkInfo{int64(dur), now} +} + +func (ws *WorkloadStatistic) Format() string { + var sumItem []string + for _, summary := range ws.summary() { + sumItem = append(sumItem, fmt.Sprintf("%.2f", float64(summary.workload)/float64(summary.period))) + } + + return strings.Join(sumItem, "|") +} + +func (ws *WorkloadStatistic) begin(tag string, t time.Time) { + if _, ok := ws.concernedTags[tag]; !ok { + return + } + + ws.latestTag = tag + ws.latestBegin = t +} + +func (ws *WorkloadStatistic) end(tag string, t time.Time) { + if _, ok := ws.concernedTags[tag]; !ok { + return + } + + if ws.latestTag != tag { + ws.logger.Error("WorkloadStatistic", ": begin tag", ws.latestTag, "end tag", tag) + return + } + if 
ws.latestBegin.IsZero() { + ws.logger.Error("WorkloadStatistic", "begin is not called before end") + return + } + + dur := t.Sub(ws.latestBegin) + for i := range ws.summaries { + atomic.AddInt64(&ws.summaries[i].workload, int64(dur)) + } + + ws.workCh <- singleWorkInfo{int64(dur), t} + ws.latestBegin = time.Time{} +} + +type summaryInfo struct { + period time.Duration + workload time.Duration +} + +func (ws *WorkloadStatistic) summary() []summaryInfo { + if !ws.latestBegin.IsZero() { + ws.logger.Error("WorkloadStatistic", ": some work is still running when calling summary") + return nil + } + + startupDuration := time.Now().Sub(startupTime) + result := make([]summaryInfo, 0, len(ws.summaries)) + + for _, summary := range ws.summaries { + period := minDuration(startupDuration, summary.period) + result = append(result, summaryInfo{period, time.Duration(atomic.LoadInt64(&summary.workload))}) + } + return result +} + +func (ws *WorkloadStatistic) shrinkLoop() { + shrinkInfos := make([]map[int64]int64, 0, len(ws.summaries)) + for i := 0; i < len(ws.summaries); i++ { + shrinkInfos = append(shrinkInfos, make(map[int64]int64)) + } + + var latest int64 + ticker := time.NewTicker(time.Second) + + for { + select { + case singleWork := <-ws.workCh: + // `earliest` record the expired timestamp which is minimum. + // It's just for initialize `latest`. 
+ earliest := int64(^uint64(0) >> 1) + + for sumIndex, summary := range ws.summaries { + expiredTS := singleWork.endTime.Add(summary.period).Unix() + if expiredTS < earliest { + earliest = expiredTS + } + + info := shrinkInfos[sumIndex] + // TODO: it makes recorded workload larger than the actual value + // if a work begins before this period and ends during this period + if _, ok := info[expiredTS]; !ok { + info[expiredTS] = singleWork.duration + } else { + info[expiredTS] += singleWork.duration + } + } + + if latest == 0 { + latest = earliest + } + case t := <-ticker.C: + current := t.Unix() + if latest == 0 { + latest = current + } + + // try to remove workload of every expired work. + // `latest` makes sure that even if the ticker is not accurate, + // we can still remove the expired workload correctly. + for index, info := range shrinkInfos { + for i := latest; i < current+1; i++ { + w, ok := info[i] + if ok { + atomic.AddInt64(&ws.summaries[index].workload, -w) + delete(info, i) + } + } + } + + latest = current + } + } + +} + +func toTagsMap(keys []string) map[string]struct{} { + tags := make(map[string]struct{}) + for _, tag := range keys { + tags[tag] = struct{}{} + } + return tags +} + +func minDuration(d1 time.Duration, d2 time.Duration) time.Duration { + if d1 < d2 { + return d1 + } + return d2 +} diff --git a/libs/system/trace/workload_statistic_test.go b/libs/system/trace/workload_statistic_test.go new file mode 100644 index 0000000000..1facf49269 --- /dev/null +++ b/libs/system/trace/workload_statistic_test.go @@ -0,0 +1,38 @@ +package trace + +import ( + "testing" + "time" +) + +func TestWorkload(t *testing.T) { + abciWorkload := time.Second + lastRunWorkload := 2 * time.Minute + persistWorkload := time.Second + expectWorkload := int64((lastRunWorkload + persistWorkload).Seconds()) + + trc := NewTracer(ApplyBlock) + trc.EnableSummary() + trc.SetWorkloadStatistic(GetApplyBlockWorkloadSttistic()) + + defer func() { + GetElapsedInfo().AddInfo(RunTx, trc.Format()) + + 
time.Sleep(time.Second) + summary := GetApplyBlockWorkloadSttistic().summary() + for _, sum := range summary { + workload := int64(sum.workload.Seconds()) + if workload != expectWorkload { + t.Errorf("period %d: expect workload %v but got %v\n", sum.period, expectWorkload, workload) + } + } + }() + + trc.Pin(Abci) + time.Sleep(abciWorkload) + GetApplyBlockWorkloadSttistic().Add(LastRun, lastRunWorkload) + + trc.Pin(Persist) + time.Sleep(persistWorkload) + +} diff --git a/libs/tendermint/state/execution.go b/libs/tendermint/state/execution.go index 2bcf2444e3..41fb22764c 100644 --- a/libs/tendermint/state/execution.go +++ b/libs/tendermint/state/execution.go @@ -208,6 +208,7 @@ func (blockExec *BlockExecutor) ApplyBlock( } trc := trace.NewTracer(trace.ApplyBlock) trc.EnableSummary() + trc.SetWorkloadStatistic(trace.GetApplyBlockWorkloadSttistic()) dc := blockExec.deltaContext defer func() { @@ -215,6 +216,7 @@ func (blockExec *BlockExecutor) ApplyBlock( trace.GetElapsedInfo().AddInfo(trace.Tx, strconv.Itoa(len(block.Data.Txs))) trace.GetElapsedInfo().AddInfo(trace.BlockSize, strconv.Itoa(block.FastSize())) trace.GetElapsedInfo().AddInfo(trace.RunTx, trc.Format()) + trace.GetElapsedInfo().AddInfo(trace.Workload, trace.GetApplyBlockWorkloadSttistic().Format()) trace.GetElapsedInfo().SetElapsedTime(trc.GetElapsedTime()) now := time.Now().UnixNano() @@ -238,6 +240,7 @@ func (blockExec *BlockExecutor) ApplyBlock( abciResponses, duration, err := blockExec.runAbci(block, deltaInfo) trace.GetElapsedInfo().AddInfo(trace.LastRun, fmt.Sprintf("%dms", duration.Milliseconds())) + trace.GetApplyBlockWorkloadSttistic().Add(trace.LastRun, duration) if err != nil { return state, 0, ErrProxyAppConn(err) @@ -245,7 +248,6 @@ func (blockExec *BlockExecutor) ApplyBlock( fail.Fail() // XXX - // Save the results before we commit. 
blockExec.trySaveABCIResponsesAsync(block.Height, abciResponses) @@ -291,7 +293,6 @@ func (blockExec *BlockExecutor) ApplyBlock( fail.Fail() // XXX - // Update the app hash and save the state. state.AppHash = commitResp.Data blockExec.trySaveStateAsync(state)