diff --git a/internal/impl/salesforce/processor_salesforce.go b/internal/impl/salesforce/processor_salesforce.go new file mode 100644 index 0000000000..ba4f62242f --- /dev/null +++ b/internal/impl/salesforce/processor_salesforce.go @@ -0,0 +1,189 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package salesforce provides a Benthos salesforceProcessor that integrates with the Salesforce APIs +// to fetch data based on input messages. It allows querying Salesforce resources +// such as .... TODO + +package salesforce + +import ( + "context" + "errors" + "net/http" + "net/url" + + "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// salesforceProcessor is the Benthos salesforceProcessor implementation for Salesforce queries. +// It holds the client state and orchestrates calls into the salesforcehttp package. +type salesforceProcessor struct { + log *service.Logger + client *salesforcehttp.Client +} + +// SObjectList is the response from all the available sObjects +type SObjectList struct { + Encoding string `json:"encoding"` + MaxBatchSize int `json:"maxBatchSize"` + Sobjects []SObject `json:"sobjects"` +} + +// SObject is the minimal representation of an sObject +type SObject struct { + Name string `json:"name"` +} + +// DescribeResult sObject result +type DescribeResult struct { + Fields []struct { + Name string `json:"name"` + } `json:"fields"` +} + +// QueryResult of the salesforce search query +type QueryResult struct { + TotalSize int `json:"totalSize"` + Done bool `json:"done"` +} + +func init() { + if err := service.RegisterProcessor( + "salesforce", newSalesforceProcessorConfigSpec(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + return newSalesforceProcessor(conf, mgr) + }, + ); err != nil { + panic(err) + } +} + +// newSalesforceProcessorConfigSpec creates a new Configuration specification for the Salesforce processor +func newSalesforceProcessorConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Summary("Fetches data from Salesforce based on input messages"). + Description(`This salesforceProcessor takes input messages containing Salesforce queries and returns Salesforce data. + +Supports the following Salesforce resources: +- todo + +Configuration examples: + +` + "```configYAML" + ` +# Minimal configuration +pipeline: + processors: + - salesforce: + org_url: "https://your-domain.salesforce.com" + client_id: "${SALESFORCE_CLIENT_ID}" + client_secret: "${SALESFORCE_CLIENT_SECRET}" + +# Full configuration +pipeline: + processors: + - salesforce: + org_url: "https://your-domain.salesforce.com" + client_id: "${SALESFORCE_CLIENT_ID}" + client_secret: "${SALESFORCE_CLIENT_SECRET}" + restapi_version: "v64.0" + request_timeout: "30s" + max_retries: 50 +` + "```"). + Field(service.NewStringField("org_url"). + Description("Salesforce instance base URL (e.g., https://your-domain.salesforce.com)")). + Field(service.NewStringField("client_id"). + Description("Client ID for the Salesforce Connected App")). + Field(service.NewStringField("client_secret"). + Description("Client Secret for the Salesforce Connected App"). + Secret()). + Field(service.NewStringField("restapi_version"). + Description("Salesforce REST API version to use (example: v64.0). Default: v65.0"). + Default("v65.0")). + Field(service.NewDurationField("request_timeout"). + Description("HTTP request timeout"). + Default("30s")). + Field(service.NewIntField("max_retries"). + Description("Maximum number of retries in case of 429 HTTP Status Code"). + Default(10)) +} + +func newSalesforceProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*salesforceProcessor, error) { + orgURL, err := conf.FieldString("org_url") + if err != nil { + return nil, err + } + + if _, err := url.ParseRequestURI(orgURL); err != nil { + return nil, errors.New("org_url is not a valid URL") + } + + clientID, err := conf.FieldString("client_id") + if err != nil { + return nil, err + } + + clientSecret, err := conf.FieldString("client_secret") + if err != nil { + return nil, err + } + + apiVersion, err := conf.FieldString("restapi_version") + if err != nil { + return nil, err + } + + timeout, err := conf.FieldDuration("request_timeout") + if err != nil { + return nil, err + } + + maxRetries, err := conf.FieldInt("max_retries") + if err != nil { + return nil, err + } + if maxRetries < 0 { + return nil, errors.New("max_retries must not be negative") + } + + httpClient := &http.Client{Timeout: timeout} + + salesforceHttp, err := salesforcehttp.NewClient(orgURL, clientID, clientSecret, apiVersion, maxRetries, httpClient, mgr.Logger(), mgr.Metrics()) + if err != nil { + return nil, err + } + + return &salesforceProcessor{ + client: salesforceHttp, + log: mgr.Logger(), + }, nil +} + +func (s *salesforceProcessor) Process(ctx context.Context, _ *service.Message) (service.MessageBatch, error) { + var batch service.MessageBatch + + res, err := s.client.GetAvailableResources(ctx) + if err != nil { + return nil, err + } + + m := service.NewMessage(res) + batch = append(batch, m) + + return batch, nil +} + +func (*salesforceProcessor) Close(context.Context) error { return nil } diff --git a/internal/impl/salesforce/processor_salesforce_test.go b/internal/impl/salesforce/processor_salesforce_test.go new file mode 100644 index 0000000000..ea748efdc3 --- /dev/null +++ b/internal/impl/salesforce/processor_salesforce_test.go @@ -0,0 +1,149 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package salesforce + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +func TestSalesforceProcessorConfigValidation(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + configYAML string + wantErrSub string + }{ + { + name: "missing org_url", + configYAML: ` +client_id: "abc" +client_secret: "xyz" +`, + wantErrSub: "org_url", + }, + { + name: "invalid org_url", + configYAML: ` +org_url: "not a url" +client_id: "abc" +client_secret: "xyz" +`, + wantErrSub: "org_url", + }, + { + name: "missing client_id", + configYAML: ` +org_url: "https://example.com" +client_secret: "xyz" +`, + wantErrSub: "client_id", + }, + { + name: "missing client_secret", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +`, + wantErrSub: "client_secret", + }, + { + name: "invalid restapi_version type", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +restapi_version: 123 +`, + wantErrSub: "restapi_version", + }, + { + name: "invalid request_timeout", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +request_timeout: "not-a-duration" +`, + wantErrSub: "request_timeout", + }, + { + name: "invalid max_retries", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +max_retries: "not-an-int" +`, + wantErrSub: "max_retries", + }, + { + name: "valid minimal config", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +`, + wantErrSub: "", + }, + { + name: "valid full config", + configYAML: ` +org_url: "https://example.com" +client_id: "abc" +client_secret: "xyz" +restapi_version: "v64.0" +request_timeout: "10s" +max_retries: 5 +`, + wantErrSub: "", + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + env := service.NewEnvironment() + spec := newSalesforceProcessorConfigSpec() + + conf, err := spec.ParseYAML(tc.configYAML, env) + + var proc service.Processor + var procErr error + if err == nil { + proc, procErr = newSalesforceProcessor(conf, conf.Resources()) + } + + if tc.wantErrSub == "" { + require.NoError(t, err, "expected config to be valid") + require.NoError(t, procErr, "expected processor to initialize") + assert.NotNil(t, proc) + } else { + // Either config parsing OR processor creation must fail + if err != nil { + require.Contains(t, err.Error(), tc.wantErrSub) + } + if procErr != nil { + require.Contains(t, procErr.Error(), tc.wantErrSub) + } + } + }) + } +} diff --git a/internal/impl/salesforce/salesforcehttp/client.go b/internal/impl/salesforce/salesforcehttp/client.go new file mode 100644 index 0000000000..f905012d93 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/client.go @@ -0,0 +1,234 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// client.go implements low-level interactions with the Salesforce REST API. +// It defines the base API path, provides a helper for making authenticated Salesforce API requests with retry +// and error handling, and exposes utilities for retrieving custom fields. +// +// These functions are primarily used by the Salesforce processor when preparing +// queries and resolving custom field identifiers. + +package salesforcehttp + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "strings" + "sync/atomic" + + "github.com/redpanda-data/connect/v4/internal/impl/salesforce/salesforcehttp/metrics" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// salesforceAPIBasePath is the base path for Salesforce Rest API +const salesforceAPIBasePath = "/services" + +// This is the general function that calls Salesforce API on a specific URL using the URL object. +// It applies standard header parameters to all calls, Authorization, User-Agent and Accept. +// It uses the helper functions to check against possible response codes and handling the retry-after mechanism +func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, error) { + s.log.Debugf("API call: %s", u.String()) + + if s.getBearerToken() == "" { + if err := s.updateAndSetBearerToken(ctx); err != nil { + return nil, err + } + } + + body, err := s.doSalesforceRequest(ctx, u) + if err == nil { + return body, nil + } + + // Check if it's an HTTPError + httpErr, ok := err.(*HTTPError) + if !ok { + return nil, err + } + + // Only refresh on 401 + if httpErr.StatusCode != http.StatusUnauthorized { + return nil, err + } + + s.log.Warn("Salesforce token expired, refreshing token...") + // Refresh token + if err := s.updateAndSetBearerToken(ctx); err != nil { + return nil, fmt.Errorf("failed to refresh token: %w", err) + } + + // Retry once + retryBody, retryErr := s.doSalesforceRequest(ctx, u) + if retryErr != nil { + return nil, fmt.Errorf("request failed: %w", retryErr) + } + + return retryBody, nil +} + +func (s *Client) doSalesforceRequest(ctx context.Context, u *url.URL) ([]byte, error) { + newReq := func() (*http.Request, error) { + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", "Redpanda-Connect") + req.Header.Set("Authorization", "Bearer "+s.getBearerToken()) + return req, nil + } + + return DoRequestWithRetries(ctx, s.httpClient, newReq, s.retryOpts) +} + +// Function to get the Bearer token from Salesforce Oauth2.0 endpoint using client credentials grant type along with client id and client secret +func (s *Client) updateAndSetBearerToken(ctx context.Context) error { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/oauth2/token") + if err != nil { + return fmt.Errorf("invalid token endpoint URL: %w", err) + } + + // Build form-encoded body + form := url.Values{} + form.Set("grant_type", "client_credentials") + form.Set("client_id", s.clientID) + form.Set("client_secret", s.clientSecret) + + encodedForm := form.Encode() + newReq := func() (*http.Request, error) { + req, err := http.NewRequestWithContext(ctx, "POST", apiUrl.String(), strings.NewReader(encodedForm)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", "Redpanda-Connect") + return req, nil + } + + body, err := DoRequestWithRetries(ctx, s.httpClient, newReq, s.retryOpts) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + + var result SalesforceAuthResponse + if err := json.Unmarshal(body, &result); err != nil { + return fmt.Errorf("cannot map response to custom field struct: %w", err) + } + s.bearerToken.Store(result.AccessToken) + + return nil +} + +// GetAvailableResources function to call get available resources endpoint +// Used mainly for auth testing purposes +func (s *Client) GetAvailableResources(ctx context.Context) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion) + if err != nil { + return nil, fmt.Errorf("invalid URL: %w", err) + } + + body, err := s.callSalesforceAPI(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// GetAllSObjectResources function to get available sObjects endpoint +func (s *Client) GetAllSObjectResources(ctx context.Context) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/sobjects") + if err != nil { + return nil, fmt.Errorf("invalid URL: %w", err) + } + + body, err := s.callSalesforceAPI(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// GetSObjectResource function to call receive the description of the sObject +func (s *Client) GetSObjectResource(ctx context.Context, sObj string) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/sobjects/" + sObj + "/describe") + if err != nil { + return nil, fmt.Errorf("invalid URL: %w", err) + } + + body, err := s.callSalesforceAPI(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// GetSObjectData function to call receive the description of the sObject +func (s *Client) GetSObjectData(ctx context.Context, query string) ([]byte, error) { + apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/data/" + s.apiVersion + "/query") + if err != nil { + return nil, fmt.Errorf("invalid URL: %w", err) + } + q := apiUrl.Query() + q.Set("q", query) + apiUrl.RawQuery = q.Encode() + + body, err := s.callSalesforceAPI(ctx, apiUrl) + if err != nil { + return nil, err + } + + return body, nil +} + +// Client is the implementation of Salesforce API queries. It holds the client state and orchestrates calls into the salesforcehttp package. +type Client struct { + orgURL string + clientID string + clientSecret string + apiVersion string + bearerToken atomic.Value + httpClient *http.Client + retryOpts RetryOptions + log *service.Logger +} + +func (s *Client) getBearerToken() string { + v, _ := s.bearerToken.Load().(string) + return v +} + +// NewClient is the constructor for a Client object +func NewClient(orgURL, clientID, clientSecret, apiVersion string, maxRetries int, httpClient *http.Client, log *service.Logger, m *service.Metrics) (*Client, error) { + return &Client{ + log: log, + orgURL: orgURL, + clientID: clientID, + clientSecret: clientSecret, + apiVersion: apiVersion, + retryOpts: RetryOptions{ + MaxRetries: maxRetries, + }, + httpClient: metrics.NewInstrumentedClient( + m, "salesforce_http", + httpClient), + }, nil +} diff --git a/internal/impl/salesforce/salesforcehttp/client_test.go b/internal/impl/salesforce/salesforcehttp/client_test.go new file mode 100644 index 0000000000..545340dfb6 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/client_test.go @@ -0,0 +1,119 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package salesforcehttp + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUpdateAndSetBearerToken_RealClient(t *testing.T) { + t.Parallel() + + // Fake Salesforce OAuth server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "/services/oauth2/token", r.URL.Path) + + resp := map[string]string{"access_token": "abc123"} + _ = json.NewEncoder(w).Encode(resp) + })) + defer ts.Close() + + client, err := NewClient( + ts.URL, + "id", + "secret", + "v65.0", + 1, + ts.Client(), + nil, + nil, + ) + require.NoError(t, err) + + err = client.updateAndSetBearerToken(context.Background()) + require.NoError(t, err) + assert.Equal(t, "abc123", client.getBearerToken()) +} + +func TestCallSalesforceApi_RefreshOn401_RealClient(t *testing.T) { + t.Parallel() + + callCount := 0 + tokenIssued := false + + // Fake Salesforce server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + + // Token refresh endpoint + case "/services/oauth2/token": + tokenIssued = true + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"access_token":"new-token"}`)) + return + + // Data endpoint + case "/services/data/v65.0": + callCount++ + + // First call → 401 Unauthorized + if callCount == 1 { + w.WriteHeader(http.StatusUnauthorized) + return + } + + // Second call → success + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"ok":true}`)) + return + + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer ts.Close() + + client, err := NewClient( + ts.URL, + "id", + "secret", + "v65.0", + 1, + ts.Client(), + nil, + nil, + ) + require.NoError(t, err) + + body, err := client.callSalesforceAPI(context.Background(), mustParseURL(ts.URL+"/services/data/v65.0")) + require.NoError(t, err) + + assert.Equal(t, `{"ok":true}`, string(body)) + assert.Equal(t, 2, callCount) + assert.True(t, tokenIssued, "token refresh should have been called") +} + +func mustParseURL(s string) *url.URL { + u, _ := url.Parse(s) + return u +} diff --git a/internal/impl/salesforce/salesforcehttp/metrics/http_metrics.go b/internal/impl/salesforce/salesforcehttp/metrics/http_metrics.go new file mode 100644 index 0000000000..33a23c26e0 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/metrics/http_metrics.go @@ -0,0 +1,103 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "net/http" + "strconv" + "sync/atomic" + "time" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// Transport is a wrapper around an http.RoundTripper that tracks request metrics. +type Transport struct { + base http.RoundTripper + metrics *service.Metrics + ns string + + inflight *service.MetricGauge + inflightVal int64 + + total *service.MetricCounter + errors *service.MetricCounter + status *service.MetricCounter + duration *service.MetricTimer +} + +// NewTransport creates new Transport with metrics instrumentation. It takes a metrics +// instance, namespace string for metric names, and an optional HTTP transport (if nil, creates a new one). +// The function returns Transport that tracks request metrics such as in-flight requests, response status codes, errors, and request duration. +func NewTransport(m *service.Metrics, namespace string, base http.RoundTripper) *Transport { + if base == nil { + base = http.DefaultTransport + } + + return &Transport{ + base: base, + metrics: m, + ns: namespace, + inflight: m.NewGauge(namespace + "_in_flight"), + total: m.NewCounter(namespace + "_requests_total"), + errors: m.NewCounter(namespace + "_requests_errors"), + status: m.NewCounter(namespace+"_responses", "status_code"), + duration: m.NewTimer(namespace + "_request_duration"), + } +} + +// RoundTrip implements the http.RoundTripper interface. +func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { + start := time.Now() + + // in-flight ++ + atomic.AddInt64(&t.inflightVal, 1) + t.inflight.Set(atomic.LoadInt64(&t.inflightVal)) + + // always record end-of-request updates + defer func() { + // duration in nanoseconds (MetricTimer expects int64) + t.duration.Timing(time.Since(start).Nanoseconds()) + // in-flight -- + atomic.AddInt64(&t.inflightVal, -1) + t.inflight.Set(atomic.LoadInt64(&t.inflightVal)) + }() + + t.total.Incr(1) + + resp, err := t.base.RoundTrip(req) + if err != nil { + t.errors.Incr(1) + return nil, err + } + + codeStr := strconv.Itoa(resp.StatusCode) + t.status.Incr(1, codeStr) + + return resp, nil +} + +// NewInstrumentedClient creates a new HTTP client with metrics instrumentation. It takes a metrics +// instance, namespace string for metric names, and an optional HTTP client (if nil, creates a new one). +// The function returns a clone of the input client with an instrumented transport that tracks request +// metrics such as in-flight requests, response status codes, errors, and request duration. +func NewInstrumentedClient(m *service.Metrics, namespace string, client *http.Client) *http.Client { + if client == nil { + client = http.DefaultClient + } + clone := *client + clone.Transport = NewTransport(m, namespace, client.Transport) + return &clone +} diff --git a/internal/impl/salesforce/salesforcehttp/salesforce_helper.go b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go new file mode 100644 index 0000000000..6f73d23129 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/salesforce_helper.go @@ -0,0 +1,337 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// salesforce_helper.go provides helpers for making HTTP requests +// with Salesforce-specific authentication and rate-limiting handling. +// It wraps HTTP responses to detect common error cases such as +// 401/403 Unauthorized, 429 Too Many Requests, and Salesforce’s +// auth-related header, which may indicate an authentication +// problem even on a 200 OK response. +// +// Overview +// - Provides helpers for interpreting Salesforce HTTP responses with a focus on +// authentication and rate-limiting signals. +// - Central entry point: CheckSalesforceAuth(resp) which examines an http.Response and +// returns a *SalesforceError for common Salesforce conditions: +// - 401 Unauthorized: Likely invalid or expired access token. +// - 403 Forbidden: Authenticated but insufficient permissions. +// - 429 Too Many Requests: Salesforce is throttling; check Retry-After header. +// - On success (no issues detected), CheckSalesforceAuth returns nil. +// - On failure, SalesforceError includes StatusCode, Reason, Body, and Headers. +// +// When to use +// - Immediately after getting an *http.Response from Salesforce (e.g., client.Do(req)), +// before processing the body. This gives a consistent, structured way to react +// to auth and rate-limit conditions. +// +// Quick Start +// +// 1. Basic request and auth check +// +// ctx := context.Background() +// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https://your-domain.salesforce.com/services/data/v65.0/sobjects", nil) +// req.Header.Set("Authorization", "Bearer ") +// req.Header.Set("Accept", "application/json") +// req.Header.Set("User-Agent", "Redpanda-Connect") +// +// resp, err := http.DefaultClient.Do(req) +// if err != nil { +// transport or network error +// log.Fatalf("request failed: %v", err) +// } +// defer resp.Body.Close() +// +// if serr := salesforce_helper.CheckSalesforceAuth(resp); serr != nil { +// // This includes 401, 403, 429, and 200 + header-signaled problems. +// // You can inspect the error for details. +// if se, ok := serr.(*salesforce_helper.SalesforceError); ok { +// log.Printf("salesforce error: status=%d reason=%s", se.StatusCode, se.Reason) +// log.Printf("headers: %v", se.Headers) +// } +// return +// } +// +// // Safe to read/parse the response body here. +// data, _ := io.ReadAll(resp.Body) +// log.Printf("success: %s", string(data)) +// +// 2. Handling rate limiting (429) with Retry-After +// +// // If CheckSalesforceAuth returns a SalesforceError with StatusCode 429, look for Retry-After. +// jerr := salesforce_helper.CheckSalesforceAuth(resp) +// if jerr != nil { +// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok && je.StatusCode == http.StatusTooManyRequests { +// retryAfter := je.Headers.Get("Retry-After") // integer seconds expected +// // Convert to a duration and sleep before retrying +// if secs, convErr := strconv.Atoi(strings.TrimSpace(retryAfter)); convErr == nil && secs >= 0 { +// time.Sleep(time.Duration(secs) * time.Second) +// //retry the request here +// } else { +// // Fallback: use a default backoff (e.g., exponential with jitter) before retry +// } +// } +// } +// +// +// 3. Centralized error handling +// +// jerr := salesforce_helper.CheckSalesforceAuth(resp) +// if jerr != nil { +// // Use errors.As to extract *SalesforceError +// var je *salesforce_helper.SalesforceError +// if errors.As(jerr, &je) { +// switch je.StatusCode { +// case http.StatusUnauthorized: +// // refresh token, prompt re-login, etc. +// case http.StatusForbidden: +// // insufficient permissions: inform user or adjust scopes +// case http.StatusTooManyRequests: +// // back off and retry later +// default: +// // 200 with auth-related or other 4xx/5xx +// } +// } +// return +// } +// // proceed to parse resp.Body +// +// 4. Example helper wrapping a Salesforce call +// +// func callSalesforce(ctx context.Context, client *http.Client, req *http.Request) ([]byte, *salesforce_helper.SalesforceError) { +// resp, err := client.Do(req.WithContext(ctx)) +// if err != nil { +// return nil, &salesforce_helper.SalesforceError{ +// StatusCode: 0, +// Reason: fmt.Sprintf("transport error: %v", err), +// } +// } +// defer resp.Body.Close() +// +// if jerr := salesforce_helper.CheckSalesforceAuth(resp); jerr != nil { +// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok { +// return nil, je +// } +// // Wrap non-SalesforceError just in case (shouldn't happen). +// return nil, &salesforce_helper.SalesforceError{StatusCode: resp.StatusCode, Reason: jerr.Error()} +// } +// +// data, readErr := io.ReadAll(resp.Body) +// if readErr != nil { +// return nil, &salesforce_helper.SalesforceError{StatusCode: resp.StatusCode, Reason: fmt.Sprintf("read error: %v", readErr)} +// } +// return data, nil +// } +// +// Inspecting SalesforceError +// - On error, CheckSalesforceAuth returns *SalesforceError containing: +// - StatusCode: HTTP status (or 200 in header-signaled cases). +// - Reason: High-level explanation (or http.StatusText for generic 4xx/5xx). +// - Body: Response body string (caller may truncate before logging). +// - Headers: Cloned response headers for further inspection. +// +// - Example: +// +// if jerr := salesforce_helper.CheckSalesforceAuth(resp); jerr != nil { +// if je, ok := jerr.(*salesforce_helper.SalesforceError); ok { +// fmt.Printf("status=%d reason=%s\n", je.StatusCode, je.Reason) +// } +// } +// +// Best Practices +// - Always set a User-Agent to identify your application. +// - Use context timeouts or deadlines to avoid hanging requests. +// - Be mindful when logging bodies; they may contain sensitive or large content. +// - Respect Retry-After when present; otherwise choose a conservative backoff. +// - Prefer idempotent methods for automatic retries; confirm business safety for POST/PUT. +// - Consider centralizing Salesforce error handling via CheckSalesforceAuth in your HTTP layer. +// +// Dependencies +// - Standard library only: net/http, io, fmt. + +package salesforcehttp + +import ( + "context" + "fmt" + "io" + "math/rand" + "net/http" + "strconv" + "strings" + "time" +) + +// HTTPError wraps non-2xx responses with useful context. +type HTTPError struct { + StatusCode int + Reason string + Body string + Headers http.Header +} + +func (e *HTTPError) Error() string { + return fmt.Sprintf("http error: status=%d reason=%s", e.StatusCode, e.Reason) +} + +// AuthHeaderPolicy allows callers to declare a header that signals an auth problem +// even on 200 OK responses (e.g., "auth-related"). +type AuthHeaderPolicy struct { + HeaderName string // case-insensitive + IsProblem func(val string) bool // return true if the header value indicates auth failure +} + +// RetryOptions controls the behavior of DoRequestWithRetries. +type RetryOptions struct { + MaxRetries int // retries for 429 Too Many Requests (0 = no retry) + BaseDelay time.Duration // base backoff (default 500 ms) + MaxDelay time.Duration // cap backoff (default 30s) + AuthHeaderPolicy *AuthHeaderPolicy // optional header-based auth detection +} + +func backoffWithJitter(base, maxDuration time.Duration, attempt int) time.Duration { + if base <= 0 { + base = 500 * time.Millisecond + } + if maxDuration <= 0 { + maxDuration = 30 * time.Second + } + d := base << attempt + if d > maxDuration { + d = maxDuration + } + jitter := time.Duration(rand.Int63n(int64(d))) - d/2 + return d + jitter +} + +// DoRequestWithRetries executes a request built by newReq on each attempt, handling: +// - Auth errors on 401/403 +// - Header-signaled auth problems on 200 (via AuthHeaderPolicy) +// - 429 with Retry-After or exponential backoff and jitter (up to MaxRetries) +// Other 4xx/5xx are returned as HTTPError without a retry. +// +// newReq is called for every attempt so that the request body is fresh +// (POST bodies are consumed by the first Do and cannot be reused). +func DoRequestWithRetries( + ctx context.Context, + client *http.Client, + newReq func() (*http.Request, error), + opts RetryOptions, +) ([]byte, error) { + if client == nil { + client = http.DefaultClient + } + attempt := 0 + + for { + req, err := newReq() + if err != nil { + return nil, fmt.Errorf("failed to build request: %w", err) + } + resp, err := client.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + // 1) Explicit auth/permission failures + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: "authentication/authorization failure", + Body: string(body), + Headers: resp.Header.Clone(), + } + } + + // 2) 200 OK, but a header indicates an auth/login problem + if resp.StatusCode == http.StatusOK && opts.AuthHeaderPolicy != nil { + val := strings.TrimSpace(resp.Header.Get(opts.AuthHeaderPolicy.HeaderName)) + if val != "" && opts.AuthHeaderPolicy.IsProblem(val) { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: fmt.Sprintf("auth/login issue indicated by %s=%q", opts.AuthHeaderPolicy.HeaderName, val), + Body: string(body), + Headers: resp.Header.Clone(), + } + } + } + + // 3) 429 Too Many Requests => retry with Retry-After or backoff + if resp.StatusCode == http.StatusTooManyRequests { + if attempt >= opts.MaxRetries { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: "rate limit exceeded; retries exhausted", + Body: string(body), + Headers: resp.Header.Clone(), + } + } + + delay := backoffWithJitter(opts.BaseDelay, opts.MaxDelay, attempt) + if ra := strings.TrimSpace(resp.Header.Get("Retry-After")); ra != "" { + if secs, err := strconv.Atoi(ra); err == nil && secs >= 0 { + delay = time.Duration(secs) * time.Second + } + } + err := resp.Body.Close() + if err != nil { + return nil, err + } + + t := time.NewTimer(delay) + select { + case <-ctx.Done(): + t.Stop() + return nil, context.Canceled + case <-t.C: + } + attempt++ + continue + } + + // 4) Other non-2xx => return as error (no auto-retry here) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + err := resp.Body.Close() + if err != nil { + return nil, err + } + return nil, &HTTPError{ + StatusCode: resp.StatusCode, + Reason: http.StatusText(resp.StatusCode), + Body: string(body), + Headers: resp.Header.Clone(), + } + } + + // Read the response body and close immediately (not defer — we're in a loop) + bodyBytes, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return bodyBytes, nil + } +} diff --git a/internal/impl/salesforce/salesforcehttp/types.go b/internal/impl/salesforce/salesforcehttp/types.go new file mode 100644 index 0000000000..248f2638f5 --- /dev/null +++ b/internal/impl/salesforce/salesforcehttp/types.go @@ -0,0 +1,33 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// types.go defines core data structures, response models, and enums for the Salesforce processor. +// It includes input query types, API response DTOs, output message formats, and resource type constants. + +package salesforcehttp + +/*** Input / DTOs ***/ + +// SalesforceAuthResponse represents the response from the salesforce auth API +// We are using SalesforceAuthResponse in this context to get the whole auth object directly from Salesforce and parse the bearer token +type SalesforceAuthResponse struct { + AccessToken string `json:"access_token"` + Signature string `json:"signature"` + Scope string `json:"scope"` + InstanceUrl string `json:"instance_url"` + Id string `json:"id"` + TokenType string `json:"token_type"` + IssuedAt string `json:"issued_at"` + ApiInstanceUrl string `json:"api_instance_url"` +}