From 9f33aa4e4a53d25d118526ece2d3989292ef3c76 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Thu, 29 Jan 2026 16:36:30 +0200 Subject: [PATCH 01/19] salesforce implementation - phase 1 --- .../impl/salesforce/processor_salesforce.go | 214 +++++++++++ .../impl/salesforce/salesforcehttp/client.go | 221 ++++++++++++ .../http_metrics/http_metrics.go | 110 ++++++ .../salesforcehttp/salesforce_helper.go | 332 ++++++++++++++++++ .../impl/salesforce/salesforcehttp/types.go | 33 ++ 5 files changed, 910 insertions(+) create mode 100644 internal/impl/salesforce/processor_salesforce.go create mode 100644 internal/impl/salesforce/salesforcehttp/client.go create mode 100644 internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go create mode 100644 internal/impl/salesforce/salesforcehttp/salesforce_helper.go create mode 100644 internal/impl/salesforce/salesforcehttp/types.go diff --git a/internal/impl/salesforce/processor_salesforce.go b/internal/impl/salesforce/processor_salesforce.go new file mode 100644 index 0000000000..50a4b3b092 --- /dev/null +++ b/internal/impl/salesforce/processor_salesforce.go @@ -0,0 +1,214 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package salesforce provides a Benthos salesforceProcessor that integrates with the Salesforce APIs +// to fetch data based on input messages. It allows querying Salesforce resources +// such as .... TODO + +package salesforce + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// salesforceProcessor is the Benthos salesforceProcessor implementation for Salesforce queries. +// It holds the client state and orchestrates calls into the salesforcehttp package. +type salesforceProcessor struct { + log *service.Logger + client *salesforcehttp.Client +} + +// SObjectList is the response from all the available sObjects +type SObjectList struct { + Encoding string `json:"encoding"` + MaxBatchSize int `json:"maxBatchSize"` + Sobjects []SObject `json:"sobjects"` +} + +// SObject is the minimal representation of an sObject +type SObject struct { + Name string `json:"name"` +} + +// DescribeResult sObject result +type DescribeResult struct { + Fields []struct { + Name string `json:"name"` + } `json:"fields"` +} + +// QueryResult of the salesforce search query +type QueryResult struct { + TotalSize int `json:"totalSize"` + Done bool `json:"done"` +} + +// newSalesforceProcessorConfigSpec creates a new Configuration specification for the Salesforce processor +func newSalesforceProcessorConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Summary("Fetches data from Salesforce based on input messages"). + Description(`This salesforceProcessor takes input messages containing Salesforce queries and returns Salesforce data. + +Supports the following Salesforce resources: +- todo + +Configuration examples: + +` + "```configYAML" + ` +# Minimal configuration +pipeline: + processors: + - salesforce: + org_url: "https://your-domain.salesforce.com" + client_id: "${SALESFORCE_CLIENT_ID}" + client_secret: "${SALESFORCE_CLIENT_SECRET}" + +# Full configuration +pipeline: + processors: + - salesforce: + org_url: "https://your-domain.salesforce.com" + client_id: "${SALESFORCE_CLIENT_ID}" + client_secret: "${SALESFORCE_CLIENT_SECRET}" + restapi_version: "v64.0" + request_timeout: "30s" + max_retries: 50 +` + "```"). + Field(service.NewStringField("org_url"). + Description("Salesforce instance base URL (e.g., https://your-domain.salesforce.com)")). + Field(service.NewStringField("client_id"). + Description("Client ID for the Salesforce Connected App")). + Field(service.NewStringField("client_secret"). + Description("Client Secret for the Salesforce Connected App"). + Secret()). + Field(service.NewStringField("restapi_version"). + Description("Salesforce REST API version to use (example: v64.0). Default: v65.0"). + Default("v65.0")). + Field(service.NewDurationField("request_timeout"). + Description("HTTP request timeout"). + Default("30s")). + Field(service.NewIntField("max_retries"). + Description("Maximum number of retries in case of 429 HTTP Status Code"). + Default(10)) +} + +func newSalesforceProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*salesforceProcessor, error) { + + orgURL, err := conf.FieldString("org_url") + if err != nil { + return nil, err + } + + if _, err := url.ParseRequestURI(orgURL); err != nil { + return nil, errors.New("org_url is not a valid URL") + } + + clientId, err := conf.FieldString("client_id") + if err != nil { + return nil, err + } + + clientSecret, err := conf.FieldString("client_secret") + if err != nil { + return nil, err + } + + apiVersion, err := conf.FieldString("restapi_version") + if err != nil { + return nil, err + } + + timeout, err := conf.FieldDuration("request_timeout") + if err != nil { + return nil, err + } + + maxRetries, err := conf.FieldInt("max_retries") + if err != nil { + return nil, err + } + + httpClient := &http.Client{Timeout: timeout} + + salesforceHttp, err := salesforcehttp.NewClient(mgr.Logger(), orgURL, clientId, clientSecret, apiVersion, maxRetries, mgr.Metrics(), httpClient) + if err != nil { + return nil, err + } + + return &salesforceProcessor{ + client: salesforceHttp, + log: mgr.Logger(), + }, nil +} + +func (s *salesforceProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { + var batch service.MessageBatch + inputMsg, err := msg.AsBytes() + if err != nil { + return nil, err + } + s.log.Debugf("Fetching from Salesforce.. Input: %s", string(inputMsg)) + + res, err := s.client.GetAvailableResources(ctx) + if err != nil { + return nil, err + } + + m := service.NewMessage(res) + batch = append(batch, m) + + return batch, nil +} + +func extractFieldNames(describeJSON []byte) ([]string, error) { + var dr DescribeResult + if err := json.Unmarshal(describeJSON, &dr); err != nil { + return nil, err + } + + fields := make([]string, 0, len(dr.Fields)) + for _, f := range dr.Fields { + fields = append(fields, f.Name) + } + + return fields, nil +} + +func buildSOQL(objectName string, fields []string) string { + fieldList := strings.Join(fields, ",+") + return fmt.Sprintf("SELECT+%s+FROM+%s", fieldList, objectName) +} + +func (*salesforceProcessor) Close(context.Context) error { return nil } + +func init() { + if err := service.RegisterProcessor( + "salesforce", newSalesforceProcessorConfigSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + return newSalesforceProcessor(conf, mgr) + }, + ); err != nil { + panic(err) + } +} diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go new file mode 100644 index 0000000000..7c89fbcc0e --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -0,0 +1,221 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// client.go implements low-level interactions with the Salesforce REST API. +// It defines the base API path, provides a helper for making authenticated Salesforce API requests with retry +// and error handling, and exposes utilities for retrieving custom fields. +// +// These functions are primarily used by the Salesforce processor when preparing +// queries and resolving custom field identifiers. + +package salesforcehttp + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp/http_metrics" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// salesforceAPIBasePath is the base path for Salesforce Rest API +const salesforceAPIBasePath = "/services" + +// This is the general function that calls Salesforce API on a specific URL using the URL object. +// It applies standard header parameters to all calls, Authorization, User-Agent and Accept. +// It uses the helper functions to check against possible response codes and handling the retry-after mechanism +func (s *Client) callSalesforceApi(ctx context.Context, u *url.URL) ([]byte, error) { + s.log.Debugf("API call: %s", u.String()) + + if s.bearerToken == "" { + err := s.updateAndSetBearerToken(ctx) + if err != nil { + return nil, err + } + } + + body, err := s.doSalesforceRequest(ctx, u) + if err == nil { + return body, nil + } + + // Check if it's an HTTPError + httpErr, ok := err.(*HTTPError) + if !ok { + return nil, err + } + + // Only refresh on 401 + if httpErr.StatusCode != http.StatusUnauthorized { + return nil, err + } + + s.log.Warn("Salesforce token expired, refreshing token...") + // Refresh token + if err := s.updateAndSetBearerToken(ctx); err != nil { + return nil, fmt.Errorf("failed to refresh token: %w", err) + } + + // Retry once + retryBody, retryErr := s.doSalesforceRequest(ctx, u) + + if retryErr != nil { + return nil, fmt.Errorf("request failed: %v", retryErr) + } + + return retryBody, nil +} + +func (s *Client) doSalesforceRequest(ctx context.Context, u *url.URL) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %v", err) + } + + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", "Redpanda-Connect") + req.Header.Set("Authorization", "Bearer "+s.bearerToken) + + return DoRequestWithRetries(ctx, s.httpClient, req, s.retryOpts) +} + +// Function to get the Bearer token from Salesforce Oauth2.0 endpoint using client credentials grant type along with client id and client secret +func (s *Client) updateAndSetBearerToken(ctx context.Context) error { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/oauth2/token") + if err != nil { + return fmt.Errorf("invalid URL: %v", err) + } + + query := apiUrl.Query() + query.Set("grant_type", "client_credentials") + query.Set("client_id", s.clientId) + query.Set("client_secret", s.clientSecret) + apiUrl.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), nil) + if err != nil { + return fmt.Errorf("failed to create request: %v", err) + } + + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", "Redpanda-Connect") + + body, err := DoRequestWithRetries(ctx, s.httpClient, req, s.retryOpts) + if err != nil { + return fmt.Errorf("request failed: %v", err) + } + + var result SalesforceAuthResponse + if err := json.Unmarshal(body, &result); err != nil { + return fmt.Errorf("cannot map response to custom field struct: %w", err) + } + s.bearerToken = result.AccessToken + + return nil +} + +// GetAvailableResources function to call get available resources endpoint +// Used mainly for auth testing purposes +func (s *Client) GetAvailableResources(ctx context.Context) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion) + if err != nil { + return nil, fmt.Errorf("invalid URL: %v", err) + } + + body, err := s.callSalesforceApi(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// GetAllSObjectResources function to get available sObjects endpoint +func (s *Client) GetAllSObjectResources(ctx context.Context) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/sobjects") + if err != nil { + return nil, fmt.Errorf("invalid URL: %v", err) + } + + body, err := s.callSalesforceApi(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// GetSObjectResource function to call receive the description of the sObject +func (s *Client) GetSObjectResource(ctx context.Context, sObj string) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/sobjects/" + sObj + "/describe") + if err != nil { + return nil, fmt.Errorf("invalid URL: %v", err) + } + + body, err := s.callSalesforceApi(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// GetSObjectData function to call receive the description of the sObject +func (s *Client) GetSObjectData(ctx context.Context, query string) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/query?q=" + query) + if err != nil { + return nil, fmt.Errorf("invalid URL: %v", err) + } + + body, err := s.callSalesforceApi(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// Client is the implementation of Salesforce API queries. It holds the client state and orchestrates calls into the salesforcehttp package. +type Client struct { + orgURL string + clientId string + clientSecret string + apiVersion string + bearerToken string + httpClient *http.Client + retryOpts RetryOptions + log *service.Logger +} + +// NewClient is the constructor ofr a Client object +func NewClient(log *service.Logger, orgUrl, clientId, clientSecret, apiVersion string, maxRetries int, metrics *service.Metrics, httpClient *http.Client) (*Client, error) { + return &Client{ + log: log, + orgURL: orgUrl, + clientId: clientId, + clientSecret: clientSecret, + apiVersion: apiVersion, + retryOpts: RetryOptions{ + MaxRetries: maxRetries, + }, + httpClient: http_metrics.NewInstrumentedClient( + metrics, "salesforce_http", + httpClient), + bearerToken: "", + }, nil +} diff --git a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go new file mode 100644 index 0000000000..e9243a5e21 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go @@ -0,0 +1,110 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package http_metrics + +import ( + "net/http" + "sync/atomic" + "time" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// Transport is a wrapper around an http.RoundTripper that tracks request metrics. +type Transport struct { + base http.RoundTripper + metrics *service.Metrics + ns string + + inflight *service.MetricGauge + inflightVal int64 // track ourselves; gauge uses Set() + total *service.MetricCounter + errors *service.MetricCounter + status2xx *service.MetricCounter + status4xx *service.MetricCounter + status5xx *service.MetricCounter + duration *service.MetricTimer +} + +// NewTransport creates new Transport with metrics instrumentation. It takes a metrics +// instance, namespace string for metric names, and an optional HTTP transport (if nil, creates a new one). +// The function returns Transport that tracks request metrics such as in-flight requests, response status codes, errors, and request duration. +func NewTransport(m *service.Metrics, namespace string, base http.RoundTripper) *Transport { + if base == nil { + base = http.DefaultTransport + } + return &Transport{ + base: base, + metrics: m, + ns: namespace, + inflight: m.NewGauge(namespace + "_in_flight"), + total: m.NewCounter(namespace + "_requests_total"), + errors: m.NewCounter(namespace + "_requests_errors"), + status2xx: m.NewCounter(namespace + "_responses_2xx"), + status4xx: m.NewCounter(namespace + "_responses_4xx"), + status5xx: m.NewCounter(namespace + "_responses_5xx"), + duration: m.NewTimer(namespace + "_request_duration"), + } +} + +// RoundTrip implements the http.RoundTripper interface. +func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { + start := time.Now() + + // in-flight ++ + atomic.AddInt64(&t.inflightVal, 1) + t.inflight.Set(atomic.LoadInt64(&t.inflightVal)) + + // always record end-of-request updates + defer func() { + // duration in nanoseconds (MetricTimer expects int64) + t.duration.Timing(time.Since(start).Nanoseconds()) + // in-flight -- + atomic.AddInt64(&t.inflightVal, -1) + t.inflight.Set(atomic.LoadInt64(&t.inflightVal)) + }() + + t.total.Incr(1) + + resp, err := t.base.RoundTrip(req) + if err != nil { + t.errors.Incr(1) + return nil, err + } + + switch code := resp.StatusCode; { + case code >= 200 && code < 300: + t.status2xx.Incr(1) + case code >= 400 && code < 500: + t.status4xx.Incr(1) + case code >= 500: + t.status5xx.Incr(1) + } + + return resp, nil +} + +// NewInstrumentedClient creates a new HTTP client with metrics instrumentation. It takes a metrics +// instance, namespace string for metric names, and an optional HTTP client (if nil, creates a new one). +// The function returns a clone of the input client with an instrumented transport that tracks request +// metrics such as in-flight requests, response status codes, errors, and request duration. +func NewInstrumentedClient(m *service.Metrics, namespace string, client *http.Client) *http.Client { + if client == nil { + client = &http.Client{} + } + clone := *client + clone.Transport = NewTransport(m, namespace, client.Transport) + return &clone +} diff --git a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go new file mode 100644 index 0000000000..5070d3390d --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go @@ -0,0 +1,332 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// salesforce_helper.go provides helpers for making HTTP requests +// with Salesforce-specific authentication and rate-limiting handling. +// It wraps HTTP responses to detect common error cases such as +// 401/403 Unauthorized, 429 Too Many Requests, and Salesforce’s +// X-Seraph-LoginReason header, which may indicate an authentication +// problem even on a 200 OK response. +// +// Overview +// - Provides helpers for interpreting Salesforce HTTP responses with a focus on +// authentication and rate-limiting signals. +// - Central entry point: CheckSalesforceAuth(resp) which examines an http.Response and +// returns a *SalesforceError for common Salesforce conditions: +// - 401 Unauthorized: Likely invalid credentials (email/API token). +// - 403 Forbidden: Authenticated but insufficient permissions. +// - 429 Too Many Requests: Salesforce is throttling; check Retry-After header. +// - On success (no issues detected), CheckSalesforceAuth returns nil. +// - On failure, SalesforceError includes StatusCode, Reason, Body, and Headers. +// +// When to use +// - Immediately after getting an *http.Response from Salesforce (e.g., client.Do(req)), +// before processing the body. This gives a consistent, structured way to react +// to auth and rate-limit conditions. +// +// Quick Start +// +// 1. Basic request and auth check +// +// ctx := context.Background() +// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https:your-domain.atlassian.net/rest/api/3/myself", nil) +// req.SetBasicAuth("", "") +// req.Header.Set("Accept", "application/json") +// req.Header.Set("User-Agent", "YourApp/1.0") +// +// resp, err := http.DefaultClient.Do(req) +// if err != nil { +// transport or network error +// log.Fatalf("request failed: %v", err) +// } +// defer resp.Body.Close() +// +// if jerr := salesforce_helper.CheckSalesforceAuth(resp); jerr != nil { +// // This includes 401, 403, 429, and 200 + X-Seraph-LoginReason problems. +// // You can inspect the error for details. +// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok { +// log.Printf("salesforce error: status=%d reason=%s", je.StatusCode, je.Reason) +// log.Printf("headers: %v", je.Headers) +// //Optionally log or parse a truncated version of je.Body. +// } +// return +// } +// +// // Safe to read/parse the response body here. +// data, _ := io.ReadAll(resp.Body) +// log.Printf("success: %s", string(data)) +// +// 2. Handling rate limiting (429) with Retry-After +// +// // If CheckSalesforceAuth returns a SalesforceError with StatusCode 429, look for Retry-After. +// jerr := salesforce_helper.CheckSalesforceAuth(resp) +// if jerr != nil { +// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok && je.StatusCode == http.StatusTooManyRequests { +// retryAfter := je.Headers.Get("Retry-After") // integer seconds expected +// // Convert to a duration and sleep before retrying +// if secs, convErr := strconv.Atoi(strings.TrimSpace(retryAfter)); convErr == nil && secs >= 0 { +// time.Sleep(time.Duration(secs) * time.Second) +// //retry the request here +// } else { +// // Fallback: use a default backoff (e.g., exponential with jitter) before retry +// } +// } +// } +// +// +// 3. Centralized error handling +// +// jerr := salesforce_helper.CheckSalesforceAuth(resp) +// if jerr != nil { +// // Use errors.As to extract *SalesforceError +// var je *salesforce_helper.SalesforceError +// if errors.As(jerr, &je) { +// switch je.StatusCode { +// case http.StatusUnauthorized: +// // refresh token, prompt re-login, etc. +// case http.StatusForbidden: +// // insufficient permissions: inform user or adjust scopes +// case http.StatusTooManyRequests: +// // back off and retry later +// default: +// // 200 with X-Seraph-LoginReason or other 4xx/5xx +// } +// } +// return +// } +// // proceed to parse resp.Body +// +// 4. Example helper wrapping a Salesforce call +// +// func callSalesforce(ctx context.Context, client *http.Client, req *http.Request) ([]byte, *salesforce_helper.SalesforceError) { +// resp, err := client.Do(req.WithContext(ctx)) +// if err != nil { +// return nil, &salesforce_helper.SalesforceError{ +// StatusCode: 0, +// Reason: fmt.Sprintf("transport error: %v", err), +// } +// } +// defer resp.Body.Close() +// +// if jerr := salesforce_helper.CheckSalesforceAuth(resp); jerr != nil { +// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok { +// return nil, je +// } +// // Wrap non-SalesforceError just in case (shouldn't happen). +// return nil, &salesforce_helper.SalesforceError{StatusCode: resp.StatusCode, Reason: jerr.Error()} +// } +// +// data, readErr := io.ReadAll(resp.Body) +// if readErr != nil { +// return nil, &salesforce_helper.SalesforceError{StatusCode: resp.StatusCode, Reason: fmt.Sprintf("read error: %v", readErr)} +// } +// return data, nil +// } +// +// Inspecting SalesforceError +// - On error, CheckSalesforceAuth returns *SalesforceError containing: +// - StatusCode: HTTP status (or 200 in header-signaled cases). +// - Reason: High-level explanation (or http.StatusText for generic 4xx/5xx). +// - Body: Response body string (caller may truncate before logging). +// - Headers: Cloned response headers for further inspection. +// +// - Example: +// +// if jerr := salesforce_helper.CheckSalesforceAuth(resp); jerr != nil { +// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok { +// fmt.Printf("status=%d reason=%s\n", je.StatusCode, je.Reason) +// } +// } +// +// Best Practices +// - Always set a User-Agent to identify your application. +// - Use context timeouts or deadlines to avoid hanging requests. +// - Be mindful when logging bodies; they may contain sensitive or large content. +// - Respect Retry-After when present; otherwise choose a conservative backoff. +// - Prefer idempotent methods for automatic retries; confirm business safety for POST/PUT. +// - Consider centralizing Salesforce error handling via CheckSalesforceAuth in your HTTP layer. +// +// Dependencies +// - Standard library only: net/http, io, fmt. + +package salesforcehttp + +import ( + "context" + "fmt" + "io" + "math/rand" + "net/http" + "strconv" + "strings" + "time" +) + +// HTTPError wraps non-2xx responses with useful context. +type HTTPError struct { + StatusCode int + Reason string + Body string + Headers http.Header +} + +func (e *HTTPError) Error() string { + return fmt.Sprintf("http error: status=%d reason=%s", e.StatusCode, e.Reason) +} + +// AuthHeaderPolicy allows callers to declare a header that signals an auth problem +// even on 200 OK responses (e.g., "X-Seraph-LoginReason"). +type AuthHeaderPolicy struct { + HeaderName string // case-insensitive + IsProblem func(val string) bool // return true if the header value indicates auth failure +} + +// RetryOptions controls the behavior of DoRequestWithRetries. +type RetryOptions struct { + MaxRetries int // retries for 429 Too Many Requests (0 = no retry) + BaseDelay time.Duration // base backoff (default 500 ms) + MaxDelay time.Duration // cap backoff (default 30s) + AuthHeaderPolicy *AuthHeaderPolicy // optional header-based auth detection +} + +func backoffWithJitter(base, maxDuration time.Duration, attempt int) time.Duration { + if base <= 0 { + base = 500 * time.Millisecond + } + if maxDuration <= 0 { + maxDuration = 30 * time.Second + } + d := base << attempt + if d > maxDuration { + d = maxDuration + } + jitter := time.Duration(rand.Int63n(int64(d))) - d/2 + return d + jitter +} + +// DoRequestWithRetries executes req, handling: +// - Auth errors on 401/403 +// - Header-signaled auth problems on 200 (via AuthHeaderPolicy) +// - 429 with Retry-After or exponential backoff and jitter (up to MaxRetries) +// Other 4xx/5xx are returned as HTTPError without a retry. +func DoRequestWithRetries( + ctx context.Context, + client *http.Client, + req *http.Request, + opts RetryOptions, +) ([]byte, error) { + if client == nil { + client = http.DefaultClient + } + attempt := 0 + + for { + resp, err := client.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + // 1) Explicit auth/permission failures + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: "authentication/authorization failure", + Body: string(body), + Headers: resp.Header.Clone(), + } + } + + // 2) 200 OK, but a header indicates an auth/login problem + if resp.StatusCode == http.StatusOK && opts.AuthHeaderPolicy != nil { + val := strings.TrimSpace(resp.Header.Get(opts.AuthHeaderPolicy.HeaderName)) + if val != "" && opts.AuthHeaderPolicy.IsProblem(val) { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: fmt.Sprintf("auth/login issue indicated by %s=%q", opts.AuthHeaderPolicy.HeaderName, val), + Body: string(body), + Headers: resp.Header.Clone(), + } + } + } + + // 3) 429 Too Many Requests => retry with Retry-After or backoff + if resp.StatusCode == http.StatusTooManyRequests { + if attempt >= opts.MaxRetries { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: "rate limit exceeded; retries exhausted", + Body: string(body), + Headers: resp.Header.Clone(), + } + } + + delay := backoffWithJitter(opts.BaseDelay, opts.MaxDelay, attempt) + if ra := strings.TrimSpace(resp.Header.Get("Retry-After")); ra != "" { + if secs, err := strconv.Atoi(ra); err == nil && secs >= 0 { + delay = time.Duration(secs) * time.Second + } + } + err := resp.Body.Close() + if err != nil { + return nil, err + } + + t := time.NewTimer(delay) + select { + case <-ctx.Done(): + t.Stop() + return nil, context.Canceled + case <-t.C: + } + attempt++ + continue + } + + // 4) Other non-2xx => return as error (no auto-retry here) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: http.StatusText(resp.StatusCode), + Body: string(body), + Headers: resp.Header.Clone(), + } + } + + defer resp.Body.Close() + + // Read the response body for context + bodyBytes, _ := io.ReadAll(resp.Body) + return bodyBytes, nil + } +} diff --git a/internal/impl/salesforce/salesforcehttp/types.go b/internal/impl/salesforce/salesforcehttp/types.go new file mode 100644 index 0000000000..539dc79613 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/types.go @@ -0,0 +1,33 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// types.go defines core data structures, response models, and enums for the Jira processor. +// It includes input query types, API response DTOs, output message formats, and resource type constants. + +package salesforcehttp + +/*** Input / DTOs ***/ + +// SalesforceAuthResponse represents the response from the salesforce auth API +// We are using SalesforceAuthResponse in this context to get the whole auth object directly from Salesforce and parse the bearer token +type SalesforceAuthResponse struct { + AccessToken string `json:"access_token"` + Signature string `json:"signature"` + Scope string `json:"scope"` + InstanceUrl string `json:"instance_url"` + Id string `json:"id"` + TokenType string `json:"token_type"` + IssuedAt string `json:"issued_at"` + ApiInstanceUrl string `json:"api_instance_url"` +} From cb6d0e818ffe1d684a14ed558c81b7f24220b813 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Sun, 1 Feb 2026 23:12:03 +0200 Subject: [PATCH 02/19] add tests --- .../salesforce/processor_salesforce_test.go | 149 ++++++++++++++++++ .../salesforce/salesforcehttp/client_test.go | 122 ++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 internal/impl/salesforce/processor_salesforce_test.go create mode 100644 internal/impl/salesforce/salesforcehttp/client_test.go diff --git a/internal/impl/salesforce/processor_salesforce_test.go b/internal/impl/salesforce/processor_salesforce_test.go new file mode 100644 index 0000000000..0df50a5b5f --- /dev/null +++ b/internal/impl/salesforce/processor_salesforce_test.go @@ -0,0 +1,149 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package salesforce + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +func TestSalesforceProcessorConfigValidation(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + configYAML string + wantErrSub string + }{ + { + name: "missing org_url", + configYAML: ` +client_id: "abc" +client_secret: "xyz" +`, + wantErrSub: "org_url", + }, + { + name: "invalid org_url", + configYAML: ` +org_url: "not a url" +client_id: "abc" +client_secret: "xyz" +`, + wantErrSub: "org_url", + }, + { + name: "missing client_id", + configYAML: ` +org_url: "https://example.com" +client_secret: "xyz" +`, + wantErrSub: "client_id", + }, + { + name: "missing client_secret", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +`, + wantErrSub: "client_secret", + }, + { + name: "invalid restapi_version type", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +restapi_version: 123 +`, + wantErrSub: "restapi_version", + }, + { + name: "invalid request_timeout", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +request_timeout: "not-a-duration" +`, + wantErrSub: "request_timeout", + }, + { + name: "invalid max_retries", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +max_retries: "not-an-int" +`, + wantErrSub: "max_retries", + }, + { + name: "valid minimal config", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +`, + wantErrSub: "", + }, + { + name: "valid full config", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +restapi_version: "v64.0" +request_timeout: "10s" +max_retries: 5 +`, + wantErrSub: "", + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + env := service.NewEnvironment() + spec := newSalesforceProcessorConfigSpec() + + conf, err := spec.ParseYAML(tc.configYAML, env) + + var proc service.Processor + var procErr error + if err == nil { + proc, procErr = newSalesforceProcessor(conf, conf.Resources()) + } + + if tc.wantErrSub == "" { + require.NoError(t, err, "expected config to be valid") + require.NoError(t, procErr, "expected processor to initialize") + assert.NotNil(t, proc) + } else { + // Either config parsing OR processor creation must fail + if err != nil { + require.Contains(t, err.Error(), tc.wantErrSub) + } + if procErr != nil { + require.Contains(t, procErr.Error(), tc.wantErrSub) + } + } + }) + } +} diff --git a/internal/impl/salesforce/salesforcehttp/client_test.go b/internal/impl/salesforce/salesforcehttp/client_test.go new file mode 100644 index 0000000000..751def9c84 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/client_test.go @@ -0,0 +1,122 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package salesforcehttp + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUpdateAndSetBearerToken_RealClient(t *testing.T) { + t.Parallel() + + // Fake Salesforce OAuth server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/services/oauth2/token", r.URL.Path) + + resp := map[string]string{"access_token": "abc123"} + _ = json.NewEncoder(w).Encode(resp) + })) + defer ts.Close() + + // log := service.NewLogger("test") + + client, err := NewClient( + nil, + ts.URL, + "id", + "secret", + "v65.0", + 1, + // service.NewMetrics(), + nil, + ts.Client(), + ) + require.NoError(t, err) + + err = client.updateAndSetBearerToken(context.Background()) + require.NoError(t, err) + assert.Equal(t, "abc123", client.bearerToken) +} + +func TestCallSalesforceApi_RefreshOn401_RealClient(t *testing.T) { + t.Parallel() + + callCount := 0 + tokenIssued := false + + // Fake Salesforce server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + + // Token refresh endpoint + case "/services/oauth2/token": + tokenIssued = true + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"access_token":"new-token"}`)) + return + + // Data endpoint + case "/services/data/v65.0": + callCount++ + + // First call → 401 Unauthorized + if callCount == 1 { + w.WriteHeader(http.StatusUnauthorized) + return + } + + // Second call → success + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"ok":true}`)) + return + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer ts.Close() + + client, err := NewClient( + nil, + ts.URL, + "id", + "secret", + "v65.0", + 1, + nil, + ts.Client(), + ) + require.NoError(t, err) + + body, err := client.callSalesforceApi(context.Background(), mustParseURL(ts.URL+"/services/data/v65.0")) + require.NoError(t, err) + + assert.Equal(t, `{"ok":true}`, string(body)) + assert.Equal(t, 2, callCount) + assert.True(t, tokenIssued, "token refresh should have been called") +} + +func mustParseURL(s string) *url.URL { + u, _ := url.Parse(s) + return u +} From 19992c03af432d6ce10320c18dd89e3afebf5610 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Mon, 2 Feb 2026 12:14:49 +0200 Subject: [PATCH 03/19] fix lint --- .../impl/salesforce/processor_salesforce.go | 23 ------------------- .../salesforce/salesforcehttp/client_test.go | 4 ++-- 2 files changed, 2 insertions(+), 25 deletions(-) diff --git a/internal/impl/salesforce/processor_salesforce.go b/internal/impl/salesforce/processor_salesforce.go index 50a4b3b092..971077f393 100644 --- a/internal/impl/salesforce/processor_salesforce.go +++ b/internal/impl/salesforce/processor_salesforce.go @@ -20,12 +20,9 @@ package salesforce import ( "context" - "encoding/json" "errors" - "fmt" "net/http" "net/url" - "strings" "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp" @@ -114,7 +111,6 @@ pipeline: } func newSalesforceProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*salesforceProcessor, error) { - orgURL, err := conf.FieldString("org_url") if err != nil { return nil, err @@ -181,25 +177,6 @@ func (s *salesforceProcessor) Process(ctx context.Context, msg *service.Message) return batch, nil } -func extractFieldNames(describeJSON []byte) ([]string, error) { - var dr DescribeResult - if err := json.Unmarshal(describeJSON, &dr); err != nil { - return nil, err - } - - fields := make([]string, 0, len(dr.Fields)) - for _, f := range dr.Fields { - fields = append(fields, f.Name) - } - - return fields, nil -} - -func buildSOQL(objectName string, fields []string) string { - fieldList := strings.Join(fields, ",+") - return fmt.Sprintf("SELECT+%s+FROM+%s", fieldList, objectName) -} - func (*salesforceProcessor) Close(context.Context) error { return nil } func init() { diff --git a/internal/impl/salesforce/salesforcehttp/client_test.go b/internal/impl/salesforce/salesforcehttp/client_test.go index 751def9c84..f27ddfb837 100644 --- a/internal/impl/salesforce/salesforcehttp/client_test.go +++ b/internal/impl/salesforce/salesforcehttp/client_test.go @@ -72,7 +72,7 @@ func TestCallSalesforceApi_RefreshOn401_RealClient(t *testing.T) { case "/services/oauth2/token": tokenIssued = true w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"access_token":"new-token"}`)) + _, _ = w.Write([]byte(`{"access_token":"new-token"}`)) return // Data endpoint @@ -87,7 +87,7 @@ func TestCallSalesforceApi_RefreshOn401_RealClient(t *testing.T) { // Second call → success w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"ok":true}`)) + _, _ = w.Write([]byte(`{"ok":true}`)) return default: From 0d42fb614a5534c6d01911e796ffae2ab40061ab Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Fri, 6 Feb 2026 15:56:27 +0200 Subject: [PATCH 04/19] Update internal/impl/salesforce/processor_salesforce.go Co-authored-by: Joseph Woodward --- internal/impl/salesforce/processor_salesforce.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/processor_salesforce.go b/internal/impl/salesforce/processor_salesforce.go index 971077f393..f765ae1caa 100644 --- a/internal/impl/salesforce/processor_salesforce.go +++ b/internal/impl/salesforce/processor_salesforce.go @@ -1,4 +1,4 @@ -// Copyright 2024 Redpanda Data, Inc. +// Copyright 2026 Redpanda Data, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 815a971b46c687c2802540cf4df4a62a9ef75618 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Fri, 6 Feb 2026 16:20:41 +0200 Subject: [PATCH 05/19] Update internal/impl/salesforce/salesforcehttp/client.go Co-authored-by: Joseph Woodward --- internal/impl/salesforce/salesforcehttp/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index 7c89fbcc0e..6226de778a 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -202,7 +202,7 @@ type Client struct { log *service.Logger } -// NewClient is the constructor ofr a Client object +// NewClient is the constructor for a Client object func NewClient(log *service.Logger, orgUrl, clientId, clientSecret, apiVersion string, maxRetries int, metrics *service.Metrics, httpClient *http.Client) (*Client, error) { return &Client{ log: log, From 0aac7fed681b97c63f3adb6bd4932f38e65244f7 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Fri, 6 Feb 2026 16:21:02 +0200 Subject: [PATCH 06/19] Update internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go Co-authored-by: Joseph Woodward --- .../impl/salesforce/salesforcehttp/http_metrics/http_metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go index e9243a5e21..994dc7bd62 100644 --- a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go +++ b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go @@ -1,4 +1,4 @@ -// Copyright 2024 Redpanda Data, Inc. +// Copyright 2026 Redpanda Data, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From f3d218c1830ef5bc67f1cf189b5d2b38e82ea4a8 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Fri, 6 Feb 2026 16:39:25 +0200 Subject: [PATCH 07/19] PR changes - rename method --- internal/impl/salesforce/salesforcehttp/client.go | 10 +++++----- internal/impl/salesforce/salesforcehttp/client_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index 6226de778a..ad32a964c5 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -39,7 +39,7 @@ const salesforceAPIBasePath = "/services" // This is the general function that calls Salesforce API on a specific URL using the URL object. // It applies standard header parameters to all calls, Authorization, User-Agent and Accept. // It uses the helper functions to check against possible response codes and handling the retry-after mechanism -func (s *Client) callSalesforceApi(ctx context.Context, u *url.URL) ([]byte, error) { +func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, error) { s.log.Debugf("API call: %s", u.String()) if s.bearerToken == "" { @@ -137,7 +137,7 @@ func (s *Client) GetAvailableResources(ctx context.Context) ([]byte, error) { return nil, fmt.Errorf("invalid URL: %v", err) } - body, err := s.callSalesforceApi(ctx, apiUrl) + body, err := s.callSalesforceAPI(ctx, apiUrl) if err != nil { return nil, err } @@ -152,7 +152,7 @@ func (s *Client) GetAllSObjectResources(ctx context.Context) ([]byte, error) { return nil, fmt.Errorf("invalid URL: %v", err) } - body, err := s.callSalesforceApi(ctx, apiUrl) + body, err := s.callSalesforceAPI(ctx, apiUrl) if err != nil { return nil, err } @@ -167,7 +167,7 @@ func (s *Client) GetSObjectResource(ctx context.Context, sObj string) ([]byte, e return nil, fmt.Errorf("invalid URL: %v", err) } - body, err := s.callSalesforceApi(ctx, apiUrl) + body, err := s.callSalesforceAPI(ctx, apiUrl) if err != nil { return nil, err } @@ -182,7 +182,7 @@ func (s *Client) GetSObjectData(ctx context.Context, query string) ([]byte, erro return nil, fmt.Errorf("invalid URL: %v", err) } - body, err := s.callSalesforceApi(ctx, apiUrl) + body, err := s.callSalesforceAPI(ctx, apiUrl) if err != nil { return nil, err } diff --git a/internal/impl/salesforce/salesforcehttp/client_test.go b/internal/impl/salesforce/salesforcehttp/client_test.go index f27ddfb837..027ecc88b8 100644 --- a/internal/impl/salesforce/salesforcehttp/client_test.go +++ b/internal/impl/salesforce/salesforcehttp/client_test.go @@ -108,7 +108,7 @@ func TestCallSalesforceApi_RefreshOn401_RealClient(t *testing.T) { ) require.NoError(t, err) - body, err := client.callSalesforceApi(context.Background(), mustParseURL(ts.URL+"/services/data/v65.0")) + body, err := client.callSalesforceAPI(context.Background(), mustParseURL(ts.URL+"/services/data/v65.0")) require.NoError(t, err) assert.Equal(t, `{"ok":true}`, string(body)) From 4c25033ac868e19c6ae6e953ad12d78b6a05d85d Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Fri, 6 Feb 2026 16:43:11 +0200 Subject: [PATCH 08/19] pr change - move init method --- .../impl/salesforce/processor_salesforce.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/impl/salesforce/processor_salesforce.go b/internal/impl/salesforce/processor_salesforce.go index f765ae1caa..a3a8255e9d 100644 --- a/internal/impl/salesforce/processor_salesforce.go +++ b/internal/impl/salesforce/processor_salesforce.go @@ -61,6 +61,17 @@ type QueryResult struct { Done bool `json:"done"` } +func init() { + if err := service.RegisterProcessor( + "salesforce", newSalesforceProcessorConfigSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + return newSalesforceProcessor(conf, mgr) + }, + ); err != nil { + panic(err) + } +} + // newSalesforceProcessorConfigSpec creates a new Configuration specification for the Salesforce processor func newSalesforceProcessorConfigSpec() *service.ConfigSpec { return service.NewConfigSpec(). @@ -178,14 +189,3 @@ func (s *salesforceProcessor) Process(ctx context.Context, msg *service.Message) } func (*salesforceProcessor) Close(context.Context) error { return nil } - -func init() { - if err := service.RegisterProcessor( - "salesforce", newSalesforceProcessorConfigSpec(), - func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { - return newSalesforceProcessor(conf, mgr) - }, - ); err != nil { - panic(err) - } -} From ba175b80c2fd905ae73377bdffc5f62eb94022f1 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Fri, 6 Feb 2026 17:20:24 +0200 Subject: [PATCH 09/19] PR changes - update auth request --- .../impl/salesforce/salesforcehttp/client.go | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index ad32a964c5..7dc0a1fa64 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -27,6 +27,7 @@ import ( "fmt" "net/http" "net/url" + "strings" "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp/http_metrics" @@ -101,17 +102,18 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { return fmt.Errorf("invalid URL: %v", err) } - query := apiUrl.Query() - query.Set("grant_type", "client_credentials") - query.Set("client_id", s.clientId) - query.Set("client_secret", s.clientSecret) - apiUrl.RawQuery = query.Encode() + // Build form-encoded body + form := url.Values{} + form.Set("grant_type", "client_credentials") + form.Set("client_id", s.clientID) + form.Set("client_secret", s.clientSecret) + + req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), strings.NewReader(form.Encode())) - req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), nil) if err != nil { return fmt.Errorf("failed to create request: %v", err) } - + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Accept", "application/json") req.Header.Set("User-Agent", "Redpanda-Connect") @@ -193,7 +195,7 @@ func (s *Client) GetSObjectData(ctx context.Context, query string) ([]byte, erro // Client is the implementation of Salesforce API queries. It holds the client state and orchestrates calls into the salesforcehttp package. type Client struct { orgURL string - clientId string + clientID string clientSecret string apiVersion string bearerToken string @@ -207,7 +209,7 @@ func NewClient(log *service.Logger, orgUrl, clientId, clientSecret, apiVersion s return &Client{ log: log, orgURL: orgUrl, - clientId: clientId, + clientID: clientId, clientSecret: clientSecret, apiVersion: apiVersion, retryOpts: RetryOptions{ From 0470914e2289429892c83473257c6f6d6662ab6c Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Tue, 10 Feb 2026 23:32:58 +0200 Subject: [PATCH 10/19] Update internal/impl/salesforce/salesforcehttp/types.go Co-authored-by: Joseph Woodward --- internal/impl/salesforce/salesforcehttp/types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/salesforcehttp/types.go b/internal/impl/salesforce/salesforcehttp/types.go index 539dc79613..7002296c1d 100644 --- a/internal/impl/salesforce/salesforcehttp/types.go +++ b/internal/impl/salesforce/salesforcehttp/types.go @@ -1,4 +1,4 @@ -// Copyright 2024 Redpanda Data, Inc. +// Copyright 2026 Redpanda Data, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From cbd5d0ddef74dccc6625903c75937a4ff41ed90b Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Tue, 10 Feb 2026 23:33:13 +0200 Subject: [PATCH 11/19] Update internal/impl/salesforce/processor_salesforce_test.go Co-authored-by: Joseph Woodward --- internal/impl/salesforce/processor_salesforce_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/processor_salesforce_test.go b/internal/impl/salesforce/processor_salesforce_test.go index 0df50a5b5f..ea748efdc3 100644 --- a/internal/impl/salesforce/processor_salesforce_test.go +++ b/internal/impl/salesforce/processor_salesforce_test.go @@ -1,4 +1,4 @@ -// Copyright 2024 Redpanda Data, Inc. +// Copyright 2026 Redpanda Data, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 6ce42fcfdfff8b3d36e401879b4006538ea601fc Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Tue, 10 Feb 2026 23:34:02 +0200 Subject: [PATCH 12/19] Update internal/impl/salesforce/salesforcehttp/salesforce_helper.go Co-authored-by: Joseph Woodward --- internal/impl/salesforce/salesforcehttp/salesforce_helper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go index 5070d3390d..2e31ea24bc 100644 --- a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go +++ b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go @@ -1,4 +1,4 @@ -// Copyright 2024 Redpanda Data, Inc. +// Copyright 2026 Redpanda Data, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From b0645c4200f7c6e3e76e7f412b662727518b5b46 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Tue, 10 Feb 2026 23:34:11 +0200 Subject: [PATCH 13/19] Update internal/impl/salesforce/salesforcehttp/client_test.go Co-authored-by: Joseph Woodward --- internal/impl/salesforce/salesforcehttp/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/salesforcehttp/client_test.go b/internal/impl/salesforce/salesforcehttp/client_test.go index 027ecc88b8..10c8e1cbcd 100644 --- a/internal/impl/salesforce/salesforcehttp/client_test.go +++ b/internal/impl/salesforce/salesforcehttp/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2024 Redpanda Data, Inc. +// Copyright 2026 Redpanda Data, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From c99e95a8ecf4be2b3b39fd07a24bb3ad0f0a135f Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Tue, 10 Feb 2026 23:34:22 +0200 Subject: [PATCH 14/19] Update internal/impl/salesforce/salesforcehttp/client.go Co-authored-by: Joseph Woodward --- internal/impl/salesforce/salesforcehttp/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index 7dc0a1fa64..cd140c6ac5 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -1,4 +1,4 @@ -// Copyright 2024 Redpanda Data, Inc. +// Copyright 2026 Redpanda Data, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 54adbcc8ef55bbcd5315d79e8e320a729ddcb543 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Wed, 11 Feb 2026 14:55:28 +0200 Subject: [PATCH 15/19] change logging --- .../impl/salesforce/salesforcehttp/client.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index cd140c6ac5..f38505ad96 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -76,7 +76,7 @@ func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, err retryBody, retryErr := s.doSalesforceRequest(ctx, u) if retryErr != nil { - return nil, fmt.Errorf("request failed: %v", retryErr) + return nil, fmt.Errorf("request failed: %w", retryErr) } return retryBody, nil @@ -85,7 +85,7 @@ func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, err func (s *Client) doSalesforceRequest(ctx context.Context, u *url.URL) ([]byte, error) { req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) if err != nil { - return nil, fmt.Errorf("failed to create request: %v", err) + return nil, fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Accept", "application/json") @@ -99,7 +99,7 @@ func (s *Client) doSalesforceRequest(ctx context.Context, u *url.URL) ([]byte, e func (s *Client) updateAndSetBearerToken(ctx context.Context) error { apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/oauth2/token") if err != nil { - return fmt.Errorf("invalid URL: %v", err) + return fmt.Errorf("invalid URL: %w", err) } // Build form-encoded body @@ -111,7 +111,7 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), strings.NewReader(form.Encode())) if err != nil { - return fmt.Errorf("failed to create request: %v", err) + return fmt.Errorf("failed to create request: %w", err) } req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.Header.Set("Accept", "application/json") @@ -119,7 +119,7 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { body, err := DoRequestWithRetries(ctx, s.httpClient, req, s.retryOpts) if err != nil { - return fmt.Errorf("request failed: %v", err) + return fmt.Errorf("request failed: %w", err) } var result SalesforceAuthResponse @@ -136,7 +136,7 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { func (s *Client) GetAvailableResources(ctx context.Context) ([]byte, error) { apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion) if err != nil { - return nil, fmt.Errorf("invalid URL: %v", err) + return nil, fmt.Errorf("invalid URL: %w", err) } body, err := s.callSalesforceAPI(ctx, apiUrl) @@ -151,7 +151,7 @@ func (s *Client) GetAvailableResources(ctx context.Context) ([]byte, error) { func (s *Client) GetAllSObjectResources(ctx context.Context) ([]byte, error) { apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/sobjects") if err != nil { - return nil, fmt.Errorf("invalid URL: %v", err) + return nil, fmt.Errorf("invalid URL: %w", err) } body, err := s.callSalesforceAPI(ctx, apiUrl) @@ -166,7 +166,7 @@ func (s *Client) GetAllSObjectResources(ctx context.Context) ([]byte, error) { func (s *Client) GetSObjectResource(ctx context.Context, sObj string) ([]byte, error) { apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/sobjects/" + sObj + "/describe") if err != nil { - return nil, fmt.Errorf("invalid URL: %v", err) + return nil, fmt.Errorf("invalid URL: %w", err) } body, err := s.callSalesforceAPI(ctx, apiUrl) @@ -181,7 +181,7 @@ func (s *Client) GetSObjectResource(ctx context.Context, sObj string) ([]byte, e func (s *Client) GetSObjectData(ctx context.Context, query string) ([]byte, error) { apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/query?q=" + query) if err != nil { - return nil, fmt.Errorf("invalid URL: %v", err) + return nil, fmt.Errorf("invalid URL: %w", err) } body, err := s.callSalesforceAPI(ctx, apiUrl) From 35905a69917a5d113281f03b2a09670273277cc9 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Wed, 11 Feb 2026 16:13:10 +0200 Subject: [PATCH 16/19] change metrics to label --- .../http_metrics/http_metrics.go | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go index 994dc7bd62..6667e5b36a 100644 --- a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go +++ b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go @@ -16,6 +16,7 @@ package http_metrics import ( "net/http" + "strconv" "sync/atomic" "time" @@ -29,13 +30,12 @@ type Transport struct { ns string inflight *service.MetricGauge - inflightVal int64 // track ourselves; gauge uses Set() - total *service.MetricCounter - errors *service.MetricCounter - status2xx *service.MetricCounter - status4xx *service.MetricCounter - status5xx *service.MetricCounter - duration *service.MetricTimer + inflightVal int64 + + total *service.MetricCounter + errors *service.MetricCounter + status *service.MetricCounter + duration *service.MetricTimer } // NewTransport creates new Transport with metrics instrumentation. It takes a metrics @@ -45,17 +45,16 @@ func NewTransport(m *service.Metrics, namespace string, base http.RoundTripper) if base == nil { base = http.DefaultTransport } + return &Transport{ - base: base, - metrics: m, - ns: namespace, - inflight: m.NewGauge(namespace + "_in_flight"), - total: m.NewCounter(namespace + "_requests_total"), - errors: m.NewCounter(namespace + "_requests_errors"), - status2xx: m.NewCounter(namespace + "_responses_2xx"), - status4xx: m.NewCounter(namespace + "_responses_4xx"), - status5xx: m.NewCounter(namespace + "_responses_5xx"), - duration: m.NewTimer(namespace + "_request_duration"), + base: base, + metrics: m, + ns: namespace, + inflight: m.NewGauge(namespace + "_in_flight"), + total: m.NewCounter(namespace + "_requests_total"), + errors: m.NewCounter(namespace + "_requests_errors"), + status: m.NewCounter(namespace+"_responses", "status_code"), + duration: m.NewTimer(namespace + "_request_duration"), } } @@ -84,14 +83,8 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { return nil, err } - switch code := resp.StatusCode; { - case code >= 200 && code < 300: - t.status2xx.Incr(1) - case code >= 400 && code < 500: - t.status4xx.Incr(1) - case code >= 500: - t.status5xx.Incr(1) - } + codeStr := strconv.Itoa(resp.StatusCode) + t.status.Incr(1, codeStr) return resp, nil } From 5b72f5b9270b726223214c126f8ea3311c5f10b3 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Fri, 13 Feb 2026 14:39:10 +0200 Subject: [PATCH 17/19] pr changes --- .../impl/salesforce/processor_salesforce.go | 14 ++++---- .../impl/salesforce/salesforcehttp/client.go | 34 +++++++++++++------ .../salesforce/salesforcehttp/client_test.go | 11 +++--- .../http_metrics/http_metrics.go | 2 +- .../salesforcehttp/salesforce_helper.go | 25 +++++++------- .../impl/salesforce/salesforcehttp/types.go | 2 +- 6 files changed, 47 insertions(+), 41 deletions(-) diff --git a/internal/impl/salesforce/processor_salesforce.go b/internal/impl/salesforce/processor_salesforce.go index a3a8255e9d..ba4f62242f 100644 --- a/internal/impl/salesforce/processor_salesforce.go +++ b/internal/impl/salesforce/processor_salesforce.go @@ -131,7 +131,7 @@ func newSalesforceProcessor(conf *service.ParsedConfig, mgr *service.Resources) return nil, errors.New("org_url is not a valid URL") } - clientId, err := conf.FieldString("client_id") + clientID, err := conf.FieldString("client_id") if err != nil { return nil, err } @@ -155,10 +155,13 @@ func newSalesforceProcessor(conf *service.ParsedConfig, mgr *service.Resources) if err != nil { return nil, err } + if maxRetries < 0 { + return nil, errors.New("max_retries must not be negative") + } httpClient := &http.Client{Timeout: timeout} - salesforceHttp, err := salesforcehttp.NewClient(mgr.Logger(), orgURL, clientId, clientSecret, apiVersion, maxRetries, mgr.Metrics(), httpClient) + salesforceHttp, err := salesforcehttp.NewClient(orgURL, clientID, clientSecret, apiVersion, maxRetries, httpClient, mgr.Logger(), mgr.Metrics()) if err != nil { return nil, err } @@ -169,13 +172,8 @@ func newSalesforceProcessor(conf *service.ParsedConfig, mgr *service.Resources) }, nil } -func (s *salesforceProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { +func (s *salesforceProcessor) Process(ctx context.Context, _ *service.Message) (service.MessageBatch, error) { var batch service.MessageBatch - inputMsg, err := msg.AsBytes() - if err != nil { - return nil, err - } - s.log.Debugf("Fetching from Salesforce.. Input: %s", string(inputMsg)) res, err := s.client.GetAvailableResources(ctx) if err != nil { diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index f38505ad96..a301f79a00 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -28,6 +28,7 @@ import ( "net/http" "net/url" "strings" + "sync" "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp/http_metrics" @@ -43,9 +44,12 @@ const salesforceAPIBasePath = "/services" func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, error) { s.log.Debugf("API call: %s", u.String()) - if s.bearerToken == "" { - err := s.updateAndSetBearerToken(ctx) - if err != nil { + s.tokenMu.Lock() + needsToken := s.bearerToken == "" + s.tokenMu.Unlock() + + if needsToken { + if err := s.updateAndSetBearerToken(ctx); err != nil { return nil, err } } @@ -74,7 +78,6 @@ func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, err // Retry once retryBody, retryErr := s.doSalesforceRequest(ctx, u) - if retryErr != nil { return nil, fmt.Errorf("request failed: %w", retryErr) } @@ -88,9 +91,13 @@ func (s *Client) doSalesforceRequest(ctx context.Context, u *url.URL) ([]byte, e return nil, fmt.Errorf("failed to create request: %w", err) } + s.tokenMu.Lock() + token := s.bearerToken + s.tokenMu.Unlock() + req.Header.Set("Accept", "application/json") req.Header.Set("User-Agent", "Redpanda-Connect") - req.Header.Set("Authorization", "Bearer "+s.bearerToken) + req.Header.Set("Authorization", "Bearer "+token) return DoRequestWithRetries(ctx, s.httpClient, req, s.retryOpts) } @@ -99,7 +106,7 @@ func (s *Client) doSalesforceRequest(ctx context.Context, u *url.URL) ([]byte, e func (s *Client) updateAndSetBearerToken(ctx context.Context) error { apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/oauth2/token") if err != nil { - return fmt.Errorf("invalid URL: %w", err) + return fmt.Errorf("invalid token endpoint URL: %w", err) } // Build form-encoded body @@ -109,7 +116,6 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { form.Set("client_secret", s.clientSecret) req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), strings.NewReader(form.Encode())) - if err != nil { return fmt.Errorf("failed to create request: %w", err) } @@ -126,7 +132,9 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { if err := json.Unmarshal(body, &result); err != nil { return fmt.Errorf("cannot map response to custom field struct: %w", err) } + s.tokenMu.Lock() s.bearerToken = result.AccessToken + s.tokenMu.Unlock() return nil } @@ -179,10 +187,13 @@ func (s *Client) GetSObjectResource(ctx context.Context, sObj string) ([]byte, e // GetSObjectData function to call receive the description of the sObject func (s *Client) GetSObjectData(ctx context.Context, query string) ([]byte, error) { - apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/query?q=" + query) + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/query") if err != nil { return nil, fmt.Errorf("invalid URL: %w", err) } + q := apiUrl.Query() + q.Set("q", query) + apiUrl.RawQuery = q.Encode() body, err := s.callSalesforceAPI(ctx, apiUrl) if err != nil { @@ -199,17 +210,18 @@ type Client struct { clientSecret string apiVersion string bearerToken string + tokenMu sync.Mutex httpClient *http.Client retryOpts RetryOptions log *service.Logger } // NewClient is the constructor for a Client object -func NewClient(log *service.Logger, orgUrl, clientId, clientSecret, apiVersion string, maxRetries int, metrics *service.Metrics, httpClient *http.Client) (*Client, error) { +func NewClient(orgURL, clientID, clientSecret, apiVersion string, maxRetries int, httpClient *http.Client, log *service.Logger, metrics *service.Metrics) (*Client, error) { return &Client{ log: log, - orgURL: orgUrl, - clientID: clientId, + orgURL: orgURL, + clientID: clientID, clientSecret: clientSecret, apiVersion: apiVersion, retryOpts: RetryOptions{ diff --git a/internal/impl/salesforce/salesforcehttp/client_test.go b/internal/impl/salesforce/salesforcehttp/client_test.go index 10c8e1cbcd..9f806fc3fd 100644 --- a/internal/impl/salesforce/salesforcehttp/client_test.go +++ b/internal/impl/salesforce/salesforcehttp/client_test.go @@ -38,18 +38,15 @@ func TestUpdateAndSetBearerToken_RealClient(t *testing.T) { })) defer ts.Close() - // log := service.NewLogger("test") - client, err := NewClient( - nil, ts.URL, "id", "secret", "v65.0", 1, - // service.NewMetrics(), - nil, ts.Client(), + nil, + nil, ) require.NoError(t, err) @@ -97,14 +94,14 @@ func TestCallSalesforceApi_RefreshOn401_RealClient(t *testing.T) { defer ts.Close() client, err := NewClient( - nil, ts.URL, "id", "secret", "v65.0", 1, - nil, ts.Client(), + nil, + nil, ) require.NoError(t, err) diff --git a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go index 6667e5b36a..003e1aa51a 100644 --- a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go +++ b/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go @@ -95,7 +95,7 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { // metrics such as in-flight requests, response status codes, errors, and request duration. func NewInstrumentedClient(m *service.Metrics, namespace string, client *http.Client) *http.Client { if client == nil { - client = &http.Client{} + client = http.DefaultClient } clone := *client clone.Transport = NewTransport(m, namespace, client.Transport) diff --git a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go index 2e31ea24bc..a7d206c281 100644 --- a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go +++ b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go @@ -16,7 +16,7 @@ // with Salesforce-specific authentication and rate-limiting handling. // It wraps HTTP responses to detect common error cases such as // 401/403 Unauthorized, 429 Too Many Requests, and Salesforce’s -// X-Seraph-LoginReason header, which may indicate an authentication +// auth-related header, which may indicate an authentication // problem even on a 200 OK response. // // Overview @@ -24,7 +24,7 @@ // authentication and rate-limiting signals. // - Central entry point: CheckSalesforceAuth(resp) which examines an http.Response and // returns a *SalesforceError for common Salesforce conditions: -// - 401 Unauthorized: Likely invalid credentials (email/API token). +// - 401 Unauthorized: Likely invalid or expired access token. // - 403 Forbidden: Authenticated but insufficient permissions. // - 429 Too Many Requests: Salesforce is throttling; check Retry-After header. // - On success (no issues detected), CheckSalesforceAuth returns nil. @@ -40,10 +40,10 @@ // 1. Basic request and auth check // // ctx := context.Background() -// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https:your-domain.atlassian.net/rest/api/3/myself", nil) -// req.SetBasicAuth("", "") +// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://your-domain.salesforce.com/services/data/v65.0/sobjects", nil) +// req.Header.Set("Authorization", "Bearer ") // req.Header.Set("Accept", "application/json") -// req.Header.Set("User-Agent", "YourApp/1.0") +// req.Header.Set("User-Agent", "Redpanda-Connect") // // resp, err := http.DefaultClient.Do(req) // if err != nil { @@ -52,13 +52,12 @@ // } // defer resp.Body.Close() // -// if jerr := salesforce_helper.CheckSalesforceAuth(resp); jerr != nil { -// // This includes 401, 403, 429, and 200 + X-Seraph-LoginReason problems. +// if serr := salesforce_helper.CheckSalesforceAuth(resp); serr != nil { +// // This includes 401, 403, 429, and 200 + header-signaled problems. // // You can inspect the error for details. -// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok { -// log.Printf("salesforce error: status=%d reason=%s", je.StatusCode, je.Reason) -// log.Printf("headers: %v", je.Headers) -// //Optionally log or parse a truncated version of je.Body. +// if se, ok := serr.(*salesforce_helper.SalesforceError); ok { +// log.Printf("salesforce error: status=%d reason=%s", se.StatusCode, se.Reason) +// log.Printf("headers: %v", se.Headers) // } // return // } @@ -100,7 +99,7 @@ // case http.StatusTooManyRequests: // // back off and retry later // default: -// // 200 with X-Seraph-LoginReason or other 4xx/5xx +// // 200 with auth-related or other 4xx/5xx // } // } // return @@ -186,7 +185,7 @@ func (e *HTTPError) Error() string { } // AuthHeaderPolicy allows callers to declare a header that signals an auth problem -// even on 200 OK responses (e.g., "X-Seraph-LoginReason"). +// even on 200 OK responses (e.g., "auth-related"). type AuthHeaderPolicy struct { HeaderName string // case-insensitive IsProblem func(val string) bool // return true if the header value indicates auth failure diff --git a/internal/impl/salesforce/salesforcehttp/types.go b/internal/impl/salesforce/salesforcehttp/types.go index 7002296c1d..248f2638f5 100644 --- a/internal/impl/salesforce/salesforcehttp/types.go +++ b/internal/impl/salesforce/salesforcehttp/types.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// types.go defines core data structures, response models, and enums for the Jira processor. +// types.go defines core data structures, response models, and enums for the Salesforce processor. // It includes input query types, API response DTOs, output message formats, and resource type constants. package salesforcehttp From f3a837ee7e27a0793852f1ba68bf4c03e360b12c Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Mon, 16 Feb 2026 17:45:21 +0200 Subject: [PATCH 18/19] PR changes --- .../impl/salesforce/salesforcehttp/client.go | 69 +++++++++---------- .../salesforce/salesforcehttp/client_test.go | 2 +- .../{http_metrics => metrics}/http_metrics.go | 2 +- .../salesforcehttp/salesforce_helper.go | 16 +++-- 4 files changed, 47 insertions(+), 42 deletions(-) rename internal/impl/salesforce/salesforcehttp/{http_metrics => metrics}/http_metrics.go (99%) diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index a301f79a00..70b79cb69d 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -28,9 +28,9 @@ import ( "net/http" "net/url" "strings" - "sync" + "sync/atomic" - "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp/http_metrics" + "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp/metrics" "github.com/redpanda-data/benthos/v4/public/service" ) @@ -44,11 +44,7 @@ const salesforceAPIBasePath = "/services" func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, error) { s.log.Debugf("API call: %s", u.String()) - s.tokenMu.Lock() - needsToken := s.bearerToken == "" - s.tokenMu.Unlock() - - if needsToken { + if s.getBearerToken() == "" { if err := s.updateAndSetBearerToken(ctx); err != nil { return nil, err } @@ -86,20 +82,18 @@ func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, err } func (s *Client) doSalesforceRequest(ctx context.Context, u *url.URL) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) + newReq := func() (*http.Request, error) { + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", "Redpanda-Connect") + req.Header.Set("Authorization", "Bearer "+s.getBearerToken()) + return req, nil } - s.tokenMu.Lock() - token := s.bearerToken - s.tokenMu.Unlock() - - req.Header.Set("Accept", "application/json") - req.Header.Set("User-Agent", "Redpanda-Connect") - req.Header.Set("Authorization", "Bearer "+token) - - return DoRequestWithRetries(ctx, s.httpClient, req, s.retryOpts) + return DoRequestWithRetries(ctx, s.httpClient, newReq, s.retryOpts) } // Function to get the Bearer token from Salesforce Oauth2.0 endpoint using client credentials grant type along with client id and client secret @@ -115,15 +109,19 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { form.Set("client_id", s.clientID) form.Set("client_secret", s.clientSecret) - req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), strings.NewReader(form.Encode())) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) + encodedForm := form.Encode() + newReq := func() (*http.Request, error) { + req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), strings.NewReader(encodedForm)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", "Redpanda-Connect") + return req, nil } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - req.Header.Set("Accept", "application/json") - req.Header.Set("User-Agent", "Redpanda-Connect") - body, err := DoRequestWithRetries(ctx, s.httpClient, req, s.retryOpts) + body, err := DoRequestWithRetries(ctx, s.httpClient, newReq, s.retryOpts) if err != nil { return fmt.Errorf("request failed: %w", err) } @@ -132,9 +130,7 @@ func (s *Client) updateAndSetBearerToken(ctx context.Context) error { if err := json.Unmarshal(body, &result); err != nil { return fmt.Errorf("cannot map response to custom field struct: %w", err) } - s.tokenMu.Lock() - s.bearerToken = result.AccessToken - s.tokenMu.Unlock() + s.bearerToken.Store(result.AccessToken) return nil } @@ -209,15 +205,19 @@ type Client struct { clientID string clientSecret string apiVersion string - bearerToken string - tokenMu sync.Mutex + bearerToken atomic.Value httpClient *http.Client retryOpts RetryOptions log *service.Logger } +func (s *Client) getBearerToken() string { + v, _ := s.bearerToken.Load().(string) + return v +} + // NewClient is the constructor for a Client object -func NewClient(orgURL, clientID, clientSecret, apiVersion string, maxRetries int, httpClient *http.Client, log *service.Logger, metrics *service.Metrics) (*Client, error) { +func NewClient(orgURL, clientID, clientSecret, apiVersion string, maxRetries int, httpClient *http.Client, log *service.Logger, m *service.Metrics) (*Client, error) { return &Client{ log: log, orgURL: orgURL, @@ -227,9 +227,8 @@ func NewClient(orgURL, clientID, clientSecret, apiVersion string, maxRetries int retryOpts: RetryOptions{ MaxRetries: maxRetries, }, - httpClient: http_metrics.NewInstrumentedClient( - metrics, "salesforce_http", + httpClient: metrics.NewInstrumentedClient( + m, "salesforce_http", httpClient), - bearerToken: "", }, nil } diff --git a/internal/impl/salesforce/salesforcehttp/client_test.go b/internal/impl/salesforce/salesforcehttp/client_test.go index 9f806fc3fd..545340dfb6 100644 --- a/internal/impl/salesforce/salesforcehttp/client_test.go +++ b/internal/impl/salesforce/salesforcehttp/client_test.go @@ -52,7 +52,7 @@ func TestUpdateAndSetBearerToken_RealClient(t *testing.T) { err = client.updateAndSetBearerToken(context.Background()) require.NoError(t, err) - assert.Equal(t, "abc123", client.bearerToken) + assert.Equal(t, "abc123", client.getBearerToken()) } func TestCallSalesforceApi_RefreshOn401_RealClient(t *testing.T) { diff --git a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go b/internal/impl/salesforce/salesforcehttp/metrics/http_metrics.go similarity index 99% rename from internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go rename to internal/impl/salesforce/salesforcehttp/metrics/http_metrics.go index 003e1aa51a..33a23c26e0 100644 --- a/internal/impl/salesforce/salesforcehttp/http_metrics/http_metrics.go +++ b/internal/impl/salesforce/salesforcehttp/metrics/http_metrics.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package http_metrics +package metrics import ( "net/http" diff --git a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go index a7d206c281..6f73d23129 100644 --- a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go +++ b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go @@ -214,15 +214,18 @@ func backoffWithJitter(base, maxDuration time.Duration, attempt int) time.Durati return d + jitter } -// DoRequestWithRetries executes req, handling: +// DoRequestWithRetries executes a request built by newReq on each attempt, handling: // - Auth errors on 401/403 // - Header-signaled auth problems on 200 (via AuthHeaderPolicy) // - 429 with Retry-After or exponential backoff and jitter (up to MaxRetries) // Other 4xx/5xx are returned as HTTPError without a retry. +// +// newReq is called for every attempt so that the request body is fresh +// (POST bodies are consumed by the first Do and cannot be reused). func DoRequestWithRetries( ctx context.Context, client *http.Client, - req *http.Request, + newReq func() (*http.Request, error), opts RetryOptions, ) ([]byte, error) { if client == nil { @@ -231,6 +234,10 @@ func DoRequestWithRetries( attempt := 0 for { + req, err := newReq() + if err != nil { + return nil, fmt.Errorf("failed to build request: %w", err) + } resp, err := client.Do(req.WithContext(ctx)) if err != nil { return nil, err @@ -322,10 +329,9 @@ func DoRequestWithRetries( } } - defer resp.Body.Close() - - // Read the response body for context + // Read the response body and close immediately (not defer — we're in a loop) bodyBytes, _ := io.ReadAll(resp.Body) + resp.Body.Close() return bodyBytes, nil } } From 9fbdcbba6765e3cf319752b1611f28ffc55588e8 Mon Sep 17 00:00:00 2001 From: ness-david-dedu Date: Tue, 17 Feb 2026 09:02:51 +0200 Subject: [PATCH 19/19] fix lint --- internal/impl/salesforce/salesforcehttp/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go index 70b79cb69d..f905012d93 100644 --- a/internal/impl/salesforce/salesforcehttp/client.go +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -205,7 +205,7 @@ type Client struct { clientID string clientSecret string apiVersion string - bearerToken atomic.Value + bearerToken atomic.Value httpClient *http.Client retryOpts RetryOptions log *service.Logger