Skip to content

Commit

Permalink
Add writing mutex (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
jabielecki authored Jun 18, 2024
1 parent 951293b commit 5a70bc0
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const DefaultBackoffDelayFactor float64 = 3
// Client is an HTTP FMC client.
// Use fmc.NewClient to initiate a client.
// This will ensure proper cookie handling and processing of modifiers.
//
// Requests are protected from concurrent writing (concurrent DELETE/POST/PUT),
// across all API paths. Any GET requests, or requests from different clients
// are not protected against concurrent writing.
type Client struct {
// HttpClient is the *http.Client used for API requests.
HttpClient *http.Client
Expand Down Expand Up @@ -63,6 +67,9 @@ type Client struct {
Domains map[string]string

RateLimiterBucket *ratelimit.Bucket

// writingMutex protects against concurrent DELETE/POST/PUT requests towards the API.
writingMutex *sync.Mutex
}

// NewClient creates a new FMC HTTP client.
Expand Down Expand Up @@ -92,6 +99,7 @@ func NewClient(url, usr, pwd string, mods ...func(*Client)) (Client, error) {
BackoffDelayFactor: DefaultBackoffDelayFactor,
AuthenticationMutex: &sync.Mutex{},
RateLimiterBucket: ratelimit.NewBucketWithRate(1.66, 1), // 1.66 req/s == 100 req/min
writingMutex: &sync.Mutex{},
}

for _, mod := range mods {
Expand Down Expand Up @@ -181,15 +189,7 @@ func (client *Client) Do(req Req) (Res, error) {
var res Res

for attempts := 0; ; attempts++ {
client.RateLimiterBucket.Wait(1) // Block until rate limit token available
req.HttpReq.Body = io.NopCloser(bytes.NewBuffer(body))
if req.LogPayload {
log.Printf("[DEBUG] HTTP Request: %s, %s, %s", req.HttpReq.Method, req.HttpReq.URL, req.HttpReq.Body)
} else {
log.Printf("[DEBUG] HTTP Request: %s, %s", req.HttpReq.Method, req.HttpReq.URL)
}

httpRes, err := client.HttpClient.Do(req.HttpReq)
httpRes, err := client.do(req, body)
if err != nil {
if ok := client.Backoff(attempts); !ok {
log.Printf("[ERROR] HTTP Connection error occured: %+v", err)
Expand Down Expand Up @@ -244,6 +244,24 @@ func (client *Client) Do(req Req) (Res, error) {
return res, nil
}

func (client *Client) do(req Req, body []byte) (*http.Response, error) {
client.RateLimiterBucket.Wait(1) // Block until rate limit token available

if req.HttpReq.Method != "GET" {
client.writingMutex.Lock()
defer client.writingMutex.Unlock()
}

req.HttpReq.Body = io.NopCloser(bytes.NewBuffer(body))
if req.LogPayload {
log.Printf("[DEBUG] HTTP Request: %s, %s, %s", req.HttpReq.Method, req.HttpReq.URL, string(body))
} else {
log.Printf("[DEBUG] HTTP Request: %s, %s", req.HttpReq.Method, req.HttpReq.URL)
}

return client.HttpClient.Do(req.HttpReq)
}

// Get makes a GET request and returns a GJSON result.
// Results will be the raw data structure as returned by FMC
func (client *Client) Get(path string, mods ...func(*Req)) (Res, error) {
Expand Down Expand Up @@ -384,7 +402,7 @@ func (client *Client) Authenticate() error {

// Backoff waits following an exponential backoff algorithm
func (client *Client) Backoff(attempts int) bool {
log.Printf("[DEBUG] Begining backoff method: attempts %v on %v", attempts, client.MaxRetries)
log.Printf("[DEBUG] Beginning backoff method: attempt %v of %v", attempts, client.MaxRetries)
if attempts >= client.MaxRetries {
log.Printf("[DEBUG] Exit from backoff method with return value false")
return false
Expand Down

0 comments on commit 5a70bc0

Please sign in to comment.