Skip to content

implemented EvaluateAndAdjustConcurrency metrics decision maker #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 90 additions & 82 deletions concurrency/metrics.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// concurrency/metrics.go
package concurrency

import (
Expand All @@ -9,48 +10,94 @@ import (
"go.uber.org/zap"
)

// MonitorRateLimitHeaders monitors the rate limit headers (X-RateLimit-Remaining and Retry-After)
// in the HTTP response and adjusts concurrency accordingly.
// If X-RateLimit-Remaining is below a threshold or Retry-After is specified, decrease concurrency.
// If neither condition is met, consider scaling up if concurrency is below the maximum limit.
// - Threshold for X-RateLimit-Remaining: 10
// - Maximum concurrency: MaxConcurrency
func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) {
// Extract X-RateLimit-Remaining and Retry-After headers from the response
// EvaluateAndAdjustConcurrency inspects an HTTP response together with the
// observed request latency and adjusts the handler's concurrency level.
// It collects one signal from each of the three monitors
// (MonitorRateLimitHeaders, MonitorServerResponseCodes,
// MonitorResponseTimeVariability), where -1 means "scale down", 1 means
// "scale up" and 0 means "no opinion". The decision is taken by simple
// majority: more down-votes than up-votes scales down, the reverse scales up,
// and a tie leaves the concurrency unchanged. Each decision is logged with
// its reason so scaling behaviour can be traced and tuned.
//
// Parameters:
//
//	resp - The HTTP response received from the server.
//	responseTime - The time duration between sending the request and receiving the response.
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
	// Gather one adjustment signal from each monitor.
	rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
	responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
	responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)

	// Surface the raw signals for debugging.
	ch.logger.Debug("Concurrency Adjustment Feedback",
		zap.Int("RateLimitFeedback", rateLimitFeedback),
		zap.Int("ResponseCodeFeedback", responseCodeFeedback),
		zap.Int("ResponseTimeFeedback", responseTimeFeedback))

	// Tally down-votes (-1) and up-votes (1); zeros abstain.
	var scaleDownCount, scaleUpCount int
	for _, signal := range []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback} {
		if signal == -1 {
			scaleDownCount++
		} else if signal == 1 {
			scaleUpCount++
		}
	}

	// Record the vote counts behind the decision.
	ch.logger.Info("Scaling Decision Counts",
		zap.Int("ScaleDownCount", scaleDownCount),
		zap.Int("ScaleUpCount", scaleUpCount))

	// Act on the majority; a tie means no change.
	switch {
	case scaleDownCount > scaleUpCount:
		ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency"))
		ch.ScaleDown()
	case scaleUpCount > scaleDownCount:
		ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency"))
		ch.ScaleUp()
	default:
		ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down"))
	}
}

// MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
remaining := resp.Header.Get("X-RateLimit-Remaining")
retryAfter := resp.Header.Get("Retry-After")
suggestion := 0

if remaining != "" {
remainingValue, err := strconv.Atoi(remaining)
if err == nil && remainingValue < 10 {
// Decrease concurrency if X-RateLimit-Remaining is below the threshold
if len(ch.sem) > MinConcurrency {
newSize := len(ch.sem) - 1
ch.logger.Info("Reducing concurrency due to low X-RateLimit-Remaining", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
}
// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
suggestion = -1
}
}

if retryAfter != "" {
// Decrease concurrency if Retry-After is specified
if len(ch.sem) > MinConcurrency {
newSize := len(ch.sem) - 1
ch.logger.Info("Reducing concurrency due to Retry-After header", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
}
// Suggest decrease concurrency if Retry-After is specified
suggestion = -1
} else {
// Scale up if concurrency is below the maximum limit
if len(ch.sem) < MaxConcurrency {
newSize := len(ch.sem) + 1
ch.logger.Info("Increasing concurrency", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
if len(ch.sem) < MaxConcurrency && suggestion == 0 {
suggestion = 1
}
}

return suggestion
}

// MonitorServerResponseCodes monitors server response codes and adjusts concurrency accordingly.
func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
// MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
statusCode := resp.StatusCode

// Lock the metrics to ensure thread safety
Expand All @@ -63,7 +110,6 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
ch.Metrics.TotalRateLimitErrors++
case statusCode >= 400 && statusCode < 500:
// Assuming 4xx errors as client errors
// Increase the TotalRetries count to indicate a client error
ch.Metrics.TotalRetries++
}

Expand All @@ -75,23 +121,19 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
// Set the new error rate in the metrics
ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate

// Check if the error rate exceeds the threshold and adjust concurrency accordingly
if errorRate > ErrorRateThreshold && len(ch.sem) > MinConcurrency {
// Decrease concurrency
newSize := len(ch.sem) - 1
ch.logger.Info("Reducing request concurrency due to high error rate", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
// Determine action based on the error rate
if errorRate > ErrorRateThreshold {
// Suggest decrease concurrency
return -1
} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
// Scale up if error rate is below the threshold and concurrency is below the maximum limit
newSize := len(ch.sem) + 1
ch.logger.Info("Increasing request concurrency due to low error rate", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
// Suggest increase concurrency if there is capacity
return 1
}
return 0
}

// MonitorResponseTimeVariability calculates the standard deviation of response times
// and uses moving averages to smooth out fluctuations, adjusting concurrency accordingly.
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) {
// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
ch.Metrics.Lock.Lock()
defer ch.Metrics.Lock.Unlock()

Expand All @@ -110,17 +152,15 @@ func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.D
// Calculate standard deviation of response times
stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)

// Adjust concurrency based on response time variability
if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) > MinConcurrency {
newSize := len(ch.sem) - 1
ch.logger.Info("Reducing request concurrency due to high response time variability", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
// Determine action based on standard deviation
if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
// Suggest decrease concurrency
return -1
} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
// Scale up if response time variability is below the threshold and concurrency is below the maximum limit
newSize := len(ch.sem) + 1
ch.logger.Info("Increasing request concurrency due to low response time variability", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
// Suggest increase concurrency if there is capacity
return 1
}
return 0
}

// calculateVariance calculates the variance of response times.
Expand All @@ -134,35 +174,3 @@ func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duratio
ch.Metrics.ResponseTimeVariability.Variance = variance
return variance
}

