Skip to content

Commit

Permalink
Update to using frequency mode to ON_CPU Profiling. (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored May 25, 2022
1 parent 3fd8498 commit 271e2cd
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Release Notes.
#### Features
* Support `OFF_CPU` Profiling.
* Introduce the `BTFHub` module.
* Update to using frequency mode to `ON_CPU` Profiling.

#### Bug Fixes

Expand Down
21 changes: 20 additions & 1 deletion bpf/profiling/oncpu.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,32 @@ char __license[] SEC("license") = "Dual MIT/GPL";

SEC("perf_event")
int do_perf_event(struct pt_regs *ctx) {
int monitor_pid;
asm("%0 = MONITOR_PID ll" : "=r"(monitor_pid));

// only match the same pid
__u64 id = bpf_get_current_pid_tgid();
__u32 tgid = id >> 32;
if (tgid != monitor_pid) {
return 0;
}

// create map key
struct key_t key = {};

// get stacks
key.kernel_stack_id = bpf_get_stackid(ctx, &stacks, 0);
key.user_stack_id = bpf_get_stackid(ctx, &stacks, BPF_F_USER_STACK);

bpf_perf_event_output(ctx, &counts, BPF_F_CURRENT_CPU, &key, sizeof(key));
__u32 *val;
val = bpf_map_lookup_elem(&counts, &key);
if (!val) {
__u32 count = 0;
bpf_map_update_elem(&counts, &key, &count, BPF_NOEXIST);
val = bpf_map_lookup_elem(&counts, &key);
if (!val)
return 0;
}
(*val) += 1;
return 0;
}
5 changes: 4 additions & 1 deletion bpf/profiling/oncpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ struct key_t {
};

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct key_t);
__type(value, __u32);
__uint(max_entries, 10000);
} counts SEC(".maps");

struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/profiling/task/offcpu/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
duration -= int64(existCounter.Deltas)
}
r.previousStacks[stack] = counter
if switchCount <= 0 {
continue
}

result = append(result, &v3.EBPFProfilingData{
Profiling: &v3.EBPFProfilingData_OffCPU{
Expand Down
137 changes: 60 additions & 77 deletions pkg/profiling/task/oncpu/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,11 @@
package oncpu

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"runtime"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"

"github.com/hashicorp/go-multierror"

"github.com/apache/skywalking-rover/pkg/logger"
Expand Down Expand Up @@ -61,14 +54,14 @@ type Runner struct {
pid int32
processProfiling *profiling.Info
kernelProfiling *profiling.Info
dumpPeriod time.Duration
dumpFrequency int64

// runtime
perfEventFds []int
countReader *perf.Reader
stackCounter map[Event]int
stackMap *ebpf.Map
bpf *bpfObjects
stackCounter map[Event]uint32
flushDataNotify context.CancelFunc
stopChan chan bool
}

func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) {
Expand All @@ -83,9 +76,8 @@ func NewRunner(config *base.TaskConfig) (base.ProfileTaskRunner, error) {
return nil, fmt.Errorf("the ON_CPU dump period could not be smaller than 1ms")
}
return &Runner{
base: base.NewBaseRunner(),
dumpPeriod: dumpPeriod,
stackCounter: make(map[Event]int),
base: base.NewBaseRunner(),
dumpFrequency: time.Second.Milliseconds() / dumpPeriod.Milliseconds(),
}, nil
}

Expand All @@ -101,25 +93,36 @@ func (r *Runner) Init(task *base.ProfilingTask, process api.ProcessInterface) er
log.Warnf("could not analyze kernel profiling stats: %v", err)
}
r.kernelProfiling = kernelProfiling
r.stackCounter = make(map[Event]int)
r.stackCounter = make(map[Event]uint32)
r.stopChan = make(chan bool, 1)
return nil
}

func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNotify) error {
// load bpf
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
spec, err := loadBpf()
if err != nil {
return err
}
defer objs.Close()
r.stackMap = objs.Stacks

// init profiling data reader
rd, err := perf.NewReader(objs.Counts, os.Getpagesize())
if err != nil {
return fmt.Errorf("creating perf event reader: %s", err)
// update the monitor pid
funcName := "do_perf_event"
replacedPid := false
for i, ins := range spec.Programs[funcName].Instructions {
if ins.Reference == "MONITOR_PID" {
spec.Programs[funcName].Instructions[i].Constant = int64(r.pid)
spec.Programs[funcName].Instructions[i].Offset = 0
replacedPid = true
}
}
if !replacedPid {
return fmt.Errorf("replace the monitor pid failure")
}
r.countReader = rd
if err1 := spec.LoadAndAssign(&objs, nil); err1 != nil {
log.Fatalf("loading objects: %s", err1)
}
defer objs.Close()
r.bpf = &objs

// opened perf events
perfEvents, err := r.openPerfEvent(objs.DoPerfEvent.FD())
Expand All @@ -131,48 +134,24 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot
// notify start success
notify()
runtime.SetFinalizer(r, (*Runner).Stop)

// read content
var event Event
for {
record, err := rd.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
return nil
}
log.Warnf("reading from perf event reader: %s", err)
continue
}

if record.LostSamples != 0 {
log.Warnf("perf event ring buffer full, dropped %d samples", record.LostSamples)
continue
}

// parse perf event data
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Errorf("parsing perf event error: %s", err)
continue
}

