From f3c8c57e164c0ab4d9e60b3ef7a3987e805d67ed Mon Sep 17 00:00:00 2001 From: ShocOne <62835948+ShocOne@users.noreply.github.com> Date: Thu, 18 Apr 2024 15:56:30 +0100 Subject: [PATCH 1/3] Refactor concurrency package to include new metrics and remove unused code --- concurrency/metrics.go | 143 +++++++++++++++++------------------------ concurrency/scale.go | 36 +++++++++++ httpclient/request.go | 10 +-- 3 files changed, 97 insertions(+), 92 deletions(-) create mode 100644 concurrency/scale.go diff --git a/concurrency/metrics.go b/concurrency/metrics.go index b0cc1f2..59eb04e 100644 --- a/concurrency/metrics.go +++ b/concurrency/metrics.go @@ -5,52 +5,66 @@ import ( "net/http" "strconv" "time" - - "go.uber.org/zap" ) -// MonitorRateLimitHeaders monitors the rate limit headers (X-RateLimit-Remaining and Retry-After) -// in the HTTP response and adjusts concurrency accordingly. -// If X-RateLimit-Remaining is below a threshold or Retry-After is specified, decrease concurrency. -// If neither condition is met, consider scaling up if concurrency is below the maximum limit. -// - Threshold for X-RateLimit-Remaining: 10 -// - Maximum concurrency: MaxConcurrency -func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) { - // Extract X-RateLimit-Remaining and Retry-After headers from the response +// EvaluateAndAdjustConcurrency evaluates the response from the server and adjusts the concurrency level accordingly. 
+func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) { + // Call monitoring functions + rateLimitFeedback := ch.MonitorRateLimitHeaders(resp) + responseCodeFeedback := ch.MonitorServerResponseCodes(resp) + responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime) + + // Determine overall action based on feedback + suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback} + scaleDownCount := 0 + scaleUpCount := 0 + + for _, suggestion := range suggestions { + switch suggestion { + case -1: + scaleDownCount++ + case 1: + scaleUpCount++ + } + } + + // Decide on scaling action + if scaleDownCount > scaleUpCount { + ch.ScaleDown() + } else if scaleUpCount > scaleDownCount { + ch.ScaleUp() + } +} + +// MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment. +func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int { remaining := resp.Header.Get("X-RateLimit-Remaining") retryAfter := resp.Header.Get("Retry-After") + suggestion := 0 if remaining != "" { remainingValue, err := strconv.Atoi(remaining) if err == nil && remainingValue < 10 { - // Decrease concurrency if X-RateLimit-Remaining is below the threshold - if len(ch.sem) > MinConcurrency { - newSize := len(ch.sem) - 1 - ch.logger.Info("Reducing concurrency due to low X-RateLimit-Remaining", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) - } + // Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold + suggestion = -1 } } if retryAfter != "" { - // Decrease concurrency if Retry-After is specified - if len(ch.sem) > MinConcurrency { - newSize := len(ch.sem) - 1 - ch.logger.Info("Reducing concurrency due to Retry-After header", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) - } + // Suggest decrease concurrency if Retry-After is specified + suggestion = -1 } else { - // Scale up if concurrency is below 
the maximum limit - if len(ch.sem) < MaxConcurrency { - newSize := len(ch.sem) + 1 - ch.logger.Info("Increasing concurrency", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) + // Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made + if len(ch.sem) < MaxConcurrency && suggestion == 0 { + suggestion = 1 } } + + return suggestion } -// MonitorServerResponseCodes monitors server response codes and adjusts concurrency accordingly. -func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) { +// MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment. +func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int { statusCode := resp.StatusCode // Lock the metrics to ensure thread safety @@ -63,7 +77,6 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) { ch.Metrics.TotalRateLimitErrors++ case statusCode >= 400 && statusCode < 500: // Assuming 4xx errors as client errors - // Increase the TotalRetries count to indicate a client error ch.Metrics.TotalRetries++ } @@ -75,23 +88,19 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) { // Set the new error rate in the metrics ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate - // Check if the error rate exceeds the threshold and adjust concurrency accordingly - if errorRate > ErrorRateThreshold && len(ch.sem) > MinConcurrency { - // Decrease concurrency - newSize := len(ch.sem) - 1 - ch.logger.Info("Reducing request concurrency due to high error rate", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) + // Determine action based on the error rate + if errorRate > ErrorRateThreshold { + // Suggest decrease concurrency + return -1 } else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency { - // Scale up if error rate is below the threshold and concurrency is below the maximum limit - newSize := 
len(ch.sem) + 1 - ch.logger.Info("Increasing request concurrency due to low error rate", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) + // Suggest increase concurrency if there is capacity + return 1 } + return 0 } -// MonitorResponseTimeVariability calculates the standard deviation of response times -// and uses moving averages to smooth out fluctuations, adjusting concurrency accordingly. -func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) { +// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment. +func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int { ch.Metrics.Lock.Lock() defer ch.Metrics.Lock.Unlock() @@ -110,17 +119,15 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.D // Calculate standard deviation of response times stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance) - // Adjust concurrency based on response time variability - if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) > MinConcurrency { - newSize := len(ch.sem) - 1 - ch.logger.Info("Reducing request concurrency due to high response time variability", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) + // Determine action based on standard deviation + if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold { + // Suggest decrease concurrency + return -1 } else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency { - // Scale up if response time variability is below the threshold and concurrency is below the maximum limit - newSize := len(ch.sem) + 1 - ch.logger.Info("Increasing request concurrency due to low response time variability", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) + // Suggest increase concurrency if there is capacity + return 1 } + return 0 } // calculateVariance calculates the variance of 
response times. @@ -134,35 +141,3 @@ func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duratio ch.Metrics.ResponseTimeVariability.Variance = variance return variance } - -// MonitorNetworkLatency measures Time to First Byte (TTFB) and monitors network throughput, -// adjusting concurrency based on changes in network latency and throughput. -func (ch *ConcurrencyHandler) MonitorNetworkLatency(ttfb time.Duration, throughput float64) { - ch.Metrics.Lock.Lock() - defer ch.Metrics.Lock.Unlock() - - // Calculate the TTFB moving average - ch.Metrics.TTFB.Lock.Lock() - defer ch.Metrics.TTFB.Lock.Unlock() - ch.Metrics.TTFB.Total += ttfb - ch.Metrics.TTFB.Count++ - ttfbMovingAverage := ch.Metrics.TTFB.Total / time.Duration(ch.Metrics.TTFB.Count) - - // Calculate the throughput moving average - ch.Metrics.Throughput.Lock.Lock() - defer ch.Metrics.Throughput.Lock.Unlock() - ch.Metrics.Throughput.Total += throughput - ch.Metrics.Throughput.Count++ - throughputMovingAverage := ch.Metrics.Throughput.Total / float64(ch.Metrics.Throughput.Count) - - // Adjust concurrency based on TTFB and throughput moving averages - if ttfbMovingAverage > MaxAcceptableTTFB && len(ch.sem) > MinConcurrency { - newSize := len(ch.sem) - 1 - ch.logger.Info("Reducing request concurrency due to high TTFB", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) - } else if throughputMovingAverage > MaxAcceptableThroughput && len(ch.sem) < MaxConcurrency { - newSize := len(ch.sem) + 1 - ch.logger.Info("Increasing request concurrency due to high throughput", zap.Int("NewSize", newSize)) - ch.ResizeSemaphore(newSize) - } -} diff --git a/concurrency/scale.go b/concurrency/scale.go new file mode 100644 index 0000000..ee20528 --- /dev/null +++ b/concurrency/scale.go @@ -0,0 +1,36 @@ +package concurrency + +import "go.uber.org/zap" + +// ScaleDown reduces the concurrency level by one, down to the minimum limit. 
+func (ch *ConcurrencyHandler) ScaleDown() { + // Lock to ensure thread safety + ch.lock.Lock() + defer ch.lock.Unlock() + + // We must consider the capacity rather than the length of the semaphore channel + currentSize := cap(ch.sem) + if currentSize > MinConcurrency { + newSize := currentSize - 1 + ch.logger.Info("Reducing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize)) + ch.ResizeSemaphore(newSize) + } else { + ch.logger.Info("Concurrency already at minimum level; cannot reduce further", zap.Int("currentSize", currentSize)) + } +} + +// ScaleUp increases the concurrency level by one, up to the maximum limit. +func (ch *ConcurrencyHandler) ScaleUp() { + // Lock to ensure thread safety + ch.lock.Lock() + defer ch.lock.Unlock() + + currentSize := cap(ch.sem) + if currentSize < MaxConcurrency { + newSize := currentSize + 1 + ch.logger.Info("Increasing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize)) + ch.ResizeSemaphore(newSize) + } else { + ch.logger.Info("Concurrency already at maximum level; cannot increase further", zap.Int("currentSize", currentSize)) + } +} diff --git a/httpclient/request.go b/httpclient/request.go index c659e6c..80c4e4d 100644 --- a/httpclient/request.go +++ b/httpclient/request.go @@ -342,14 +342,8 @@ func (c *Client) executeRequest(method, endpoint string, body, out interface{}) // Calculate the duration between sending the request and receiving the response duration := time.Since(startTime) - // Monitor response time variability - c.ConcurrencyHandler.MonitorResponseTimeVariability(duration) - - // Monitor server response codes - c.ConcurrencyHandler.MonitorServerResponseCodes(resp) - - // Monitor rate limit headers - c.ConcurrencyHandler.MonitorRateLimitHeaders(resp) + // Evaluate and adjust concurrency based on the request's feedback + c.ConcurrencyHandler.EvaluateAndAdjustConcurrency(resp, duration) // Log outgoing cookies log.LogCookies("incoming", req, 
method, endpoint) From 4daab0768e5bfce8885806669b1e9c81cd5fe7ef Mon Sep 17 00:00:00 2001 From: ShocOne <62835948+ShocOne@users.noreply.github.com> Date: Thu, 18 Apr 2024 15:57:42 +0100 Subject: [PATCH 2/3] Refactor concurrency package to include new metrics and remove unused code --- concurrency/metrics.go | 1 + concurrency/resize.go | 1 - concurrency/scale.go | 1 + 3 files changed, 2 insertions(+), 1 deletion(-) diff --git a/concurrency/metrics.go b/concurrency/metrics.go index 59eb04e..ed9bb14 100644 --- a/concurrency/metrics.go +++ b/concurrency/metrics.go @@ -1,3 +1,4 @@ +// concurrency/metrics.go package concurrency import ( diff --git a/concurrency/resize.go b/concurrency/resize.go index ef55174..9402e93 100644 --- a/concurrency/resize.go +++ b/concurrency/resize.go @@ -1,5 +1,4 @@ // concurrency/resize.go - package concurrency // ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new diff --git a/concurrency/scale.go b/concurrency/scale.go index ee20528..73a8105 100644 --- a/concurrency/scale.go +++ b/concurrency/scale.go @@ -1,3 +1,4 @@ +// concurrency/scale.go package concurrency import "go.uber.org/zap" From cce6e692522e052bc3793d66dd55bae166e29f42 Mon Sep 17 00:00:00 2001 From: ShocOne <62835948+ShocOne@users.noreply.github.com> Date: Thu, 18 Apr 2024 16:02:12 +0100 Subject: [PATCH 3/3] Refactor concurrency package to include new metrics and adjust concurrency based on feedback --- concurrency/metrics.go | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/concurrency/metrics.go b/concurrency/metrics.go index ed9bb14..c041e54 100644 --- a/concurrency/metrics.go +++ b/concurrency/metrics.go @@ -6,15 +6,38 @@ import ( "net/http" "strconv" "time" + + "go.uber.org/zap" ) -// EvaluateAndAdjustConcurrency evaluates the response from the server and adjusts the concurrency level accordingly. 
+// EvaluateAndAdjustConcurrency evaluates the HTTP response from a server along with the request's response time +// and adjusts the concurrency level of the system accordingly. It utilizes three monitoring functions: +// MonitorRateLimitHeaders, MonitorServerResponseCodes, and MonitorResponseTimeVariability, each of which +// provides feedback on different aspects of the response and system's current state. The function aggregates +// feedback from these monitoring functions to make a decision on whether to scale up or scale down the concurrency. +// The decision compares vote counts: if more functions suggest scaling down (return -1) than scaling up (return 1), +// it scales down; if more suggest scaling up, it scales up; on a tie it leaves concurrency unchanged. This method centralizes concurrency control +// decision-making, providing a systematic approach to managing request handling capacity based on real-time +// operational metrics. +// +// Parameters: +// +// resp - The HTTP response received from the server. +// responseTime - The time duration between sending the request and receiving the response. +// +// It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance. 
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) { // Call monitoring functions rateLimitFeedback := ch.MonitorRateLimitHeaders(resp) responseCodeFeedback := ch.MonitorServerResponseCodes(resp) responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime) + // Log the feedback from each monitoring function for debugging + ch.logger.Debug("Concurrency Adjustment Feedback", + zap.Int("RateLimitFeedback", rateLimitFeedback), + zap.Int("ResponseCodeFeedback", responseCodeFeedback), + zap.Int("ResponseTimeFeedback", responseTimeFeedback)) + // Determine overall action based on feedback suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback} scaleDownCount := 0 @@ -29,11 +52,20 @@ func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, } } + // Log the counts for scale down and up suggestions + ch.logger.Info("Scaling Decision Counts", + zap.Int("ScaleDownCount", scaleDownCount), + zap.Int("ScaleUpCount", scaleUpCount)) + // Decide on scaling action if scaleDownCount > scaleUpCount { + ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency")) ch.ScaleDown() } else if scaleUpCount > scaleDownCount { + ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency")) ch.ScaleUp() + } else { + ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down")) } }