// MonitorNetworkLatency measures Time to First Byte (TTFB) and monitors network throughput,
// adjusting concurrency based on changes in network latency and throughput.
// It updates the running TTFB and throughput totals/counts under their locks,
// derives the moving averages, and resizes the semaphore when either average
// crosses its acceptability threshold.
// NOTE(review): this function gates on len(ch.sem), whereas ScaleDown/ScaleUp
// in scale.go deliberately use cap(ch.sem) — confirm which measure of the
// semaphore is intended here.
func (ch *ConcurrencyHandler) MonitorNetworkLatency(ttfb time.Duration, throughput float64) {
ch.Metrics.Lock.Lock()
defer ch.Metrics.Lock.Unlock()

// Calculate the TTFB moving average
ch.Metrics.TTFB.Lock.Lock()
defer ch.Metrics.TTFB.Lock.Unlock()
ch.Metrics.TTFB.Total += ttfb
ch.Metrics.TTFB.Count++
ttfbMovingAverage := ch.Metrics.TTFB.Total / time.Duration(ch.Metrics.TTFB.Count)

// Calculate the throughput moving average
ch.Metrics.Throughput.Lock.Lock()
defer ch.Metrics.Throughput.Lock.Unlock()
ch.Metrics.Throughput.Total += throughput
ch.Metrics.Throughput.Count++
throughputMovingAverage := ch.Metrics.Throughput.Total / float64(ch.Metrics.Throughput.Count)

// Adjust concurrency based on TTFB and throughput moving averages:
// high average TTFB shrinks the pool; otherwise high average throughput grows it.
if ttfbMovingAverage > MaxAcceptableTTFB && len(ch.sem) > MinConcurrency {
newSize := len(ch.sem) - 1
ch.logger.Info("Reducing request concurrency due to high TTFB", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
} else if throughputMovingAverage > MaxAcceptableThroughput && len(ch.sem) < MaxConcurrency {
newSize := len(ch.sem) + 1
ch.logger.Info("Increasing request concurrency due to high throughput", zap.Int("NewSize", newSize))
ch.ResizeSemaphore(newSize)
}
}
1 change: 0 additions & 1 deletion concurrency/resize.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// concurrency/resize.go

package concurrency

// ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new
Expand Down
37 changes: 37 additions & 0 deletions concurrency/scale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// concurrency/scale.go
package concurrency

import "go.uber.org/zap"

// ScaleDown reduces the concurrency level by one, down to the minimum limit.
func (ch *ConcurrencyHandler) ScaleDown() {
// Lock to ensure thread safety
ch.lock.Lock()
defer ch.lock.Unlock()

// We must consider the capacity rather than the length of the semaphore channel
currentSize := cap(ch.sem)
if currentSize > MinConcurrency {
newSize := currentSize - 1
ch.logger.Info("Reducing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
ch.ResizeSemaphore(newSize)
} else {
ch.logger.Info("Concurrency already at minimum level; cannot reduce further", zap.Int("currentSize", currentSize))
}
}

// ScaleUp raises the concurrency level by one, never exceeding MaxConcurrency.
func (ch *ConcurrencyHandler) ScaleUp() {
	// Serialize resize operations.
	ch.lock.Lock()
	defer ch.lock.Unlock()

	// The semaphore's capacity (not its current length) is the configured limit.
	currentSize := cap(ch.sem)
	if currentSize >= MaxConcurrency {
		ch.logger.Info("Concurrency already at maximum level; cannot increase further", zap.Int("currentSize", currentSize))
		return
	}

	newSize := currentSize + 1
	ch.logger.Info("Increasing request concurrency", zap.Int("currentSize", currentSize), zap.Int("newSize", newSize))
	ch.ResizeSemaphore(newSize)
}
10 changes: 2 additions & 8 deletions httpclient/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,8 @@ func (c *Client) executeRequest(method, endpoint string, body, out interface{})
// Calculate the duration between sending the request and receiving the response
duration := time.Since(startTime)

// Monitor response time variability
c.ConcurrencyHandler.MonitorResponseTimeVariability(duration)

// Monitor server response codes
c.ConcurrencyHandler.MonitorServerResponseCodes(resp)

// Monitor rate limit headers
c.ConcurrencyHandler.MonitorRateLimitHeaders(resp)
// Evaluate and adjust concurrency based on the request's feedback
c.ConcurrencyHandler.EvaluateAndAdjustConcurrency(resp, duration)

// Log outgoing cookies
log.LogCookies("incoming", req, method, endpoint)
Expand Down