Skip to content

salesforce implementation - phase 1#3939

Open
ness-david-dedu wants to merge 19 commits intoredpanda-data:mainfrom
ness-david-dedu:feature/salesforce-phase-1
Open

salesforce implementation - phase 1#3939
ness-david-dedu wants to merge 19 commits intoredpanda-data:mainfrom
ness-david-dedu:feature/salesforce-phase-1

Conversation

@ness-david-dedu
Copy link

No description provided.

@CLAassistant
Copy link

CLAassistant commented Jan 29, 2026

CLA assistant check
All committers have signed the CLA.


func (*salesforceProcessor) Close(context.Context) error { return nil }

func init() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: It's a common convention to put init functions at the top of the file under constants/variables as the first function so it's lifecycle is clear to readers. You'll see the same convention used throughout other components.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


query := apiUrl.Query()
query.Set("grant_type", "client_credentials")
query.Set("client_id", s.clientId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
query.Set("client_id", s.clientId)
query.Set("client_id", s.clientID)

Minor: Follow the Go convention of uppercase acronyms.

Copy link
Contributor

@josephwoodward josephwoodward Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, aren't these supposed to be sent in the request-body instead of the query string?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, moved to request body

// This is the general function that calls Salesforce API on a specific URL using the URL object.
// It applies standard header parameters to all calls, Authorization, User-Agent and Accept.
// It uses the helper functions to check against possible response codes and handling the retry-after mechanism
func (s *Client) callSalesforceApi(ctx context.Context, u *url.URL) ([]byte, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (s *Client) callSalesforceApi(ctx context.Context, u *url.URL) ([]byte, error) {
func (s *Client) callSalesforceAPI(ctx context.Context, u *url.URL) ([]byte, error) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

retryBody, retryErr := s.doSalesforceRequest(ctx, u)

if retryErr != nil {
return nil, fmt.Errorf("request failed: %v", retryErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this module will change with the connector, it's probably better to wrap the error with %w instead of %v. Same for other instances I see exist in these files.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

inflightVal int64 // track ourselves; gauge uses Set()
total *service.MetricCounter
errors *service.MetricCounter
status2xx *service.MetricCounter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use a metric per status code in other components? My understanding is it's better to have a status code as a label? It'd be worth looking at some other components capturing HTTP metrics to see if there's a pattern. This is definitely something we should look to standardise though.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to using a label

ness-david-dedu and others added 7 commits February 10, 2026 23:32
Co-authored-by: Joseph Woodward <joseph.woodward@xeuse.com>
Co-authored-by: Joseph Woodward <joseph.woodward@xeuse.com>
Co-authored-by: Joseph Woodward <joseph.woodward@xeuse.com>
Co-authored-by: Joseph Woodward <joseph.woodward@xeuse.com>
Co-authored-by: Joseph Woodward <joseph.woodward@xeuse.com>
// See the License for the specific language governing permissions and
// limitations under the License.

// types.go defines core data structures, response models, and enums for the Jira processor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// types.go defines core data structures, response models, and enums for the Jira processor.
// types.go defines core data structures, response models, and enums for the Salesforce processor.

if err != nil {
return nil, fmt.Errorf("invalid URL: %w", err)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the q=query param from the concatenated string and encode/escape it properly:

 q := apiUrl.Query()                                                                                                                                          
 q.Set("q", query)                                                                                                                                            
 apiUrl.RawQuery = q.Encode() 

// 1. Basic request and auth check
//
// ctx := context.Background()
// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "https:your-domain.atlassian.net/rest/api/3/myself", nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is from the Jira example? We should update the documentation to a working salesforce example.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// metrics such as in-flight requests, response status codes, errors, and request duration.
func NewInstrumentedClient(m *service.Metrics, namespace string, client *http.Client) *http.Client {
if client == nil {
client = &http.Client{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason to use http.Client here instead of http.DefaultClient as you have done in DoRequestWithRetries?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// NewClient is the constructor for a Client object
func NewClient(log *service.Logger, orgUrl, clientId, clientSecret, apiVersion string, maxRetries int, metrics *service.Metrics, httpClient *http.Client) (*Client, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewClient(log *service.Logger, orgUrl, clientId, clientSecret, apiVersion string, maxRetries int, metrics *service.Metrics, httpClient *http.Client) (*Client, error) {
func NewClient(log *service.Logger, orgURL, clientID, clientSecret, apiVersion string, maxRetries int, metrics *service.Metrics, httpClient *http.Client) (*Client, error) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, arguments are usually supplied in terms of importance and relevance, so something like logger and metrics would be at the end, for instance:

httpClient *http.Client, orgUrl, clientId, clientSecret, apiVersion string, maxRetries int, metrics *service.Metrics, log *service.Logger

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the arguments order:

  1. orgURL, clientID, clientSecret — connection identity
  2. apiVersion, maxRetries — behavioral config
  3. httpClient, log, metrics — infrastructure/dependencies

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


req.Header.Set("Accept", "application/json")
req.Header.Set("User-Agent", "Redpanda-Connect")
req.Header.Set("Authorization", "Bearer "+s.bearerToken)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to guard token access with a mutex to prevent data races with access to the token?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err != nil {
return nil, err
}
s.log.Debugf("Fetching from Salesforce.. Input: %s", string(inputMsg))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this will go once the processor implementation is complete?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

return nil, err
}

maxRetries, err := conf.FieldInt("max_retries")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably validate this for negative retries.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

func (s *Client) updateAndSetBearerToken(ctx context.Context) error {
apiUrl, err := url.Parse(s.orgURL + salesforceAPIBasePath + "/oauth2/token")
if err != nil {
return fmt.Errorf("invalid URL: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to add a bit more context to these kinds of errors, such as "invalid token endpoint url" to make it clearer to readers what part is invalid.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}))
defer ts.Close()

// log := service.NewLogger("test")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these comments needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

clientID string
clientSecret string
apiVersion string
bearerToken string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd maybe suggest having an atomic string here instead of a mutex since it seems to only be getted and setted rather than used in a multiple line interaction.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

attempt := 0

for {
resp, err := client.Do(req.WithContext(ctx))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the first iteration this request will have already flushed its body, and so it'll be empty if this is a POST request. Instead you'll need to reconstruct the request for each iteration. A quick solution is to change the req param into a reqBuilder func() *http.Request.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}

defer resp.Body.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're inside a loop I would move this below the read and make it immediate

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// See the License for the specific language governing permissions and
// limitations under the License.

package http_metrics
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't generally use underscores in package names, I would simply name this metrics.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants