diff --git a/integration-tests/batch_quickstart_test.go b/integration-tests/batch_quickstart_test.go index c0f0d08..1803541 100644 --- a/integration-tests/batch_quickstart_test.go +++ b/integration-tests/batch_quickstart_test.go @@ -93,6 +93,32 @@ func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection { return pinotClient } +func getPinotClientFromConfig() *pinot.Connection { + pinotClient, err := pinot.NewWithConfig(&pinot.ClientConfig{ + BrokerList: []string{"localhost:" + brokerPort}, + HTTPTimeout: 1500 * time.Millisecond, + ExtraHTTPHeader: map[string]string{}, + UseMultistageEngine: true, + }) + if err != nil { + log.Fatalln(err) + } + return pinotClient +} + +func getPinotClientFromConfigAndCustomHTTPClient() *pinot.Connection { + pinotClient, err := pinot.NewWithConfigAndClient(&pinot.ClientConfig{ + BrokerList: []string{"localhost:" + brokerPort}, + HTTPTimeout: 1500 * time.Millisecond, + ExtraHTTPHeader: map[string]string{}, + UseMultistageEngine: true, + }, getCustomHTTPClient()) + if err != nil { + log.Fatalln(err) + } + return pinotClient +} + // TestSendingQueriesToPinot tests sending queries to Pinot using different Pinot clients. // This test requires a Pinot cluster running locally with binary not docker. // You can change the ports by setting the environment variables ZOOKEEPER_PORT, CONTROLLER_PORT, and BROKER_PORT. @@ -101,9 +127,11 @@ func TestSendingQueriesToPinot(t *testing.T) { getPinotClientFromZookeeper(), getPinotClientFromController(), getPinotClientFromBroker(), + getPinotClientFromConfig(), getPinotClientFromZookeeperAndCustomHTTPClient(), getPinotClientFromControllerAndCustomHTTPClient(), getPinotClientFromBrokerAndCustomHTTPClient(), + getPinotClientFromConfigAndCustomHTTPClient(), } table := "baseballStats" diff --git a/pinot/config.go b/pinot/config.go index 765cd40..fcf00b2 100644 --- a/pinot/config.go +++ b/pinot/config.go @@ -14,6 +14,8 @@ type ClientConfig struct { BrokerList []string // HTTP request timeout in your broker query for API requests HTTPTimeout time.Duration + // UseMultistageEngine is a flag to enable multistage query execution engine + UseMultistageEngine bool } // ZookeeperConfig describes how to config Pinot Zookeeper connection diff --git a/pinot/connection.go b/pinot/connection.go index 5fae778..0b2d39b 100644 --- a/pinot/connection.go +++ b/pinot/connection.go @@ -6,9 +6,10 @@ import ( // Connection to Pinot, normally created through calls to the {@link ConnectionFactory}. type Connection struct { - transport clientTransport - brokerSelector brokerSelector - trace bool + transport clientTransport + brokerSelector brokerSelector + trace bool + useMultistageEngine bool } // ExecuteSQL for a given table @@ -19,9 +20,10 @@ func (c *Connection) ExecuteSQL(table string, query string) (*BrokerResponse, er return nil, err } brokerResp, err := c.transport.execute(brokerAddress, &Request{ - queryFormat: "sql", - query: query, - trace: c.trace, + queryFormat: "sql", + query: query, + trace: c.trace, + useMultistageEngine: c.useMultistageEngine, }) if err != nil { log.Errorf("Caught exception to execute SQL query %s, Error: %v\n", query, err) diff --git a/pinot/connectionFactory.go b/pinot/connectionFactory.go index efa8e0d..68ab802 100644 --- a/pinot/connectionFactory.go +++ b/pinot/connectionFactory.go @@ -81,6 +81,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con brokerSelector: &dynamicBrokerSelector{ zkConfig: config.ZkConfig, }, + useMultistageEngine: config.UseMultistageEngine, } } if config.BrokerList != nil && len(config.BrokerList) > 0 { @@ -89,6 +90,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con brokerSelector: &simpleBrokerSelector{ brokerList: config.BrokerList, }, + useMultistageEngine: config.UseMultistageEngine, } } if config.ControllerConfig != nil { @@ -98,6 +100,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con config: config.ControllerConfig, client: http.DefaultClient, }, + useMultistageEngine: config.UseMultistageEngine, } } if conn != nil { diff --git a/pinot/connectionFactory_test.go b/pinot/connectionFactory_test.go index 2e62818..7d94cbb 100644 --- a/pinot/connectionFactory_test.go +++ b/pinot/connectionFactory_test.go @@ -39,9 +39,26 @@ func TestPinotClients(t *testing.T) { _, err = NewWithConfig(&ClientConfig{}) assert.NotNil(t, err) assert.True(t, strings.Contains(err.Error(), "please specify")) + pinotClient4, err := NewWithConfig(&ClientConfig{ + ZkConfig: &ZookeeperConfig{ + ZookeeperPath: []string{"localhost:2181"}, + PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"), + SessionTimeoutSec: defaultZkSessionTimeoutSec, + }, + ExtraHTTPHeader: map[string]string{ + "k1": "v1", + }, + UseMultistageEngine: true, + }) + assert.NotNil(t, pinotClient4) + assert.NotNil(t, pinotClient4.brokerSelector) + assert.NotNil(t, pinotClient4.transport) + assert.True(t, pinotClient4.useMultistageEngine) + // Since there is no zk setup, so an error will be raised + assert.NotNil(t, err) } -func TestPinotWithHttpTImeout(t *testing.T) { +func TestPinotWithHttpTimeout(t *testing.T) { pinotClient, err := NewWithConfig(&ClientConfig{ // Hit an unreachable port BrokerList: []string{"www.google.com:81"}, diff --git a/pinot/jsonAsyncHTTPClientTransport.go b/pinot/jsonAsyncHTTPClientTransport.go index 29ebef2..7a0049e 100644 --- a/pinot/jsonAsyncHTTPClientTransport.go +++ b/pinot/jsonAsyncHTTPClientTransport.go @@ -27,8 +27,18 @@ func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Reque url := fmt.Sprintf(getQueryTemplate(query.queryFormat, brokerAddress), brokerAddress) requestJSON := map[string]string{} requestJSON[query.queryFormat] = query.query + queryOptions := "" if query.queryFormat == "sql" { - requestJSON["queryOptions"] = "groupByMode=sql;responseFormat=sql" + queryOptions = "groupByMode=sql;responseFormat=sql" + } + if query.useMultistageEngine { + if queryOptions != "" { + queryOptions += ";" + } + queryOptions += "useMultistageEngine=true" + } + if queryOptions != "" { + requestJSON["queryOptions"] = queryOptions } if query.trace { requestJSON["trace"] = "true" diff --git a/pinot/request.go b/pinot/request.go index 702a040..1471926 100644 --- a/pinot/request.go +++ b/pinot/request.go @@ -2,7 +2,8 @@ package pinot // Request is used in server request to host multiple pinot query types, like PQL, SQL. type Request struct { - queryFormat string - query string - trace bool + queryFormat string + query string + trace bool + useMultistageEngine bool }