Skip to content

Commit

Permalink
Remove worker hardware utilization code
Browse files Browse the repository at this point in the history
  • Loading branch information
3vilhamster committed Nov 11, 2024
1 parent 5ec4386 commit 4bc7808
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 90 deletions.
7 changes: 0 additions & 7 deletions internal/common/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,6 @@ const (
ReplaySkippedCounter = CadenceMetricsPrefix + "replay-skipped"
ReplayLatency = CadenceMetricsPrefix + "replay-latency"

NumCPUCores = CadenceMetricsPrefix + "num-cpu-cores"
CPUPercentage = CadenceMetricsPrefix + "cpu-percentage"
TotalMemory = CadenceMetricsPrefix + "total-memory"
MemoryUsedHeap = CadenceMetricsPrefix + "memory-used-heap"
MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack"
NumGoRoutines = CadenceMetricsPrefix + "num-go-routines"

EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size"
ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota"
Expand Down
5 changes: 0 additions & 5 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ const (
testTagsContextKey = "cadence-testTags"
clientVersionTag = "cadence_client_version"
clientGauge = "client_version_metric"
clientHostTag = "cadence_client_host"
)

type (
Expand Down Expand Up @@ -330,9 +329,6 @@ func newWorkflowTaskWorkerInternal(
// 3) the result pushed to laTunnel will be send as task to workflow worker to process.
worker.taskQueueCh = laTunnel.resultCh

worker.options.host = params.Host
localActivityWorker.options.host = params.Host

return &workflowWorker{
executionParameters: params,
workflowService: service,
Expand Down Expand Up @@ -507,7 +503,6 @@ func newActivityTaskWorker(
workerParams.MetricsScope,
sessionTokenBucket,
)
base.options.host = workerParams.Host

return &activityWorker{
executionParameters: workerParams,
Expand Down
58 changes: 1 addition & 57 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ import (
"errors"
"fmt"
"os"
"runtime"
"sync"
"syscall"
"time"

"go.uber.org/cadence/internal/common/debug"

"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -50,7 +48,6 @@ import (
const (
retryPollOperationInitialInterval = 20 * time.Millisecond
retryPollOperationMaxInterval = 10 * time.Second
hardwareMetricsCollectInterval = 30 * time.Second
)

var (
Expand All @@ -59,8 +56,6 @@ var (

var errShutdown = errors.New("worker shutting down")

var collectHardwareUsageOnce sync.Once

type (
// resultHandler that returns result
resultHandler func(result []byte, err error)
Expand Down Expand Up @@ -223,7 +218,6 @@ func (bw *baseWorker) Start() {
// We want the emit function run once per host instead of run once per worker
// since the emit function is host level metric.
bw.shutdownWG.Add(1)
go bw.emitHardwareUsage()

bw.isWorkerStarted = true
traceLog(func() {
Expand Down Expand Up @@ -400,7 +394,7 @@ func (bw *baseWorker) Run() {
bw.Stop()
}

// Shutdown is a blocking call and cleans up all the resources associated with worker.
// Stop is a blocking call and cleans up all the resources associated with worker.
func (bw *baseWorker) Stop() {
if !bw.isWorkerStarted {
return
Expand All @@ -423,53 +417,3 @@ func (bw *baseWorker) Stop() {
}
return
}

func (bw *baseWorker) emitHardwareUsage() {
defer func() {
if p := recover(); p != nil {
bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
st := getStackTraceRaw(topLine, 7, 0)
bw.logger.Error("Unhandled panic in hardware emitting.",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
}
}()
defer bw.shutdownWG.Done()
collectHardwareUsageOnce.Do(
func() {
ticker := time.NewTicker(hardwareMetricsCollectInterval)
for {
select {
case <-bw.shutdownCh:
ticker.Stop()
return
case <-ticker.C:
host := bw.options.host
scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})

cpuPercent, err := cpu.Percent(0, false)
if err != nil {
bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
return
}
cpuCores, err := cpu.Counts(false)
if err != nil {
bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
return
}
scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])

var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)

scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
}
}
})

}
17 changes: 0 additions & 17 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,6 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithStrictNonDeterminism() {
worker.Stop()
}

func (s *internalWorkerTestSuite) TestCreateWorker_WithHost() {
worker := createWorkerWithHost(s.T(), s.service)
err := worker.Start()
require.NoError(s.T(), err)
time.Sleep(time.Millisecond * 200)
assert.Equal(s.T(), "test_host", worker.activityWorker.worker.options.host)
assert.Equal(s.T(), "test_host", worker.workflowWorker.worker.options.host)
worker.Stop()
}

func (s *internalWorkerTestSuite) TestCreateWorkerRun() {
// Create service endpoint
mockCtrl := gomock.NewController(s.T())
Expand Down Expand Up @@ -445,13 +435,6 @@ func createWorkerWithStrictNonDeterminismDisabled(
return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}})
}

func createWorkerWithHost(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"})
}

func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {
t := s.T()
mockService := s.service
Expand Down
4 changes: 0 additions & 4 deletions internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,6 @@ type (
// default: No provider
Authorization auth.AuthorizationProvider

// Optional: Host is just string on the machine running the client
// default: empty string
Host string

// Optional: See WorkerBugPorts for more details
//
// Deprecated: All bugports are always deprecated and may be removed at any time.
Expand Down

0 comments on commit 4bc7808

Please sign in to comment.