r.stackCounter[event]++
}
<-r.stopChan
return nil
}

func (r *Runner) openPerfEvent(perfFd int) ([]int, error) {
eventAttr := &unix.PerfEventAttr{
Type: unix.PERF_TYPE_SOFTWARE,
Config: unix.PERF_COUNT_SW_CPU_CLOCK,
Sample_type: unix.PERF_SAMPLE_RAW,
Sample: uint64(r.dumpPeriod.Nanoseconds()),
Wakeup: 1,
Type: unix.PERF_TYPE_SOFTWARE,
Config: unix.PERF_COUNT_SW_CPU_CLOCK,
Bits: unix.PerfBitFreq,
Sample: uint64(r.dumpFrequency),
Wakeup: 1,
}

fds := make([]int, 0)
for cpuNum := 0; cpuNum < runtime.NumCPU(); cpuNum++ {
fd, err := unix.PerfEventOpen(
eventAttr,
int(r.pid),
-1,
cpuNum,
-1,
0,
Expand Down Expand Up @@ -213,8 +192,8 @@ func (r *Runner) Stop() error {
case <-time.After(5 * time.Second):
}

if r.countReader != nil {
if err := r.countReader.Close(); err != nil {
if r.bpf != nil {
if err := r.bpf.Close(); err != nil {
result = multierror.Append(result, err)
}
}
Expand All @@ -223,53 +202,57 @@ func (r *Runner) Stop() error {
}

func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
existsCounters := r.flushStackCounter()

var stack Event
var counter uint32
iterate := r.bpf.Counts.Iterate()
stacks := r.bpf.Stacks
result := make([]*v3.EBPFProfilingData, 0)
stackSymbols := make([]uint64, 100)
for event, count := range existsCounters {
for iterate.Next(&stack, &counter) {
metadatas := make([]*v3.EBPFProfilingStackMetadata, 0)
// kernel stack
if d := r.base.GenerateProfilingData(r.kernelProfiling, event.KernelStackID, r.stackMap,
if d := r.base.GenerateProfilingData(r.kernelProfiling, stack.KernelStackID, stacks,
v3.EBPFProfilingStackType_PROCESS_KERNEL_SPACE, stackSymbols); d != nil {
metadatas = append(metadatas, d)
}

// user stack
if d := r.base.GenerateProfilingData(r.processProfiling, event.UserStackID, r.stackMap,
if d := r.base.GenerateProfilingData(r.processProfiling, stack.UserStackID, stacks,
v3.EBPFProfilingStackType_PROCESS_USER_SPACE, stackSymbols); d != nil {
metadatas = append(metadatas, d)
}

// close the flush data notify if exists
if r.flushDataNotify != nil {
r.flushDataNotify()
if len(metadatas) == 0 {
continue
}

if len(metadatas) == 0 {
// update the counters in memory
dumpCount := int32(counter)
existCounter := r.stackCounter[stack]
if existCounter > 0 {
dumpCount -= int32(existCounter)
}
r.stackCounter[stack] = counter
if dumpCount <= 0 {
continue
}

result = append(result, &v3.EBPFProfilingData{
Profiling: &v3.EBPFProfilingData_OnCPU{
OnCPU: &v3.EBPFOnCPUProfiling{
Stacks: metadatas,
DumpCount: int32(count),
DumpCount: dumpCount,
},
},
})
}

return result, nil
}

func (r *Runner) flushStackCounter() map[Event]int {
updateTo := make(map[Event]int)
updateToP := &updateTo
// close the flush data notify if exists
if r.flushDataNotify != nil {
r.flushDataNotify()
}

older := &r.stackCounter
*older, *updateToP = *updateToP, *older
return updateTo
return result, nil
}

func (r *Runner) closePerfEvent(fd int) error {
Expand Down

0 comments on commit 271e2cd

Please sign in to comment.