From 271e2cd5ae3abd96d127499ce19a55d59111d7be Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Wed, 25 May 2022 16:21:20 +0800 Subject: [PATCH] Update to using frequency mode to `ON_CPU` Profiling. (#37) --- CHANGES.md | 1 + bpf/profiling/oncpu.c | 21 ++++- bpf/profiling/oncpu.h | 5 +- pkg/profiling/task/offcpu/runner.go | 3 + pkg/profiling/task/oncpu/runner.go | 137 ++++++++++++---------------- 5 files changed, 88 insertions(+), 79 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index aaa79e31..858b912f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ Release Notes. #### Features * Support `OFF_CPU` Profiling. * Introduce the `BTFHub` module. +* Update to using frequency mode to `ON_CPU` Profiling. #### Bug Fixes diff --git a/bpf/profiling/oncpu.c b/bpf/profiling/oncpu.c index fa4321a6..0c258934 100644 --- a/bpf/profiling/oncpu.c +++ b/bpf/profiling/oncpu.c @@ -22,6 +22,16 @@ char __license[] SEC("license") = "Dual MIT/GPL"; SEC("perf_event") int do_perf_event(struct pt_regs *ctx) { + int monitor_pid; + asm("%0 = MONITOR_PID ll" : "=r"(monitor_pid)); + + // only match the same pid + __u64 id = bpf_get_current_pid_tgid(); + __u32 tgid = id >> 32; + if (tgid != monitor_pid) { + return 0; + } + // create map key struct key_t key = {}; @@ -29,6 +39,15 @@ int do_perf_event(struct pt_regs *ctx) { key.kernel_stack_id = bpf_get_stackid(ctx, &stacks, 0); key.user_stack_id = bpf_get_stackid(ctx, &stacks, BPF_F_USER_STACK); - bpf_perf_event_output(ctx, &counts, BPF_F_CURRENT_CPU, &key, sizeof(key)); + __u32 *val; + val = bpf_map_lookup_elem(&counts, &key); + if (!val) { + __u32 count = 0; + bpf_map_update_elem(&counts, &key, &count, BPF_NOEXIST); + val = bpf_map_lookup_elem(&counts, &key); + if (!val) + return 0; + } + (*val) += 1; return 0; } \ No newline at end of file diff --git a/bpf/profiling/oncpu.h b/bpf/profiling/oncpu.h index 6fc467d2..51c29401 100644 --- a/bpf/profiling/oncpu.h +++ b/bpf/profiling/oncpu.h @@ -21,7 +21,10 @@ struct key_t { }; struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, struct key_t); + __type(value, __u32); + __uint(max_entries, 10000); } counts SEC(".maps"); struct { diff --git a/pkg/profiling/task/offcpu/runner.go b/pkg/profiling/task/offcpu/runner.go index dca53432..d45a8992 100644 --- a/pkg/profiling/task/offcpu/runner.go +++ b/pkg/profiling/task/offcpu/runner.go @@ -185,6 +185,9 @@ func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) { duration -= int64(existCounter.Deltas) } r.previousStacks[stack] = counter + if switchCount <= 0 { + continue + } result = append(result, &v3.EBPFProfilingData{ Profiling: &v3.EBPFProfilingData_OffCPU{ diff --git a/pkg/profiling/task/oncpu/runner.go b/pkg/profiling/task/oncpu/runner.go index 7c9cc332..8051ce6f 100644 --- a/pkg/profiling/task/oncpu/runner.go +++ b/pkg/profiling/task/oncpu/runner.go @@ -20,18 +20,11 @@ package oncpu import ( - "bytes" "context" - "encoding/binary" - "errors" "fmt" - "os" "runtime" "time" - "github.com/cilium/ebpf" - "github.com/cilium/ebpf/perf" - "github.com/hashicorp/go-multierror" "github.com/apache/skywalking-rover/pkg/logger" @@ -61,14 +54,14 @@ type Runner struct { pid int32 processProfiling *profiling.Info kernelProfiling *profiling.Info - dumpPeriod time.Duration + dumpFrequency int64 // runtime perfEventFds []int - countReader *perf.Reader - stackCounter map[Event]int - stackMap *ebpf.Map + bpf *bpfObjects + stackCounter map[Event]uint32 flushDataNotify context.CancelFunc + stopChan chan bool } func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) { @@ -83,9 +76,8 @@ func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) { return nil, fmt.Errorf("the ON_CPU dump period could not be smaller than 1ms") } return &Runner{ - base: base.NewBaseRunner(), - dumpPeriod: dumpPeriod, - stackCounter: make(map[Event]int), + base: base.NewBaseRunner(), + dumpFrequency: time.Second.Milliseconds() / dumpPeriod.Milliseconds(), }, nil } @@ -101,25 +93,36 @@ func (r *Runner) Init(task *base.ProfilingTask, process api.ProcessInterface) er log.Warnf("could not analyze kernel profiling stats: %v", err) } r.kernelProfiling = kernelProfiling - r.stackCounter = make(map[Event]int) + r.stackCounter = make(map[Event]uint32) + r.stopChan = make(chan bool, 1) return nil } func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNotify) error { // load bpf objs := bpfObjects{} - if err := loadBpfObjects(&objs, nil); err != nil { + spec, err := loadBpf() + if err != nil { return err } - defer objs.Close() - r.stackMap = objs.Stacks - - // init profiling data reader - rd, err := perf.NewReader(objs.Counts, os.Getpagesize()) - if err != nil { - return fmt.Errorf("creating perf event reader: %s", err) + // update the monitor pid + funcName := "do_perf_event" + replacedPid := false + for i, ins := range spec.Programs[funcName].Instructions { + if ins.Reference == "MONITOR_PID" { + spec.Programs[funcName].Instructions[i].Constant = int64(r.pid) + spec.Programs[funcName].Instructions[i].Offset = 0 + replacedPid = true + } + } + if !replacedPid { + return fmt.Errorf("replace the monitor pid failure") } - r.countReader = rd + if err1 := spec.LoadAndAssign(&objs, nil); err1 != nil { + log.Fatalf("loading objects: %s", err1) + } + defer objs.Close() + r.bpf = &objs // opened perf events perfEvents, err := r.openPerfEvent(objs.DoPerfEvent.FD()) @@ -131,48 +134,24 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot // notify start success notify() runtime.SetFinalizer(r, (*Runner).Stop) - - // read content - var event Event - for { - record, err := rd.Read() - if err != nil { - if errors.Is(err, perf.ErrClosed) { - return nil - } - log.Warnf("reading from perf event reader: %s", err) - continue - } - - if record.LostSamples != 0 { - log.Warnf("perf event ring buffer full, dropped %d samples", record.LostSamples) - continue - } - - // parse perf event data - if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil { - log.Errorf("parsing perf event error: %s", err) - continue - } - - r.stackCounter[event]++ - } + <-r.stopChan + return nil } func (r *Runner) openPerfEvent(perfFd int) ([]int, error) { eventAttr := &unix.PerfEventAttr{ - Type: unix.PERF_TYPE_SOFTWARE, - Config: unix.PERF_COUNT_SW_CPU_CLOCK, - Sample_type: unix.PERF_SAMPLE_RAW, - Sample: uint64(r.dumpPeriod.Nanoseconds()), - Wakeup: 1, + Type: unix.PERF_TYPE_SOFTWARE, + Config: unix.PERF_COUNT_SW_CPU_CLOCK, + Bits: unix.PerfBitFreq, + Sample: uint64(r.dumpFrequency), + Wakeup: 1, } fds := make([]int, 0) for cpuNum := 0; cpuNum < runtime.NumCPU(); cpuNum++ { fd, err := unix.PerfEventOpen( eventAttr, - int(r.pid), + -1, cpuNum, -1, 0, @@ -213,8 +192,8 @@ func (r *Runner) Stop() error { case <-time.After(5 * time.Second): } - if r.countReader != nil { - if err := r.countReader.Close(); err != nil { + if r.bpf != nil { + if err := r.bpf.Close(); err != nil { result = multierror.Append(result, err) } } @@ -223,30 +202,38 @@ func (r *Runner) Stop() error { } func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) { - existsCounters := r.flushStackCounter() - + var stack Event + var counter uint32 + iterate := r.bpf.Counts.Iterate() + stacks := r.bpf.Stacks result := make([]*v3.EBPFProfilingData, 0) stackSymbols := make([]uint64, 100) - for event, count := range existsCounters { + for iterate.Next(&stack, &counter) { metadatas := make([]*v3.EBPFProfilingStackMetadata, 0) // kernel stack - if d := r.base.GenerateProfilingData(r.kernelProfiling, event.KernelStackID, r.stackMap, + if d := r.base.GenerateProfilingData(r.kernelProfiling, stack.KernelStackID, stacks, v3.EBPFProfilingStackType_PROCESS_KERNEL_SPACE, stackSymbols); d != nil { metadatas = append(metadatas, d) } // user stack - if d := r.base.GenerateProfilingData(r.processProfiling, event.UserStackID, r.stackMap, + if d := r.base.GenerateProfilingData(r.processProfiling, stack.UserStackID, stacks, v3.EBPFProfilingStackType_PROCESS_USER_SPACE, stackSymbols); d != nil { metadatas = append(metadatas, d) } - // close the flush data notify if exists - if r.flushDataNotify != nil { - r.flushDataNotify() + if len(metadatas) == 0 { + continue } - if len(metadatas) == 0 { + // update the counters in memory + dumpCount := int32(counter) + existCounter := r.stackCounter[stack] + if existCounter > 0 { + dumpCount -= int32(existCounter) + } + r.stackCounter[stack] = counter + if dumpCount <= 0 { continue } @@ -254,22 +241,18 @@ func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) { Profiling: &v3.EBPFProfilingData_OnCPU{ OnCPU: &v3.EBPFOnCPUProfiling{ Stacks: metadatas, - DumpCount: int32(count), + DumpCount: dumpCount, }, }, }) } - return result, nil -} - -func (r *Runner) flushStackCounter() map[Event]int { - updateTo := make(map[Event]int) - updateToP := &updateTo + // close the flush data notify if exists + if r.flushDataNotify != nil { + r.flushDataNotify() + } - older := &r.stackCounter - *older, *updateToP = *updateToP, *older - return updateTo + return result, nil } func (r *Runner) closePerfEvent(fd int) error {