diff --git a/config.yaml b/config.yaml
index 7c14645a8e..4df45ae15f 100644
--- a/config.yaml
+++ b/config.yaml
@@ -5,3 +5,11 @@ relabel_configs:
 - source_labels: [__meta_process_executable_compiler]
   target_label: compiler
   action: replace
+
+symbol_upload:
+  allowlist: []
+#    - parca
+# GPU workloads
+#    - ollama
+#    - cuda
+#    - torch
diff --git a/config/config.go b/config/config.go
index f988151d66..4bffee5a1c 100644
--- a/config/config.go
+++ b/config/config.go
@@ -27,6 +27,11 @@ var ErrEmptyConfig = errors.New("empty config")
 // Config holds all the configuration information for Parca Agent.
 type Config struct {
 	RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
+	SymbolUpload   SymbolUpload      `yaml:"symbol_upload,omitempty"`
+}
+
+type SymbolUpload struct {
+	Allowlist []string `yaml:"allowlist,omitempty"`
 }
 
 func (c Config) String() string {
diff --git a/main.go b/main.go
index bd9b697eb4..e887c5f639 100644
--- a/main.go
+++ b/main.go
@@ -230,7 +230,7 @@ func mainWithExitCode() flags.ExitCode {
 
 	log.Info("report sent successfully")
 
-	if exiterr, ok := err.(*exec.ExitError); ok { //nolint: errorlint
+	if exiterr, ok := err.(*exec.ExitError); ok { //nolint:errorlint
 		return flags.ExitCode(exiterr.ExitCode())
 	}
 
@@ -283,7 +283,10 @@ func mainWithExitCode() flags.ExitCode {
 		return flags.Failure("Failed to parse the included tracers: %s", err)
 	}
 
-	var relabelConfigs []*relabel.Config
+	var (
+		relabelConfigs         []*relabel.Config
+		symbolsUploadAllowlist []string
+	)
 	if f.ConfigPath == "" {
 		log.Info("no config file provided, using default config")
 	} else {
@@ -297,11 +300,11 @@ func mainWithExitCode() flags.ExitCode {
 		if cfgFile != nil {
 			log.Infof("using config file: %s", f.ConfigPath)
 			relabelConfigs = cfgFile.RelabelConfigs
+			symbolsUploadAllowlist = cfgFile.SymbolUpload.Allowlist
 		}
 	}
 
-	traceHandlerCacheSize :=
-		traceCacheSize(f.Profiling.Duration, f.Profiling.CPUSamplingFrequency, uint16(presentCores))
+	traceHandlerCacheSize := traceCacheSize(f.Profiling.Duration, f.Profiling.CPUSamplingFrequency, uint16(presentCores))
 	intervals := times.New(5*time.Second, f.Profiling.Duration, f.Profiling.ProbabilisticInterval)
 
 	times.StartRealtimeSync(mainCtx, f.ClockSyncInterval)
@@ -332,6 +335,7 @@ func mainWithExitCode() flags.ExitCode {
 		f.Debuginfo.Strip,
 		f.Debuginfo.UploadMaxParallel,
 		f.Debuginfo.UploadDisable || isOfflineMode,
+		symbolsUploadAllowlist,
 		int64(f.Profiling.CPUSamplingFrequency),
 		traceHandlerCacheSize,
 		f.Debuginfo.UploadQueueSize,
@@ -478,8 +482,11 @@ func getTelemetryMetadata(numCPU int) map[string]string {
 // Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large
 // (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is
 // therefore used here.
-func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int,
-	presentCPUCores uint16) uint32 {
+func traceCacheSize(
+	monitorInterval time.Duration,
+	samplesPerSecond int,
+	presentCPUCores uint16,
+) uint32 {
 	const (
 		traceCacheIntervals = 6
 		traceCacheMinSize   = 65536
@@ -494,8 +501,11 @@ func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int,
 	return util.NextPowerOfTwo(size)
 }
 
-func maxElementsPerInterval(monitorInterval time.Duration, samplesPerSecond int,
-	presentCPUCores uint16) uint32 {
+func maxElementsPerInterval(
+	monitorInterval time.Duration,
+	samplesPerSecond int,
+	presentCPUCores uint16,
+) uint32 {
 	return uint32(samplesPerSecond) * uint32(monitorInterval.Seconds()) * uint32(presentCPUCores)
 }
 
@@ -504,7 +514,8 @@ func getTracePipe() (*os.File, error) {
 		"/sys/kernel/debug/tracing",
 		"/sys/kernel/tracing",
 		"/tracing",
-		"/trace"} {
+		"/trace",
+	} {
 		t, err := os.Open(mnt + "/trace_pipe")
 		if err == nil {
 			return t, nil
diff --git a/reporter/parca_reporter.go b/reporter/parca_reporter.go
index 644b518672..6a8445cede 100644
--- a/reporter/parca_reporter.go
+++ b/reporter/parca_reporter.go
@@ -14,6 +14,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"log/slog"
 	"os"
 	"path"
 	"strings"
@@ -29,8 +30,6 @@ import (
 	"github.com/apache/arrow/go/v16/arrow/memory"
 	lru "github.com/elastic/go-freelru"
 	"github.com/klauspost/compress/zstd"
-	"github.com/parca-dev/parca-agent/metrics"
-	"github.com/parca-dev/parca-agent/reporter/metadata"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
@@ -42,6 +41,9 @@
 	"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
 	otelmetrics "go.opentelemetry.io/ebpf-profiler/metrics"
 	"go.opentelemetry.io/ebpf-profiler/reporter"
+
+	"github.com/parca-dev/parca-agent/metrics"
+	"github.com/parca-dev/parca-agent/reporter/metadata"
 )
 
 // Assert that we implement the full Reporter interface.
@@ -117,6 +119,9 @@ type ParcaReporter struct {
 	// disableSymbolUpload disables the symbol upload.
 	disableSymbolUpload bool
 
+	// symbolUploadAllowlist is checked before uploading symbols.
+	symbolUploadAllowlist []string
+
 	// reportInterval is the interval at which to report data.
 	reportInterval time.Duration
 
@@ -167,8 +172,8 @@ func (r *ParcaReporter) SupportsReportTraceEvent() bool { return true }
 
 // ReportTraceEvent enqueues reported trace events for the OTLP reporter.
 func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace,
-	meta *reporter.TraceEventMeta) {
-
+	meta *reporter.TraceEventMeta,
+) {
 	// This is an LRU so we need to check every time if the stack is already
 	// known, as it might have been evicted.
 	if _, exists := r.stacks.Get(trace.Hash); !exists {
@@ -266,7 +271,6 @@ func (r *ParcaReporter) ExecutableKnown(fileID libpf.FileID) bool {
 }
 
 // ExecutableMetadata accepts a fileID with the corresponding filename
 // and caches this information.
 func (r *ParcaReporter) ExecutableMetadata(args *reporter.ExecutableMetadataArgs) {
-
 	if args.Interp != libpf.Native {
 		r.executables.Add(args.FileID, metadata.ExecInfo{
 			FileName: args.FileName,
@@ -275,6 +279,20 @@
 		return
 	}
 
-	// Always attempt to upload, the uploader is responsible for deduplication.
-	if !r.disableSymbolUpload {
+	// Symbols are uploaded only when the executable matches the allowlist;
+	// an empty allowlist permits everything.
+	allowed := len(r.symbolUploadAllowlist) == 0
+	for _, s := range r.symbolUploadAllowlist {
+		if strings.Contains(args.FileName, s) {
+			log.Infof("executable found in allowlist, file: '%s' matches '%s'", args.FileName, s)
+			allowed = true
+			break
+		}
+	}
+	if !allowed {
+		log.Debugf("ignoring executable, not found in allowlist, file: %s", args.FileName)
+	}
+
+	// The uploader is responsible for deduplication.
+	if !r.disableSymbolUpload && allowed {
 		r.uploader.Upload(context.TODO(), args.FileID, args.GnuBuildID, args.Open)
@@ -363,8 +381,12 @@
 		return
 	}
 }
 
 // ReportHostMetadataBlocking enqueues host metadata.
-func (r *ParcaReporter) ReportHostMetadataBlocking(_ context.Context, - metadataMap map[string]string, _ int, _ time.Duration) error { +func (r *ParcaReporter) ReportHostMetadataBlocking( + _ context.Context, + _ map[string]string, + _ int, + _ time.Duration, +) error { // noop return nil } @@ -460,6 +482,7 @@ func New( stripTextSection bool, symbolUploadConcurrency int, disableSymbolUpload bool, + symbolUploadAllowlist []string, samplesPerSecond int64, cacheSize uint32, uploaderQueueSize uint32, @@ -524,21 +547,24 @@ func New( reg.MustRegister(sampleWriteRequestBytes) reg.MustRegister(stacktraceWriteRequestBytes) + slog.Info("reporter found allowlist", "list", symbolUploadAllowlist) + r := &ParcaReporter{ - stopSignal: make(chan libpf.Void), - client: nil, - executables: executables, - labels: labels, - frames: frames, - sampleWriter: NewSampleWriter(mem), - stacks: stacks, - mem: mem, - externalLabels: externalLabels, - samplesPerSecond: samplesPerSecond, - disableSymbolUpload: disableSymbolUpload, - reportInterval: reportInterval, - nodeName: nodeName, - relabelConfigs: relabelConfigs, + stopSignal: make(chan libpf.Void), + client: nil, + executables: executables, + labels: labels, + frames: frames, + sampleWriter: NewSampleWriter(mem), + stacks: stacks, + mem: mem, + externalLabels: externalLabels, + samplesPerSecond: samplesPerSecond, + disableSymbolUpload: disableSymbolUpload, + symbolUploadAllowlist: symbolUploadAllowlist, + reportInterval: reportInterval, + nodeName: nodeName, + relabelConfigs: relabelConfigs, metadataProviders: []metadata.MetadataProvider{ metadata.NewProcessMetadataProvider(), metadata.NewMainExecutableMetadataProvider(executables), @@ -575,8 +601,10 @@ func New( return r, nil } -const DATA_FILE_EXTENSION string = ".padata" -const DATA_FILE_COMPRESSED_EXTENSION string = ".padata.zst" +const ( + DATA_FILE_EXTENSION string = ".padata" + DATA_FILE_COMPRESSED_EXTENSION string = ".padata.zst" +) // initialScan inspects the storage directory 
to determine its size, and whether there are any // uncompressed files lying around. @@ -615,7 +643,7 @@ func initialScan(storagePath string) (map[string]uint64, []string, uint64, error } func compressFile(file io.Reader, fpath, compressedFpath string) error { - compressedLog, err := os.OpenFile(compressedFpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0660) + compressedLog, err := os.OpenFile(compressedFpath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o660) if err != nil { return fmt.Errorf("Failed to create compressed file %s for log rotation: %w", compressedFpath, err) } @@ -641,7 +669,7 @@ func compressFile(file io.Reader, fpath, compressedFpath string) error { func setupOfflineModeLog(fpath string) (*os.File, error) { // Open the log file - file, err := os.OpenFile(fpath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0660) + file, err := os.OpenFile(fpath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o660) if err != nil { return nil, fmt.Errorf("failed to create new offline mode file %s: %w", fpath, err) } @@ -661,7 +689,6 @@ func (r *ParcaReporter) rotateOfflineModeLog() error { logFile, err := setupOfflineModeLog(fpath) if err != nil { return fmt.Errorf("Failed to create new log %s for offline mode: %w", fpath, err) - } // We are connected to the new log, let's take the old one and compress it r.offlineModeLogMu.Lock() @@ -727,7 +754,7 @@ func (r *ParcaReporter) Start(mainCtx context.Context) error { } if r.offlineModeConfig != nil { - if err := os.MkdirAll(r.offlineModeConfig.StoragePath, 0770); err != nil { + if err := os.MkdirAll(r.offlineModeConfig.StoragePath, 0o770); err != nil { return fmt.Errorf("error creating offline mode storage: %v", err) } go func() { @@ -993,7 +1020,6 @@ func (r *ParcaReporter) reportDataToBackend(ctx context.Context, buf *bytes.Buff } rec, err = r.buildStacktraceRecord(ctx, stacktraceIDs) - if err != nil { return err }