Commit 4a19620

Merge pull request #169 from deploymenttheory/dev
Refactor concurrency package to include new metrics and adjust concur…
2 parents 8fb326d + 79c9451 commit 4a19620

10 files changed: +389 -194 lines changed

concurrency/const.go

Lines changed: 19 additions & 0 deletions
@@ -4,6 +4,16 @@ package concurrency
 import "time"
 
 const (
+	// MaxConcurrency defines the upper limit of concurrent requests the system can handle.
+	MaxConcurrency = 10
+
+	// MinConcurrency defines the lower limit of concurrent requests the system will maintain.
+	MinConcurrency = 1
+
+	// EvaluationInterval specifies the frequency at which the system evaluates its performance metrics
+	// to decide if concurrency adjustments are needed.
+	EvaluationInterval = 1 * time.Minute
+
 	// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB) in milliseconds.
 	// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
 	// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
@@ -22,4 +32,13 @@ const (
 	// Error rate is calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests.
 	// Adjustments in concurrency will be made if the error rate exceeds this threshold. A threshold of 0.1 (or 10%) is common.
 	ErrorRateThreshold = 0.1
+
+	// Weight assigned to each metric feedback type
+	WeightRateLimit     = 0.5 // Weight for rate limit feedback, less if not all APIs provide this data
+	WeightResponseCodes = 1.0 // Weight for server response codes
+	WeightResponseTime  = 1.5 // Higher weight for response time variability
+
+	// Thresholds for semaphore scaling actions
+	ThresholdScaleDown = -1.5 // Threshold to decide scaling down
+	ThresholdScaleUp   = 1.5  // Threshold to decide scaling up
 )
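The weight and threshold constants above feed the weighted scoring that concurrency/metrics.go applies later in this diff. A minimal, self-contained sketch of that arithmetic, with hypothetical feedback values of -1, -1 and +1 from the three monitors (the constant values are copied here only so the example compiles on its own):

package main

import "fmt"

const (
	WeightRateLimit     = 0.5
	WeightResponseCodes = 1.0
	WeightResponseTime  = 1.5
	ThresholdScaleDown  = -1.5
	ThresholdScaleUp    = 1.5
)

func main() {
	// Hypothetical monitor outputs: each is -1 (scale down), 0 (neutral), or +1 (scale up).
	rateLimitFeedback, responseCodeFeedback, responseTimeFeedback := -1, -1, 1

	// Weighted score: (-1)(0.5) + (-1)(1.0) + (1)(1.5) = 0.0
	weighted := float64(rateLimitFeedback)*WeightRateLimit +
		float64(responseCodeFeedback)*WeightResponseCodes +
		float64(responseTimeFeedback)*WeightResponseTime

	switch {
	case weighted <= ThresholdScaleDown:
		fmt.Println("scale down, score =", weighted)
	case weighted >= ThresholdScaleUp:
		fmt.Println("scale up, score =", weighted)
	default:
		fmt.Println("hold, score =", weighted) // 0.0 lies inside (-1.5, 1.5), so concurrency is left unchanged
	}
}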

concurrency/handler.go

Lines changed: 31 additions & 18 deletions
@@ -6,23 +6,26 @@ import (
 	"time"
 
 	"github.com/deploymenttheory/go-api-http-client/logger"
-)
-
-// Constants and Data Structures:
-const (
-	MaxConcurrency     = 10              // Maximum allowed concurrent requests
-	MinConcurrency     = 1               // Minimum allowed concurrent requests
-	EvaluationInterval = 1 * time.Minute // Time interval for evaluating metrics and adjusting concurrency
+	"golang.org/x/sync/semaphore"
 )
 
 // ConcurrencyHandler controls the number of concurrent HTTP requests.
+// type ConcurrencyHandler struct {
+// 	sem                      chan struct{}
+// 	logger                   logger.Logger
+// 	AcquisitionTimes         []time.Duration
+// 	lock                     sync.Mutex
+// 	lastTokenAcquisitionTime time.Time
+// 	Metrics                  *ConcurrencyMetrics
+// }
+
 type ConcurrencyHandler struct {
-	sem                      chan struct{}
-	logger                   logger.Logger
-	AcquisitionTimes         []time.Duration
-	lock                     sync.Mutex
-	lastTokenAcquisitionTime time.Time
-	Metrics                  *ConcurrencyMetrics
+	sem             *semaphore.Weighted
+	lock            sync.RWMutex
+	logger          logger.Logger
+	Metrics         *ConcurrencyMetrics
+	currentCapacity int64
+	activePermits   int64
 }
 
 // ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
@@ -60,12 +63,22 @@ type ConcurrencyMetrics struct {
 // concurrency limit, logger, and concurrency metrics. The ConcurrencyHandler ensures
 // no more than a certain number of concurrent requests are made.
 // It uses a semaphore to control concurrency.
-func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
+//
+// func NewConcurrencyHandler(limit int, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
+// 	return &ConcurrencyHandler{
+// 		sem:              make(chan struct{}, limit),
+// 		logger:           logger,
+// 		AcquisitionTimes: []time.Duration{},
+// 		Metrics:          metrics,
+// 	}
+// }
+func NewConcurrencyHandler(limit int64, logger logger.Logger, metrics *ConcurrencyMetrics) *ConcurrencyHandler {
 	return &ConcurrencyHandler{
-		sem:              make(chan struct{}, limit),
-		logger:           logger,
-		AcquisitionTimes: []time.Duration{},
-		Metrics:          metrics,
+		sem:             semaphore.NewWeighted(limit),
+		logger:          logger,
+		Metrics:         metrics,
+		currentCapacity: limit,
+		activePermits:   0,
 	}
 }
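The handler now wraps golang.org/x/sync/semaphore instead of a buffered channel. The acquire/release paths are not part of this commit, so the following is only a rough sketch of how a weighted semaphore is typically driven; the doRequest helper and its wiring are hypothetical, not the package's API:

package main

import (
	"context"
	"fmt"
	"net/http"

	"golang.org/x/sync/semaphore"
)

// doRequest is a hypothetical helper showing the usual acquire/release pattern
// around a weighted semaphore; it is not part of the commit above.
func doRequest(ctx context.Context, sem *semaphore.Weighted, url string) error {
	// Block until a permit is available (or the context is cancelled).
	if err := sem.Acquire(ctx, 1); err != nil {
		return err
	}
	defer sem.Release(1) // Always return the permit.

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	fmt.Println(url, resp.StatusCode)
	return nil
}

func main() {
	// 10 mirrors the MaxConcurrency constant added in concurrency/const.go.
	sem := semaphore.NewWeighted(10)
	_ = doRequest(context.Background(), sem, "https://example.com")
}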

concurrency/metrics.go

Lines changed: 124 additions & 87 deletions
@@ -27,49 +27,57 @@ import (
 //
 // It logs the specific reason for scaling decisions, helping in traceability and fine-tuning system performance.
 func (ch *ConcurrencyHandler) EvaluateAndAdjustConcurrency(resp *http.Response, responseTime time.Duration) {
-	// Call monitoring functions
 	rateLimitFeedback := ch.MonitorRateLimitHeaders(resp)
 	responseCodeFeedback := ch.MonitorServerResponseCodes(resp)
 	responseTimeFeedback := ch.MonitorResponseTimeVariability(responseTime)
 
-	// Log the feedback from each monitoring function for debugging
-	ch.logger.Debug("Concurrency Adjustment Feedback",
-		zap.Int("RateLimitFeedback", rateLimitFeedback),
-		zap.Int("ResponseCodeFeedback", responseCodeFeedback),
-		zap.Int("ResponseTimeFeedback", responseTimeFeedback))
-
-	// Determine overall action based on feedback
-	suggestions := []int{rateLimitFeedback, responseCodeFeedback, responseTimeFeedback}
-	scaleDownCount := 0
-	scaleUpCount := 0
-
-	for _, suggestion := range suggestions {
-		switch suggestion {
-		case -1:
-			scaleDownCount++
-		case 1:
-			scaleUpCount++
-		}
-	}
+	// Compute the weighted feedback
+	weightedFeedback := float64(rateLimitFeedback)*WeightRateLimit +
+		float64(responseCodeFeedback)*WeightResponseCodes +
+		float64(responseTimeFeedback)*WeightResponseTime
 
-	// Log the counts for scale down and up suggestions
-	ch.logger.Info("Scaling Decision Counts",
-		zap.Int("ScaleDownCount", scaleDownCount),
-		zap.Int("ScaleUpCount", scaleUpCount))
+	// Log the feedback and weighted result for debugging
+	ch.logger.Debug("Concurrency Adjustment Feedback",
+		zap.Float64("WeightedFeedback", weightedFeedback))
 
-	// Decide on scaling action
-	if scaleDownCount > scaleUpCount {
-		ch.logger.Info("Scaling down the concurrency", zap.String("Reason", "More signals suggested to decrease concurrency"))
+	// Apply thresholds to determine scaling action
+	if weightedFeedback <= ThresholdScaleDown {
+		ch.logger.Info("Scaling down the concurrency", zap.Float64("WeightedFeedback", weightedFeedback))
 		ch.ScaleDown()
-	} else if scaleUpCount > scaleDownCount {
-		ch.logger.Info("Scaling up the concurrency", zap.String("Reason", "More signals suggested to increase concurrency"))
+	} else if weightedFeedback >= ThresholdScaleUp {
+		ch.logger.Info("Scaling up the concurrency", zap.Float64("WeightedFeedback", weightedFeedback))
		ch.ScaleUp()
 	} else {
-		ch.logger.Info("No change in concurrency", zap.String("Reason", "Equal signals for both scaling up and down"))
+		ch.logger.Info("Maintaining current concurrency level", zap.Float64("WeightedFeedback", weightedFeedback))
 	}
 }
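EvaluateAndAdjustConcurrency expects the caller to hand it each completed *http.Response together with the measured request duration. That wiring is not part of this diff, so the call site below is only an illustrative assumption; adjust is a stand-in marking where ch.EvaluateAndAdjustConcurrency would be invoked:

package main

import (
	"fmt"
	"net/http"
	"time"
)

// adjust stands in for ch.EvaluateAndAdjustConcurrency: it receives the same
// (response, elapsed) pair after every request.
func adjust(resp *http.Response, elapsed time.Duration) {
	fmt.Printf("status=%d elapsed=%s\n", resp.StatusCode, elapsed)
}

func main() {
	start := time.Now()
	resp, err := http.Get("https://example.com")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	// Feed the response and its measured duration into the evaluator.
	adjust(resp, time.Since(start))
}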

 // MonitorRateLimitHeaders monitors the rate limit headers in the response and suggests a concurrency adjustment.
+// func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
+// 	remaining := resp.Header.Get("X-RateLimit-Remaining")
+// 	retryAfter := resp.Header.Get("Retry-After")
+// 	suggestion := 0
+
+// 	if remaining != "" {
+// 		remainingValue, err := strconv.Atoi(remaining)
+// 		if err == nil && remainingValue < 10 {
+// 			// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
+// 			suggestion = -1
+// 		}
+// 	}
+
+// 	if retryAfter != "" {
+// 		// Suggest decrease concurrency if Retry-After is specified
+// 		suggestion = -1
+// 	} else {
+// 		// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
+// 		if len(ch.sem) < MaxConcurrency && suggestion == 0 {
+// 			suggestion = 1
+// 		}
+// 	}
+
+// 	return suggestion
+// }
 func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
 	remaining := resp.Header.Get("X-RateLimit-Remaining")
 	retryAfter := resp.Header.Get("Retry-After")
@@ -78,99 +86,128 @@ func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) int {
 	if remaining != "" {
 		remainingValue, err := strconv.Atoi(remaining)
 		if err == nil && remainingValue < 10 {
-			// Suggest decrease concurrency if X-RateLimit-Remaining is below the threshold
 			suggestion = -1
 		}
 	}
 
 	if retryAfter != "" {
-		// Suggest decrease concurrency if Retry-After is specified
 		suggestion = -1
-	} else {
-		// Suggest increase concurrency if currently below maximum limit and no other decrease suggestion has been made
-		if len(ch.sem) < MaxConcurrency && suggestion == 0 {
-			suggestion = 1
-		}
 	}
 
 	return suggestion
 }
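A small standalone sketch of how the simplified header logic behaves, using hand-built headers; the suggestFromHeaders helper mirrors the function above but is illustrative only, not part of the package:

package main

import (
	"fmt"
	"net/http"
	"strconv"
)

// suggestFromHeaders mirrors the simplified logic above: -1 when the API signals
// pressure via Retry-After or a low X-RateLimit-Remaining, otherwise 0. The
// "scale up" branch tied to channel capacity was removed in this commit.
func suggestFromHeaders(h http.Header) int {
	suggestion := 0
	if remaining := h.Get("X-RateLimit-Remaining"); remaining != "" {
		if v, err := strconv.Atoi(remaining); err == nil && v < 10 {
			suggestion = -1
		}
	}
	if h.Get("Retry-After") != "" {
		suggestion = -1
	}
	return suggestion
}

// headerWith builds a header via Set so the key is canonicalized the same way
// a real response header would be.
func headerWith(key, value string) http.Header {
	h := http.Header{}
	h.Set(key, value)
	return h
}

func main() {
	fmt.Println(suggestFromHeaders(headerWith("X-RateLimit-Remaining", "250"))) // 0  -> plenty of quota left
	fmt.Println(suggestFromHeaders(headerWith("X-RateLimit-Remaining", "3")))   // -1 -> fewer than 10 requests remaining
	fmt.Println(suggestFromHeaders(headerWith("Retry-After", "30")))            // -1 -> server asked the client to back off
}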

 // MonitorServerResponseCodes monitors the response status codes and suggests a concurrency adjustment.
+// func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
+// 	statusCode := resp.StatusCode
+
+// 	// Lock the metrics to ensure thread safety
+// 	ch.Metrics.Lock.Lock()
+// 	defer ch.Metrics.Lock.Unlock()
+
+// 	// Update the appropriate error count based on the response status code
+// 	switch {
+// 	case statusCode >= 500 && statusCode < 600:
+// 		ch.Metrics.TotalRateLimitErrors++
+// 	case statusCode >= 400 && statusCode < 500:
+// 		// Assuming 4xx errors as client errors
+// 		ch.Metrics.TotalRetries++
+// 	}
+
+// 	// Calculate error rate
+// 	totalRequests := float64(ch.Metrics.TotalRequests)
+// 	totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
+// 	errorRate := totalErrors / totalRequests
+
+// 	// Set the new error rate in the metrics
+// 	ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
+
+// 	// Determine action based on the error rate
+// 	if errorRate > ErrorRateThreshold {
+// 		// Suggest decrease concurrency
+// 		return -1
+// 	} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
+// 		// Suggest increase concurrency if there is capacity
+// 		return 1
+// 	}
+// 	return 0
+// }
 func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) int {
 	statusCode := resp.StatusCode
-
-	// Lock the metrics to ensure thread safety
 	ch.Metrics.Lock.Lock()
 	defer ch.Metrics.Lock.Unlock()
 
-	// Update the appropriate error count based on the response status code
-	switch {
-	case statusCode >= 500 && statusCode < 600:
+	if statusCode >= 500 {
 		ch.Metrics.TotalRateLimitErrors++
-	case statusCode >= 400 && statusCode < 500:
-		// Assuming 4xx errors as client errors
+		return -1
+	} else if statusCode >= 400 {
 		ch.Metrics.TotalRetries++
-	}
-
-	// Calculate error rate
-	totalRequests := float64(ch.Metrics.TotalRequests)
-	totalErrors := float64(ch.Metrics.TotalRateLimitErrors + ch.Metrics.TotalRetries)
-	errorRate := totalErrors / totalRequests
-
-	// Set the new error rate in the metrics
-	ch.Metrics.ResponseCodeMetrics.ErrorRate = errorRate
-
-	// Determine action based on the error rate
-	if errorRate > ErrorRateThreshold {
-		// Suggest decrease concurrency
 		return -1
-	} else if errorRate <= ErrorRateThreshold && len(ch.sem) < MaxConcurrency {
-		// Suggest increase concurrency if there is capacity
-		return 1
 	}
+
 	return 0
 }
153+
// ch.Metrics.Lock.Lock()
154+
// defer ch.Metrics.Lock.Unlock()
155+
156+
// // Update ResponseTimeVariability metrics
157+
// ch.Metrics.ResponseTimeVariability.Lock.Lock()
158+
// defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
159+
// ch.Metrics.ResponseTimeVariability.Total += responseTime
160+
// ch.Metrics.ResponseTimeVariability.Count++
161+
162+
// // Calculate average response time
163+
// ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
164+
165+
// // Calculate variance of response times
166+
// ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)
167+
168+
// // Calculate standard deviation of response times
169+
// stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
170+
171+
// // Determine action based on standard deviation
172+
// if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
173+
// // Suggest decrease concurrency
174+
// return -1
175+
// } else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
176+
// // Suggest increase concurrency if there is capacity
177+
// return 1
178+
// }
179+
// return 0
180+
// }
136181
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
137182
ch.Metrics.Lock.Lock()
138183
defer ch.Metrics.Lock.Unlock()
139184

140-
// Update ResponseTimeVariability metrics
141-
ch.Metrics.ResponseTimeVariability.Lock.Lock()
142-
defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()
185+
// Update total response time and count
143186
ch.Metrics.ResponseTimeVariability.Total += responseTime
144187
ch.Metrics.ResponseTimeVariability.Count++
145188

146-
// Calculate average response time
147-
ch.Metrics.ResponseTimeVariability.Average = ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
189+
// Calculate the average response time
190+
averageResponseTime := ch.Metrics.ResponseTimeVariability.Total / time.Duration(ch.Metrics.ResponseTimeVariability.Count)
148191

149-
// Calculate variance of response times
150-
ch.Metrics.ResponseTimeVariability.Variance = ch.calculateVariance(ch.Metrics.ResponseTimeVariability.Average, responseTime)
192+
// Calculate variance
193+
variance := ch.calculateVariance(averageResponseTime, responseTime)
194+
// Calculate standard deviation
195+
stdDev := math.Sqrt(variance)
151196

152-
// Calculate standard deviation of response times
153-
stdDev := math.Sqrt(ch.Metrics.ResponseTimeVariability.Variance)
197+
// Convert MaxAcceptableResponseTimeVariability to seconds for comparison
198+
maxStdDev := MaxAcceptableResponseTimeVariability.Seconds()
154199

155-
// Determine action based on standard deviation
156-
if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold {
157-
// Suggest decrease concurrency
158-
return -1
159-
} else if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < MaxConcurrency {
160-
// Suggest increase concurrency if there is capacity
161-
return 1
200+
if stdDev > maxStdDev {
201+
return -1 // Suggest to decrease concurrency if stdDev exceeds the maximum threshold
162202
}
163-
return 0
203+
return 1 // Suggest to increase concurrency if stdDev is within the acceptable range
164204
}
165205

166-
// calculateVariance calculates the variance of response times.
167-
func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duration, responseTime time.Duration) float64 {
168-
// Convert time.Duration values to seconds
169-
averageSeconds := averageResponseTime.Seconds()
170-
responseSeconds := responseTime.Seconds()
171-
172-
// Calculate variance
173-
variance := (float64(ch.Metrics.ResponseTimeVariability.Count-1)*math.Pow(averageSeconds-responseSeconds, 2) + ch.Metrics.ResponseTimeVariability.Variance) / float64(ch.Metrics.ResponseTimeVariability.Count)
174-
ch.Metrics.ResponseTimeVariability.Variance = variance
175-
return variance
206+
// calculateVariance calculates the variance between the average response time and a new sample.
207+
func (ch *ConcurrencyHandler) calculateVariance(average, newSample time.Duration) float64 {
208+
mean := average.Seconds() // Convert to seconds
209+
newValue := newSample.Seconds() // Convert to seconds
210+
newVariance := (float64(ch.Metrics.ResponseTimeVariability.Count-1)*math.Pow(mean-newValue, 2) + ch.Metrics.ResponseTimeVariability.Variance) / float64(ch.Metrics.ResponseTimeVariability.Count)
211+
ch.Metrics.ResponseTimeVariability.Variance = newVariance // Update the variance in metrics
212+
return newVariance
176213
}
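calculateVariance keeps a running variance: newVariance = ((count-1)*(mean-newValue)^2 + oldVariance) / count, with durations converted to seconds. A standalone numeric check of that update rule, using made-up sample values:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Made-up running state: 4 samples seen so far, prior variance of 0.09 s².
	count := int64(4)
	oldVariance := 0.09

	average := 800 * time.Millisecond    // running average response time
	newSample := 1400 * time.Millisecond // latest response time

	mean := average.Seconds()       // 0.8
	newValue := newSample.Seconds() // 1.4

	// Same update rule as calculateVariance above:
	// ((count-1)*(mean-newValue)^2 + oldVariance) / count
	newVariance := (float64(count-1)*math.Pow(mean-newValue, 2) + oldVariance) / float64(count)
	stdDev := math.Sqrt(newVariance)

	fmt.Printf("variance=%.4f stdDev=%.4fs\n", newVariance, stdDev)
	// (3*(0.8-1.4)^2 + 0.09) / 4 = (1.08 + 0.09) / 4 = 0.2925, so stdDev ≈ 0.5408s
}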
