From 7f7e7b2b2fb28b03966f6a3cb8357afe57c8109c Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Fri, 2 Feb 2024 01:21:40 -0800 Subject: [PATCH] Fix linter (#30) --- .golangci.yml | 61 +++++++++++++++++---- integration-tests/batch_quickstart_test.go | 20 +++---- pinot/brokerSelector.go | 1 + pinot/connection.go | 2 + pinot/connectionFactory.go | 7 ++- pinot/connectionFactory_test.go | 8 ++- pinot/connection_test.go | 18 ++++-- pinot/controllerBasedBrokerSelector.go | 34 ++++++------ pinot/controllerBasedBrokerSelector_test.go | 38 ++++++++----- pinot/dynamicBrokerSelector.go | 25 ++++----- pinot/dynamicBrokerSelector_test.go | 4 +- pinot/json.go | 4 +- pinot/jsonAsyncHTTPClientTransport.go | 14 ++++- pinot/jsonAsyncHTTPClientTransport_test.go | 8 ++- pinot/response.go | 30 ++++++++-- pinot/response_test.go | 21 +++++-- pinot/simplebrokerselector.go | 3 +- pinot/tableAwareBrokerSelector.go | 1 + pinot/tableAwareBrokerSelector_test.go | 4 +- 19 files changed, 210 insertions(+), 93 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 66709e9..a4718da 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -9,24 +9,65 @@ output: linters: enable: + - errcheck + - gofmt + - goimports + - gosec - govet - - whitespace - - disable: - - godox - ineffassign - - errcheck - staticcheck - - gosimple - - stylecheck - - wsl + - typecheck - revive + - unused + disable: + - gochecknoinits + - gochecknoglobals -linter-settings: +linters-settings: govet: # report about shadowed variables check-shadowing: true # enable or disable analyzers by name # run `go tool vet help` to see all analyzers - enable-all: true \ No newline at end of file + enable-all: true + golint: + min-confidence: 0.8 + gocritic: + enabled-checks: + - appendCombine + - argOrder + - badCond + - dupBranchBody + - dupCase + - dupSubExpr + - elseif + - hugeParam + - initClause + - rangeValCopy + - sloppyLen + - typeSwitchVar + - underef + - unlambda + - unslice + gofmt: + simplify: true + goimports: + local-prefixes: github.com/myorg/mypackage + errcheck: + check-type-assertions: true + check-blank: true + +issues: + exclude-use-default: false + exclude-rules: + - linters: + - govet + text: "composite literal uses unkeyed fields" + - linters: + - golint + text: "should have comment or be unexported" + - linters: + - staticcheck + text: "SA5001: should check returned error before deferring" + diff --git a/integration-tests/batch_quickstart_test.go b/integration-tests/batch_quickstart_test.go index 510876c..c0f0d08 100644 --- a/integration-tests/batch_quickstart_test.go +++ b/integration-tests/batch_quickstart_test.go @@ -52,7 +52,7 @@ func getPinotClientFromBroker() *pinot.Connection { return pinotClient } -func getCustomHttpClient() *http.Client { +func getCustomHTTPClient() *http.Client { httpClient := &http.Client{ Timeout: 15 * time.Second, Transport: &http.Transport{ @@ -69,24 +69,24 @@ func getCustomHttpClient() *http.Client { return httpClient } -func getPinotClientFromZookeeperAndCustomHttpClient() *pinot.Connection { - pinotClient, err := pinot.NewFromZookeeperAndClient([]string{"localhost:" + zookeeperPort}, "", "QuickStartCluster", getCustomHttpClient()) +func getPinotClientFromZookeeperAndCustomHTTPClient() *pinot.Connection { + pinotClient, err := pinot.NewFromZookeeperAndClient([]string{"localhost:" + zookeeperPort}, "", "QuickStartCluster", getCustomHTTPClient()) if err != nil { log.Fatalln(err) } return pinotClient } -func getPinotClientFromControllerAndCustomHttpClient() *pinot.Connection { - pinotClient, err := pinot.NewFromControllerAndClient("localhost:"+controllerPort, getCustomHttpClient()) +func getPinotClientFromControllerAndCustomHTTPClient() *pinot.Connection { + pinotClient, err := pinot.NewFromControllerAndClient("localhost:"+controllerPort, getCustomHTTPClient()) if err != nil { log.Fatalln(err) } return pinotClient } -func getPinotClientFromBrokerAndCustomHttpClient() *pinot.Connection { - pinotClient, err := pinot.NewFromBrokerListAndClient([]string{"localhost:" + brokerPort}, getCustomHttpClient()) +func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection { + pinotClient, err := pinot.NewFromBrokerListAndClient([]string{"localhost:" + brokerPort}, getCustomHTTPClient()) if err != nil { log.Fatalln(err) } @@ -101,9 +101,9 @@ func TestSendingQueriesToPinot(t *testing.T) { getPinotClientFromZookeeper(), getPinotClientFromController(), getPinotClientFromBroker(), - getPinotClientFromZookeeperAndCustomHttpClient(), - getPinotClientFromControllerAndCustomHttpClient(), - getPinotClientFromBrokerAndCustomHttpClient(), + getPinotClientFromZookeeperAndCustomHTTPClient(), + getPinotClientFromControllerAndCustomHTTPClient(), + getPinotClientFromBrokerAndCustomHTTPClient(), } table := "baseballStats" diff --git a/pinot/brokerSelector.go b/pinot/brokerSelector.go index e499bc2..132c0e3 100644 --- a/pinot/brokerSelector.go +++ b/pinot/brokerSelector.go @@ -1,3 +1,4 @@ +// Package pinot provides a client for Pinot, a real-time distributed OLAP datastore. package pinot type brokerSelector interface { diff --git a/pinot/connection.go b/pinot/connection.go index 2df08a9..5fae778 100644 --- a/pinot/connection.go +++ b/pinot/connection.go @@ -30,10 +30,12 @@ func (c *Connection) ExecuteSQL(table string, query string) (*BrokerResponse, er return brokerResp, err } +// OpenTrace for the connection func (c *Connection) OpenTrace() { c.trace = true } +// CloseTrace for the connection func (c *Connection) CloseTrace() { c.trace = false } diff --git a/pinot/connectionFactory.go b/pinot/connectionFactory.go index fb2934a..efa8e0d 100644 --- a/pinot/connectionFactory.go +++ b/pinot/connectionFactory.go @@ -4,6 +4,8 @@ import ( "fmt" "net/http" "strings" + + log "github.com/sirupsen/logrus" ) const ( @@ -100,7 +102,10 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con } if conn != nil { // TODO: error handling results into `make test` failure. - conn.brokerSelector.init() + if err := conn.brokerSelector.init(); err != nil { + log.Errorf("Failed to initialize broker selector: %v", err) + return conn, err + } return conn, nil } return nil, fmt.Errorf( diff --git a/pinot/connectionFactory_test.go b/pinot/connectionFactory_test.go index b39f8ab..2e62818 100644 --- a/pinot/connectionFactory_test.go +++ b/pinot/connectionFactory_test.go @@ -13,7 +13,8 @@ func TestPinotClients(t *testing.T) { assert.NotNil(t, pinotClient1) assert.NotNil(t, pinotClient1.brokerSelector) assert.NotNil(t, pinotClient1.transport) - assert.Nil(t, err) + // Since there is no zk setup, so an error will be raised + assert.NotNil(t, err) pinotClient2, err := NewWithConfig(&ClientConfig{ ZkConfig: &ZookeeperConfig{ ZookeeperPath: []string{"localhost:2181"}, @@ -27,11 +28,14 @@ func TestPinotClients(t *testing.T) { assert.NotNil(t, pinotClient2) assert.NotNil(t, pinotClient2.brokerSelector) assert.NotNil(t, pinotClient2.transport) - assert.Nil(t, err) + // Since there is no zk setup, so an error will be raised + assert.NotNil(t, err) pinotClient3, err := NewFromController("localhost:9000") assert.NotNil(t, pinotClient3) assert.NotNil(t, pinotClient3.brokerSelector) assert.NotNil(t, pinotClient3.transport) + // Since there is no controller setup, so an error will be raised + assert.NotNil(t, err) _, err = NewWithConfig(&ClientConfig{}) assert.NotNil(t, err) assert.True(t, strings.Contains(err.Error(), "please specify")) diff --git a/pinot/connection_test.go b/pinot/connection_test.go index a0816fb..241db5e 100644 --- a/pinot/connection_test.go +++ b/pinot/connection_test.go @@ -113,8 +113,9 @@ func TestConnectionWithControllerBasedBrokerSelector(t *testing.T) { func TestSendingQueryWithTraceOpen(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var request map[string]string - json.NewDecoder(r.Body).Decode(&request) + err := json.NewDecoder(r.Body).Decode(&request) assert.Equal(t, request["trace"], "true") + assert.Nil(t, err) })) defer ts.Close() pinotClient, err := NewFromBrokerList([]string{ts.URL}) @@ -123,13 +124,16 @@ func TestSendingQueryWithTraceOpen(t *testing.T) { assert.NotNil(t, pinotClient.transport) assert.Nil(t, err) pinotClient.OpenTrace() - pinotClient.ExecuteSQL("", "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10") + resp, err := pinotClient.ExecuteSQL("", "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10") + assert.Nil(t, resp) + assert.NotNil(t, err) } func TestSendingQueryWithTraceClose(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var request map[string]string - json.NewDecoder(r.Body).Decode(&request) + err := json.NewDecoder(r.Body).Decode(&request) + assert.Nil(t, err) _, ok := request["trace"] assert.False(t, ok) })) @@ -139,8 +143,12 @@ func TestSendingQueryWithTraceClose(t *testing.T) { assert.NotNil(t, pinotClient.brokerSelector) assert.NotNil(t, pinotClient.transport) assert.Nil(t, err) - pinotClient.ExecuteSQL("", "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10") + resp, err := pinotClient.ExecuteSQL("", "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10") + assert.Nil(t, resp) + assert.NotNil(t, err) pinotClient.OpenTrace() pinotClient.CloseTrace() - pinotClient.ExecuteSQL("", "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10") + resp, err = pinotClient.ExecuteSQL("", "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10") + assert.Nil(t, resp) + assert.NotNil(t, err) } diff --git a/pinot/controllerBasedBrokerSelector.go b/pinot/controllerBasedBrokerSelector.go index a415562..f09c72b 100644 --- a/pinot/controllerBasedBrokerSelector.go +++ b/pinot/controllerBasedBrokerSelector.go @@ -21,6 +21,7 @@ var ( } ) +// HTTPClient is an interface for http.Client type HTTPClient interface { Do(req *http.Request) (*http.Response, error) } @@ -28,7 +29,7 @@ type HTTPClient interface { type controllerBasedSelector struct { client HTTPClient config *ControllerConfig - controllerAPIReqUrl string + controllerAPIReqURL string tableAwareBrokerSelector } @@ -36,19 +37,15 @@ func (s *controllerBasedSelector) init() error { if s.config.UpdateFreqMs == 0 { s.config.UpdateFreqMs = defaultUpdateFreqMs } - u, err := getControllerRequestUrl(s.config.ControllerAddress) + var err error + s.controllerAPIReqURL, err = getControllerRequestURL(s.config.ControllerAddress) if err != nil { - log.Error(err) + log.Errorf("An error occurred when parsing controller address: %v", err) return err } - s.controllerAPIReqUrl = u - err = s.updateBrokerData() - if err != nil { - log.Errorf( - "An error occurred when fetching broker data from controller API for the first time, Error: %v", - err, - ) + if err = s.updateBrokerData(); err != nil { + log.Errorf("An error occurred when fetching broker data from controller API for the first time, Error: %v", err) return err } go s.setupInterval() @@ -73,7 +70,7 @@ func (s *controllerBasedSelector) setupInterval() { } } -func getControllerRequestUrl(controllerAddress string) (string, error) { +func getControllerRequestURL(controllerAddress string) (string, error) { tokenized := strings.Split(controllerAddress, "://") addressWithScheme := controllerAddress if len(tokenized) > 1 { @@ -91,12 +88,9 @@ func getControllerRequestUrl(controllerAddress string) (string, error) { } func (s *controllerBasedSelector) createControllerRequest() (*http.Request, error) { - r, err := http.NewRequest("GET", s.controllerAPIReqUrl, nil) + r, err := http.NewRequest("GET", s.controllerAPIReqURL, nil) if err != nil { - return r, fmt.Errorf( - "Caught exception when creating controller API request: %v", - err, - ) + return r, fmt.Errorf("Caught exception when creating controller API request: %v", err) } for k, v := range controllerDefaultHTTPHeader { r.Header.Add(k, v) @@ -116,14 +110,18 @@ func (s *controllerBasedSelector) updateBrokerData() error { if err != nil { return fmt.Errorf("Got exceptions while sending controller API request: %v", err) } - defer resp.Body.Close() + defer func() { + if err := resp.Body.Close(); err != nil { + log.Error("Unable to close response body. ", err) + } + }() if resp.StatusCode == http.StatusOK { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("An error occurred when reading controller API response: %v", err) } var c controllerResponse - if err = decodeJsonWithNumber(bodyBytes, &c); err != nil { + if err = decodeJSONWithNumber(bodyBytes, &c); err != nil { return fmt.Errorf("An error occurred when decoding controller API response: %v", err) } s.rwMux.Lock() diff --git a/pinot/controllerBasedBrokerSelector_test.go b/pinot/controllerBasedBrokerSelector_test.go index 2e98373..d3a28ea 100644 --- a/pinot/controllerBasedBrokerSelector_test.go +++ b/pinot/controllerBasedBrokerSelector_test.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "reflect" "strings" @@ -18,7 +17,7 @@ type MockHTTPClientSuccess struct { statusCode int } -func (m *MockHTTPClientSuccess) Do(req *http.Request) (*http.Response, error) { +func (m *MockHTTPClientSuccess) Do(_ *http.Request) (*http.Response, error) { r := &http.Response{} r.StatusCode = m.statusCode r.Body = m.body @@ -29,7 +28,7 @@ type MockHTTPClientFailure struct { err error } -func (m *MockHTTPClientFailure) Do(req *http.Request) (*http.Response, error) { +func (m *MockHTTPClientFailure) Do(_ *http.Request) (*http.Response, error) { return &http.Response{}, m.err } @@ -40,13 +39,13 @@ func TestControllerBasedBrokerSelectorInit(t *testing.T) { }, client: &MockHTTPClientSuccess{ statusCode: 200, - body: ioutil.NopCloser(strings.NewReader("{}")), + body: io.NopCloser(strings.NewReader("{}")), }, } err := s.init() assert.Nil(t, err) assert.Equal(t, s.config.UpdateFreqMs, 1000) - assert.Equal(t, s.controllerAPIReqUrl, "http://localhost:9000/v2/brokers/tables?state=ONLINE") + assert.Equal(t, s.controllerAPIReqURL, "http://localhost:9000/v2/brokers/tables?state=ONLINE") assert.ElementsMatch(t, s.allBrokerList, []string{}) } @@ -62,23 +61,36 @@ func TestControllerBasedBrokerSelectorInitError(t *testing.T) { err := s.init() assert.NotNil(t, err) assert.True(t, strings.Contains(err.Error(), "http client error")) + + s = &controllerBasedSelector{ + config: &ControllerConfig{ + ControllerAddress: "invalidControllerURL://host:9000", + }, + client: &MockHTTPClientFailure{ + err: errors.New("http client error"), + }, + } + err = s.init() + assert.NotNil(t, err) + assert.True(t, strings.Contains(err.Error(), "Unsupported controller URL scheme")) } func TestGetControllerRequestUrl(t *testing.T) { - u, err := getControllerRequestUrl("localhost:9000") + u, err := getControllerRequestURL("localhost:9000") assert.Nil(t, err) assert.Equal(t, "http://localhost:9000/v2/brokers/tables?state=ONLINE", u) - u, err = getControllerRequestUrl("https://host:1234") + u, err = getControllerRequestURL("https://host:1234") assert.Nil(t, err) assert.Equal(t, "https://host:1234/v2/brokers/tables?state=ONLINE", u) - u, err = getControllerRequestUrl("http://host:1234") + u, err = getControllerRequestURL("http://host:1234") assert.Nil(t, err) assert.Equal(t, "http://host:1234/v2/brokers/tables?state=ONLINE", u) - u, err = getControllerRequestUrl("smb://nope:1234") + u, err = getControllerRequestURL("smb://nope:1234") assert.NotNil(t, err) + assert.Equal(t, "", u) assert.True(t, strings.Contains(err.Error(), "Unsupported controller URL scheme: smb")) } @@ -105,7 +117,7 @@ func TestUpdateBrokerData(t *testing.T) { }, client: &MockHTTPClientSuccess{ statusCode: 200, - body: ioutil.NopCloser( + body: io.NopCloser( strings.NewReader( `{"baseballStats":[{"port":8000,"host":"172.17.0.2","instanceName":"Broker_172.17.0.2_8000"}]}`, ), @@ -158,7 +170,7 @@ func TestUpdateBrokerDataDecodeError(t *testing.T) { }, client: &MockHTTPClientSuccess{ statusCode: 200, - body: ioutil.NopCloser(strings.NewReader("{not a valid json")), + body: io.NopCloser(strings.NewReader("{not a valid json")), }, } err := s.updateBrokerData() @@ -168,7 +180,7 @@ func TestUpdateBrokerDataDecodeError(t *testing.T) { type errReader int -func (errReader) Read(p []byte) (n int, err error) { +func (errReader) Read(_ []byte) (n int, err error) { return 0, errors.New("test read error") } @@ -198,7 +210,7 @@ func TestUpdateBrokerDataUnexpectedHTTPStatus(t *testing.T) { }, client: &MockHTTPClientSuccess{ statusCode: 500, - body: ioutil.NopCloser(strings.NewReader("{}")), + body: io.NopCloser(strings.NewReader("{}")), }, } err := s.updateBrokerData() diff --git a/pinot/dynamicBrokerSelector.go b/pinot/dynamicBrokerSelector.go index 70aeaec..6290ca5 100644 --- a/pinot/dynamicBrokerSelector.go +++ b/pinot/dynamicBrokerSelector.go @@ -16,6 +16,7 @@ const ( brokerExternalViewPath = "EXTERNALVIEW/brokerResource" ) +// ReadZNode reads a ZNode content as bytes from Zookeeper type ReadZNode func(path string) ([]byte, error) type dynamicBrokerSelector struct { @@ -42,13 +43,10 @@ func (s *dynamicBrokerSelector) init() error { return err } s.readZNode = func(path string) ([]byte, error) { - if s.zkConn == nil { - return nil, fmt.Errorf("Zk Connection hasn't been initialized.") - } - node, _, err := s.zkConn.Get(s.externalViewZkPath) - if err != nil { + node, _, err2 := s.zkConn.Get(s.externalViewZkPath) + if err2 != nil { log.Errorf("Failed to read zk: %s, ExternalView path: %s\n", s.zkConfig.ZookeeperPath, s.externalViewZkPath) - return nil, err + return nil, err2 } return node, nil } @@ -67,14 +65,13 @@ func (s *dynamicBrokerSelector) init() error { func (s *dynamicBrokerSelector) setupWatcher() { for { - select { - case ev := <-s.externalViewZnodeWatch: - if ev.Err != nil { - log.Error("GetW watcher error", ev.Err) - } else if ev.Type == zk.EventNodeDataChanged { - s.refreshExternalView() + ev := <-s.externalViewZnodeWatch + if ev.Err != nil { + log.Error("GetW watcher error", ev.Err) + } else if ev.Type == zk.EventNodeDataChanged { + if err := s.refreshExternalView(); err != nil { + log.Errorf("Failed to refresh ExternalView, Error: %v\n", err) } - break } time.Sleep(100 * time.Millisecond) } @@ -82,7 +79,7 @@ func (s *dynamicBrokerSelector) setupWatcher() { func (s *dynamicBrokerSelector) refreshExternalView() error { if s.readZNode == nil { - return fmt.Errorf("No method defined to read from a ZNode.") + return fmt.Errorf("No method defined to read from a ZNode") } node, err := s.readZNode(s.externalViewZkPath) if err != nil { diff --git a/pinot/dynamicBrokerSelector_test.go b/pinot/dynamicBrokerSelector_test.go index 68eae9f..5fb9346 100644 --- a/pinot/dynamicBrokerSelector_test.go +++ b/pinot/dynamicBrokerSelector_test.go @@ -33,7 +33,7 @@ func TestExtractBrokerHostPort(t *testing.T) { func TestErrorInit(t *testing.T) { selector := &dynamicBrokerSelector{ tableAwareBrokerSelector: tableAwareBrokerSelector{ - tableBrokerMap: map[string][]string{"myTable": []string{}}, + tableBrokerMap: map[string][]string{"myTable": {}}, }, zkConfig: &ZookeeperConfig{ ZookeeperPath: []string{}, @@ -46,7 +46,7 @@ func TestErrorInit(t *testing.T) { func TestErrorRefreshExternalView(t *testing.T) { selector := &dynamicBrokerSelector{ tableAwareBrokerSelector: tableAwareBrokerSelector{ - tableBrokerMap: map[string][]string{"myTable": []string{}}, + tableBrokerMap: map[string][]string{"myTable": {}}, }, zkConfig: &ZookeeperConfig{ ZookeeperPath: []string{}, diff --git a/pinot/json.go b/pinot/json.go index 3faf60b..7868e94 100644 --- a/pinot/json.go +++ b/pinot/json.go @@ -5,10 +5,10 @@ import ( "encoding/json" ) -// decodeJsonWithNumber use the UseNumber option in std json, which works +// decodeJSONWithNumber use the UseNumber option in std json, which works // by first decode number into string, then back to converted type // see implementation of json.Number in std -func decodeJsonWithNumber(bodyBytes []byte, out interface{}) error { +func decodeJSONWithNumber(bodyBytes []byte, out interface{}) error { decoder := json.NewDecoder(bytes.NewReader(bodyBytes)) decoder.UseNumber() if err := decoder.Decode(out); err != nil { diff --git a/pinot/jsonAsyncHTTPClientTransport.go b/pinot/jsonAsyncHTTPClientTransport.go index bd9770e..29ebef2 100644 --- a/pinot/jsonAsyncHTTPClientTransport.go +++ b/pinot/jsonAsyncHTTPClientTransport.go @@ -33,7 +33,11 @@ func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Reque if query.trace { requestJSON["trace"] = "true" } - jsonValue, _ := json.Marshal(requestJSON) + jsonValue, err := json.Marshal(requestJSON) + if err != nil { + log.Error("Unable to marshal request to JSON. ", err) + return nil, err + } req, err := createHTTPRequest(url, jsonValue, t.header) if err != nil { return nil, err @@ -43,7 +47,11 @@ func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Reque log.Error("Got exceptions during sending request. ", err) return nil, err } - defer resp.Body.Close() + defer func() { + if err := resp.Body.Close(); err != nil { + log.Error("Got exceptions during closing response body. ", err) + } + }() if resp.StatusCode == http.StatusOK { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { @@ -51,7 +59,7 @@ func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Reque return nil, err } var brokerResponse BrokerResponse - if err = decodeJsonWithNumber(bodyBytes, &brokerResponse); err != nil { + if err = decodeJSONWithNumber(bodyBytes, &brokerResponse); err != nil { log.Error("Unable to unmarshal json response to a brokerResponse structure. ", err) return nil, err } diff --git a/pinot/jsonAsyncHTTPClientTransport_test.go b/pinot/jsonAsyncHTTPClientTransport_test.go index 218be1d..84f3ecc 100644 --- a/pinot/jsonAsyncHTTPClientTransport_test.go +++ b/pinot/jsonAsyncHTTPClientTransport_test.go @@ -50,5 +50,11 @@ func TestJsonAsyncHTTPClientTransport(t *testing.T) { query: "select * from baseballStats limit 10", }) assert.NotNil(t, err) - assert.True(t, strings.HasPrefix(err.Error(), "Post ")) + + _, err = transport.execute("localhos\t:8000", &Request{ + queryFormat: "sql", + query: "select * from baseballStats limit 10", + }) + assert.NotNil(t, err) + assert.True(t, strings.HasPrefix(err.Error(), "parse ")) } diff --git a/pinot/response.go b/pinot/response.go index a85dc47..d852fb5 100644 --- a/pinot/response.go +++ b/pinot/response.go @@ -1,6 +1,10 @@ package pinot -import "encoding/json" +import ( + "encoding/json" + + log "github.com/sirupsen/logrus" +) // BrokerResponse is the data structure for broker response. type BrokerResponse struct { @@ -94,24 +98,40 @@ func (r ResultTable) GetString(rowIndex int, columnIndex int) string { // GetInt returns a ResultTable int entry given row index and column index func (r ResultTable) GetInt(rowIndex int, columnIndex int) int32 { - val, _ := (r.Rows[rowIndex][columnIndex]).(json.Number).Int64() + val, err := (r.Rows[rowIndex][columnIndex]).(json.Number).Int64() + if err != nil { + log.Errorf("Error converting to int: %v", err) + return 0 + } return int32(val) } // GetLong returns a ResultTable long entry given row index and column index func (r ResultTable) GetLong(rowIndex int, columnIndex int) int64 { - val, _ := (r.Rows[rowIndex][columnIndex]).(json.Number).Int64() + val, err := (r.Rows[rowIndex][columnIndex]).(json.Number).Int64() + if err != nil { + log.Errorf("Error converting to long: %v", err) + return 0 + } return val } // GetFloat returns a ResultTable float entry given row index and column index func (r ResultTable) GetFloat(rowIndex int, columnIndex int) float32 { - val, _ := (r.Rows[rowIndex][columnIndex]).(json.Number).Float64() + val, err := (r.Rows[rowIndex][columnIndex]).(json.Number).Float64() + if err != nil { + log.Errorf("Error converting to float: %v", err) + return 0 + } return float32(val) } // GetDouble returns a ResultTable double entry given row index and column index func (r ResultTable) GetDouble(rowIndex int, columnIndex int) float64 { - val, _ := (r.Rows[rowIndex][columnIndex]).(json.Number).Float64() + val, err := (r.Rows[rowIndex][columnIndex]).(json.Number).Float64() + if err != nil { + log.Errorf("Error converting to double: %v", err) + return 0 + } return val } diff --git a/pinot/response_test.go b/pinot/response_test.go index 973598f..856d8ea 100644 --- a/pinot/response_test.go +++ b/pinot/response_test.go @@ -11,7 +11,7 @@ import ( func TestSqlSelectionQueryResponse(t *testing.T) { var brokerResponse BrokerResponse respBytes := []byte("{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"STRING\",\"INT\",\"INT\",\"STRING\",\"STRING\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"INT\",\"STRING\",\"INT\",\"INT\"],\"columnNames\":[\"AtBatting\",\"G_old\",\"baseOnBalls\",\"caughtStealing\",\"doules\",\"groundedIntoDoublePlays\",\"hits\",\"hitsByPitch\",\"homeRuns\",\"intentionalWalks\",\"league\",\"numberOfGames\",\"numberOfGamesAsBatter\",\"playerID\",\"playerName\",\"playerStint\",\"runs\",\"runsBattedIn\",\"sacrificeFlies\",\"sacrificeHits\",\"stolenBases\",\"strikeouts\",\"teamID\",\"tripples\",\"yearID\"]},\"rows\":[[0,11,0,0,0,0,0,0,0,0,\"NL\",11,11,\"aardsda01\",\"David Allan\",1,0,0,0,0,0,0,\"SFN\",0,2004],[2,45,0,0,0,0,0,0,0,0,\"NL\",45,43,\"aardsda01\",\"David Allan\",1,0,0,0,1,0,0,\"CHN\",0,2006],[0,2,0,0,0,0,0,0,0,0,\"AL\",25,2,\"aardsda01\",\"David Allan\",1,0,0,0,0,0,0,\"CHA\",0,2007],[1,5,0,0,0,0,0,0,0,0,\"AL\",47,5,\"aardsda01\",\"David Allan\",1,0,0,0,0,0,1,\"BOS\",0,2008],[0,0,0,0,0,0,0,0,0,0,\"AL\",73,3,\"aardsda01\",\"David Allan\",1,0,0,0,0,0,0,\"SEA\",0,2009],[0,0,0,0,0,0,0,0,0,0,\"AL\",53,4,\"aardsda01\",\"David Allan\",1,0,0,0,0,0,0,\"SEA\",0,2010],[0,0,0,0,0,0,0,0,0,0,\"AL\",1,0,\"aardsda01\",\"David Allan\",1,0,0,0,0,0,0,\"NYA\",0,2012],[468,122,28,2,27,13,131,3,13,0,\"NL\",122,122,\"aaronha01\",\"Henry Louis\",1,58,69,4,6,2,39,\"ML1\",6,1954],[602,153,49,1,37,20,189,3,27,5,\"NL\",153,153,\"aaronha01\",\"Henry Louis\",1,105,106,4,7,3,61,\"ML1\",9,1955],[609,153,37,4,34,21,200,2,26,6,\"NL\",153,153,\"aaronha01\",\"Henry Louis\",1,106,92,7,5,2,54,\"ML1\",14,1956]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":10,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":250,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":6,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") - err := decodeJsonWithNumber(respBytes, &brokerResponse) + err := decodeJSONWithNumber(respBytes, &brokerResponse) assert.Nil(t, err) assert.Equal(t, 0, len(brokerResponse.AggregationResults)) assert.Equal(t, 0, len(brokerResponse.Exceptions)) @@ -46,7 +46,7 @@ func TestSqlSelectionQueryResponse(t *testing.T) { func TestSqlAggregationQueryResponse(t *testing.T) { var brokerResponse BrokerResponse respBytes := []byte("{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"LONG\"],\"columnNames\":[\"cnt\"]},\"rows\":[[97889]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":0,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":5,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") - err := decodeJsonWithNumber(respBytes, &brokerResponse) + err := decodeJSONWithNumber(respBytes, &brokerResponse) assert.Nil(t, err) assert.Equal(t, 0, len(brokerResponse.AggregationResults)) assert.Equal(t, 0, len(brokerResponse.Exceptions)) @@ -81,7 +81,7 @@ func TestSqlAggregationQueryResponse(t *testing.T) { func TestSqlAggregationGroupByResponse(t *testing.T) { var brokerResponse BrokerResponse respBytes := []byte("{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"STRING\",\"LONG\",\"DOUBLE\"],\"columnNames\":[\"teamID\",\"cnt\",\"sum_homeRuns\"]},\"rows\":[[\"ANA\",337,1324.0],[\"BL2\",197,136.0],[\"ARI\",727,2715.0],[\"BL1\",48,24.0],[\"ALT\",17,2.0],[\"ATL\",1951,7312.0],[\"BFN\",122,105.0],[\"BL3\",36,32.0],[\"BFP\",26,20.0],[\"BAL\",2380,9164.0]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":195778,\"numGroupsLimitReached\":true,\"totalDocs\":97889,\"timeUsedMs\":24,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") - err := decodeJsonWithNumber(respBytes, &brokerResponse) + err := decodeJSONWithNumber(respBytes, &brokerResponse) assert.Nil(t, err) assert.Equal(t, 0, len(brokerResponse.AggregationResults)) assert.Equal(t, 0, len(brokerResponse.Exceptions)) @@ -120,10 +120,23 @@ func TestSqlAggregationGroupByResponse(t *testing.T) { assert.Equal(t, float64(136.0), brokerResponse.ResultTable.GetDouble(1, 2)) } +func TestWrongTypeResponse(t *testing.T) { + var brokerResponse BrokerResponse + respBytes := []byte("{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"STRING\",\"LONG\",\"DOUBLE\"],\"columnNames\":[\"teamID\",\"cnt\",\"sum_homeRuns\"]},\"rows\":[[\"ANA\",9223372036854775808, 1e309]]},\"exceptions\":[],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":1,\"numSegmentsMatched\":1,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":97889,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":195778,\"numGroupsLimitReached\":true,\"totalDocs\":97889,\"timeUsedMs\":24,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") + err := decodeJSONWithNumber(respBytes, &brokerResponse) + assert.Nil(t, err) + assert.Equal(t, "ANA", brokerResponse.ResultTable.GetString(0, 0)) + // Assert wrong type + assert.Equal(t, int32(0), brokerResponse.ResultTable.GetInt(0, 1)) + assert.Equal(t, int64(0), brokerResponse.ResultTable.GetLong(0, 1)) + assert.Equal(t, float32(0), brokerResponse.ResultTable.GetFloat(0, 2)) + assert.Equal(t, float64(0), brokerResponse.ResultTable.GetDouble(0, 2)) +} + func TestExceptionResponse(t *testing.T) { var brokerResponse BrokerResponse respBytes := []byte("{\"resultTable\":{\"dataSchema\":{\"columnDataTypes\":[\"DOUBLE\"],\"columnNames\":[\"max(league)\"]},\"rows\":[]},\"exceptions\":[{\"errorCode\":200,\"message\":\"QueryExecutionError:\\njava.lang.NumberFormatException: For input string: \\\"UA\\\"\\n\\tat sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)\\n\\tat sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)\\n\\tat java.lang.Double.parseDouble(Double.java:538)\\n\\tat org.apache.pinot.core.segment.index.readers.StringDictionary.getDoubleValue(StringDictionary.java:58)\\n\\tat org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator.getNextBlock(DictionaryBasedAggregationOperator.java:81)\\n\\tat org.apache.pinot.core.operator.query.DictionaryBasedAggregationOperator.getNextBlock(DictionaryBasedAggregationOperator.java:47)\\n\\tat org.apache.pinot.core.operator.BaseOperator.nextBlock(BaseOperator.java:48)\\n\\tat org.apache.pinot.core.operator.CombineOperator$1.runJob(CombineOperator.java:102)\\n\\tat org.apache.pinot.core.util.trace.TraceRunnable.run(TraceRunnable.java:40)\\n\\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\\n\\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\\n\\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\\n\\tat shaded.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)\\n\\tat shaded.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)\"}],\"numServersQueried\":1,\"numServersResponded\":1,\"numSegmentsQueried\":1,\"numSegmentsProcessed\":0,\"numSegmentsMatched\":0,\"numConsumingSegmentsQueried\":0,\"numDocsScanned\":0,\"numEntriesScannedInFilter\":0,\"numEntriesScannedPostFilter\":0,\"numGroupsLimitReached\":false,\"totalDocs\":97889,\"timeUsedMs\":5,\"segmentStatistics\":[],\"traceInfo\":{},\"minConsumingFreshnessTimeMs\":0}") - err := decodeJsonWithNumber(respBytes, &brokerResponse) + err := decodeJSONWithNumber(respBytes, &brokerResponse) assert.Nil(t, err) assert.Equal(t, 0, len(brokerResponse.AggregationResults)) assert.Equal(t, 1, len(brokerResponse.Exceptions)) diff --git a/pinot/simplebrokerselector.go b/pinot/simplebrokerselector.go index fdaf8db..156e237 100644 --- a/pinot/simplebrokerselector.go +++ b/pinot/simplebrokerselector.go @@ -16,9 +16,10 @@ func (s *simpleBrokerSelector) init() error { return nil } -func (s *simpleBrokerSelector) selectBroker(table string) (string, error) { +func (s *simpleBrokerSelector) selectBroker(_ string) (string, error) { if len(s.brokerList) == 0 { return "", fmt.Errorf("No pre-configured broker lists set in simpleBrokerSelector") } + // #nosec G404 return s.brokerList[rand.Intn(len(s.brokerList))], nil } diff --git a/pinot/tableAwareBrokerSelector.go b/pinot/tableAwareBrokerSelector.go index 114d953..fe21f77 100644 --- a/pinot/tableAwareBrokerSelector.go +++ b/pinot/tableAwareBrokerSelector.go @@ -40,6 +40,7 @@ func (s *tableAwareBrokerSelector) selectBroker(table string) (string, error) { return "", fmt.Errorf("No available broker found for table: %s", table) } } + // #nosec G404 return brokerList[rand.Intn(len(brokerList))], nil } diff --git a/pinot/tableAwareBrokerSelector_test.go b/pinot/tableAwareBrokerSelector_test.go index a12b2d7..8f9f95b 100644 --- a/pinot/tableAwareBrokerSelector_test.go +++ b/pinot/tableAwareBrokerSelector_test.go @@ -14,7 +14,7 @@ func TestExtractTableName(t *testing.T) { func TestSelectBroker(t *testing.T) { selector := &tableAwareBrokerSelector{ - tableBrokerMap: map[string][]string{"myTable": []string{"localhost:8000"}}, + tableBrokerMap: map[string][]string{"myTable": {"localhost:8000"}}, allBrokerList: []string{"localhost:8000"}, } broker, err := selector.selectBroker("") @@ -29,7 +29,7 @@ func TestSelectBroker(t *testing.T) { func TestErrorSelectBroker(t *testing.T) { emptySelector := &tableAwareBrokerSelector{ - tableBrokerMap: map[string][]string{"myTable": []string{}}, + tableBrokerMap: map[string][]string{"myTable": {}}, } _, err := emptySelector.selectBroker("") assert.NotNil(t, err)