Skip to content

Refactor concurrency package to include new metrics and adjust concur… #171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions concurrency/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,6 @@ package concurrency
import "time"

const (
// MaxConcurrency defines the upper limit of concurrent requests the system can handle.
MaxConcurrency = 10

// MinConcurrency defines the lower limit of concurrent requests the system will maintain.
MinConcurrency = 1

// EvaluationInterval specifies the frequency at which the system evaluates its performance metrics
// to decide if concurrency adjustments are needed.
EvaluationInterval = 1 * time.Minute

// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB) in milliseconds.
// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
Expand All @@ -32,13 +22,4 @@ const (
// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
ErrorRateThreshold = 0.1

// Weight assigned to each metric feedback type
WeightRateLimit = 0.5 // Weight for rate limit feedback, less if not all APIs provide this data
WeightResponseCodes = 1.0 // Weight for server response codes
WeightResponseTime = 1.5 // Higher weight for response time variability

// Thresholds for semaphore scaling actions
ThresholdScaleDown = -1.5 // Threshold to decide scaling down
ThresholdScaleUp = 1.5 // Threshold to decide scaling up
)
49 changes: 18 additions & 31 deletions concurrency/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,23 @@ import (
"time"

"github.com/deploymenttheory/go-api-http-client/logger"
"golang.org/x/sync/semaphore"
)

// ConcurrencyHandler controls the number of concurrent HTTP requests.
// type ConcurrencyHandler struct {
// sem chan struct{}
// logger logger.Logger
// AcquisitionTimes []time.Duration
// lock sync.Mutex
// lastTokenAcquisitionTime time.Time
// Metrics *ConcurrencyMetrics
// }
// Constants and Data Structures:
const (
MaxConcurrency = 10 // Maximum allowed concurrent requests
MinConcurrency = 1 // Minimum allowed concurrent requests
EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
)

// ConcurrencyHandler controls the number of concurrent HTTP requests.
type ConcurrencyHandler struct {
sem *semaphore.Weighted
lock sync.RWMutex
logger logger.Logger
Metrics *ConcurrencyMetrics
currentCapacity int64
activePermits int64
sem chan struct{}
logger logger.Logger
AcquisitionTimes []time.Duration
lock sync.Mutex
lastTokenAcquisitionTime time.Time
Metrics *ConcurrencyMetrics
}

// ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
Expand Down Expand Up @@ -63,22 +60,12 @@ type ConcurrencyMetrics struct {
// concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures
// no more than a certain number of concurrent requests are made.
// It uses a semaphore to control concurrency.
//
// func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
// return &ConcurrencyHandler{
// sem: make(chan struct{}, limit),
// logger: logger,
// AcquisitionTimes: []time.Duration{},
// Metrics: metrics,
// }
// }
func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
return &ConcurrencyHandler{
sem: semaphore.NewWeighted(limit),
logger: logger,
Metrics: metrics,
currentCapacity: limit,
activePermits: 0,
sem: make(chan struct{}, limit),
logger: logger,
AcquisitionTimes: []time.Duration{},
Metrics: metrics,
}
}

Expand Down
211 changes: 87 additions & 124 deletions concurrency/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,57 +27,49 @@ import (
//
// It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
// Call monitoring functions
rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)

// Compute the weighted feedback
weightedFeedback := float64(rateLimitFeedback)*WeightRateLimit +
float64(responseCodeFeedback)*WeightResponseCodes +
float64(responseTimeFeedback)*WeightResponseTime

// Log the feedback and weighted result for debugging
// Log the feedback from each monitoring function for debugging
ch.logger.Debug("Concurrency Adjustment Feedback",
zap.Float64("WeightedFeedback", weightedFeedback))
zap.Int("RateLimitFeedback", rateLimitFeedback),
zap.Int("ResponseCodeFeedback", responseCodeFeedback),
zap.Int("ResponseTimeFeedback", responseTimeFeedback))

// Determine overall action based on feedback
suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback}
scaleDownCount := 0
scaleUpCount := 0

for _, suggestion := range suggestions {
switch suggestion {
case -1:
scaleDownCount++
case 1:
scaleUpCount++
}
}

// Apply thresholds to determine scaling action
if weightedFeedback <= ThresholdScaleDown {
ch.logger.Info("Scaling down the concurrency", zap.Float64("WeightedFeedback", weightedFeedback))
// Log the counts for scale down and up suggestions
ch.logger.Info("Scaling Decision Counts",
zap.Int("ScaleDownCount", scaleDownCount),
zap.Int("ScaleUpCount", scaleUpCount))

// Decide on scaling action
if scaleDownCount > scaleUpCount {
ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency"))
ch.ScaleDown()
} else if weightedFeedback >= ThresholdScaleUp {
ch.logger.Info("Scaling up the concurrency", zap.Float64("WeightedFeedback", weightedFeedback))
} else if scaleUpCount > scaleDownCount {
ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency"))
ch.ScaleUp()
} else {
ch.logger.Info("Maintaining current concurrency level", zap.Float64("WeightedFeedback", weightedFeedback))
ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down"))
}
}

// MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
// func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
// remaining := resp.Header.Get("X-RateLimit-Remaining")
// retryAfter := resp.Header.Get("Retry-After")
// suggestion := 0

// if remaining != "" {
// remainingValue, err := strconv.Atoi(remaining)
// if err == nil && remainingValue < 10 {
// // Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
// suggestion = -1
// }
// }

// if retryAfter != "" {
// // Suggest decrease concurrency if Retry-After is specified
// suggestion = -1
// } else {
// // Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
// if len(ch.sem) < MaxConcurrency && suggestion == 0 {
// suggestion = 1
// }
// }

// return suggestion
// }
func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
remaining := resp.Header.Get("X-RateLimit-Remaining")
retryAfter := resp.Header.Get("Retry-After")
Expand All @@ -86,128 +78,99 @@ func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
if remaining != "" {
remainingValue, err := strconv.Atoi(remaining)
if err == nil && remainingValue < 10 {
// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
suggestion = -1
}
}

if retryAfter != "" {
// Suggest decrease concurrency if Retry-After is specified
suggestion = -1
} else {
// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
if len(ch.sem) < MaxConcurrency && suggestion == 0 {
suggestion = 1
}
}

return suggestion
}

// MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
// func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
// statusCode := resp.StatusCode

// // Lock the metrics to ensure thread safety
// ch.Metrics.Lock.Lock()
// defer ch.Metrics.Lock.Unlock()

// // Update the appropriate error count based on the response status code
// switch {
// case statusCode >= 500 && statusCode < 600:
// ch.Metrics.TotalRateLimitErrors++
// case statusCode >= 400 && statusCode < 500:
// // Assuming 4xx errors as client errors
// ch.Metrics.TotalRetries++
// }

// // Calculate error rate
// totalRequests := float64(ch.Metrics.TotalRequests)
// totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
// errorRate := totalErrors / totalRequests

// // Set the new error rate in the metrics
// ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate

// // Determine action based on the error rate
// if errorRate > ErrorRateThreshold {
// // Suggest decrease concurrency
// return -1
// } else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
// // Suggest increase concurrency if there is capacity
// return 1
// }
// return 0
// }
func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
statusCode := resp.StatusCode

// Lock the metrics to ensure thread safety
ch.Metrics.Lock.Lock()
defer ch.Metrics.Lock.Unlock()

if statusCode >= 500 {
// Update the appropriate error count based on the response status code
switch {
case statusCode >= 500 && statusCode < 600:
ch.Metrics.TotalRateLimitErrors++
return -1
} else if statusCode >= 400 {
case statusCode >= 400 && statusCode < 500:
// Assuming 4xx errors as client errors
ch.Metrics.TotalRetries++
return -1
}

// Calculate error rate
totalRequests := float64(ch.Metrics.TotalRequests)
totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
errorRate := totalErrors / totalRequests

// Set the new error rate in the metrics
ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate

// Determine action based on the error rate
if errorRate > ErrorRateThreshold {
// Suggest decrease concurrency
return -1
} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
// Suggest increase concurrency if there is capacity
return 1
}
return 0
}

// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
// func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
// ch.Metrics.Lock.Lock()
// defer ch.Metrics.Lock.Unlock()

// // Update ResponseTimeVariability metrics
// ch.Metrics.ResponseTimeVariability.Lock.Lock()
// defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
// ch.Metrics.ResponseTimeVariability.Total += responseTime
// ch.Metrics.ResponseTimeVariability.Count++

// // Calculate average response time
// ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)

// // Calculate variance of response times
// ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)

// // Calculate standard deviation of response times
// stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)

// // Determine action based on standard deviation
// if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
// // Suggest decrease concurrency
// return -1
// } else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
// // Suggest increase concurrency if there is capacity
// return 1
// }
// return 0
// }
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
ch.Metrics.Lock.Lock()
defer ch.Metrics.Lock.Unlock()

// Update total response time and count
// Update ResponseTimeVariability metrics
ch.Metrics.ResponseTimeVariability.Lock.Lock()
defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
ch.Metrics.ResponseTimeVariability.Total += responseTime
ch.Metrics.ResponseTimeVariability.Count++

// Calculate the average response time
averageResponseTime := ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
// Calculate average response time
ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)

// Calculate variance
variance := ch.calculateVariance(averageResponseTime, responseTime)
// Calculate standard deviation
stdDev := math.Sqrt(variance)
// Calculate variance of response times
ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)

// Convert MaxAcceptableResponseTimeVariability to seconds for comparison
maxStdDev := MaxAcceptableResponseTimeVariability.Seconds()
// Calculate standard deviation of response times
stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)

if stdDev > maxStdDev {
return -1 // Suggest to decrease concurrency if stdDev exceeds the maximum threshold
// Determine action based on standard deviation
if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
// Suggest decrease concurrency
return -1
} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
// Suggest increase concurrency if there is capacity
return 1
}
return 1 // Suggest to increase concurrency if stdDev is within the acceptable range
return 0
}

// calculateVariance calculates the variance between the average response time and a new sample.
func (ch *ConcurrencyHandler) calculateVariance(average, newSample time.Duration) float64 {
mean := average.Seconds() // Convert to seconds
newValue := newSample.Seconds() // Convert to seconds
newVariance := (float64(ch.Metrics.ResponseTimeVariability.Count-1)*math.Pow(mean-newValue, 2) + ch.Metrics.ResponseTimeVariability.Variance) / float64(ch.Metrics.ResponseTimeVariability.Count)
ch.Metrics.ResponseTimeVariability.Variance = newVariance // Update the variance in metrics
return newVariance
// calculateVariance calculates the variance of response times.
func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duration, responseTime time.Duration) float64 {
	// Work in seconds so mean and sample share a unit.
	mean := averageResponseTime.Seconds()
	sample := responseTime.Seconds()

	// Fold the new sample's squared deviation into the running variance:
	// the previous variance is weighted by (count-1) and the total is
	// renormalized by count, mirroring an incremental update.
	count := float64(ch.Metrics.ResponseTimeVariability.Count)
	updated := ((count-1)*math.Pow(mean-sample, 2) + ch.Metrics.ResponseTimeVariability.Variance) / count
	ch.Metrics.ResponseTimeVariability.Variance = updated
	return updated
}
Loading
Loading