From 84736d4d709ab368706126ece92c726330c7eaf7 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Sat, 3 Feb 2024 00:10:26 -0800 Subject: [PATCH] Support multistage query engine --- integration-tests/batch_quickstart_test.go | 28 ++++++++++++++++++++++ pinot/config.go | 2 ++ pinot/connection.go | 14 ++++++----- pinot/connectionFactory.go | 3 +++ pinot/connectionFactory_test.go | 25 +++++++++++++++---- pinot/jsonAsyncHTTPClientTransport.go | 12 +++++++++- pinot/jsonAsyncHTTPClientTransport_test.go | 9 +++---- pinot/request.go | 7 +++--- 8 files changed, 82 insertions(+), 18 deletions(-) diff --git a/integration-tests/batch_quickstart_test.go b/integration-tests/batch_quickstart_test.go index c0f0d08..1803541 100644 --- a/integration-tests/batch_quickstart_test.go +++ b/integration-tests/batch_quickstart_test.go @@ -93,6 +93,32 @@ func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection { return pinotClient } +func getPinotClientFromConfig() *pinot.Connection { + pinotClient, err := pinot.NewWithConfig(&pinot.ClientConfig{ + BrokerList: []string{"localhost:" + brokerPort}, + HTTPTimeout: 1500 * time.Millisecond, + ExtraHTTPHeader: map[string]string{}, + UseMultistageEngine: true, + }) + if err != nil { + log.Fatalln(err) + } + return pinotClient +} + +func getPinotClientFromConfigAndCustomHTTPClient() *pinot.Connection { + pinotClient, err := pinot.NewWithConfigAndClient(&pinot.ClientConfig{ + BrokerList: []string{"localhost:" + brokerPort}, + HTTPTimeout: 1500 * time.Millisecond, + ExtraHTTPHeader: map[string]string{}, + UseMultistageEngine: true, + }, getCustomHTTPClient()) + if err != nil { + log.Fatalln(err) + } + return pinotClient +} + // TestSendingQueriesToPinot tests sending queries to Pinot using different Pinot clients. // This test requires a Pinot cluster running locally with binary not docker. // You can change the ports by setting the environment variables ZOOKEEPER_PORT, CONTROLLER_PORT, and BROKER_PORT. @@ -101,9 +127,11 @@ func TestSendingQueriesToPinot(t *testing.T) { getPinotClientFromZookeeper(), getPinotClientFromController(), getPinotClientFromBroker(), + getPinotClientFromConfig(), getPinotClientFromZookeeperAndCustomHTTPClient(), getPinotClientFromControllerAndCustomHTTPClient(), getPinotClientFromBrokerAndCustomHTTPClient(), + getPinotClientFromConfigAndCustomHTTPClient(), } table := "baseballStats" diff --git a/pinot/config.go b/pinot/config.go index 765cd40..fcf00b2 100644 --- a/pinot/config.go +++ b/pinot/config.go @@ -14,6 +14,8 @@ type ClientConfig struct { BrokerList []string // HTTP request timeout in your broker query for API requests HTTPTimeout time.Duration + // UseMultistageEngine is a flag to enable multistage query execution engine + UseMultistageEngine bool } // ZookeeperConfig describes how to config Pinot Zookeeper connection diff --git a/pinot/connection.go b/pinot/connection.go index 5fae778..0b2d39b 100644 --- a/pinot/connection.go +++ b/pinot/connection.go @@ -6,9 +6,10 @@ import ( // Connection to Pinot, normally created through calls to the {@link ConnectionFactory}. type Connection struct { - transport clientTransport - brokerSelector brokerSelector - trace bool + transport clientTransport + brokerSelector brokerSelector + trace bool + useMultistageEngine bool } // ExecuteSQL for a given table @@ -19,9 +20,10 @@ func (c *Connection) ExecuteSQL(table string, query string) (*BrokerResponse, er return nil, err } brokerResp, err := c.transport.execute(brokerAddress, &Request{ - queryFormat: "sql", - query: query, - trace: c.trace, + queryFormat: "sql", + query: query, + trace: c.trace, + useMultistageEngine: c.useMultistageEngine, }) if err != nil { log.Errorf("Caught exception to execute SQL query %s, Error: %v\n", query, err) diff --git a/pinot/connectionFactory.go b/pinot/connectionFactory.go index efa8e0d..68ab802 100644 --- a/pinot/connectionFactory.go +++ b/pinot/connectionFactory.go @@ -81,6 +81,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con brokerSelector: &dynamicBrokerSelector{ zkConfig: config.ZkConfig, }, + useMultistageEngine: config.UseMultistageEngine, } } if config.BrokerList != nil && len(config.BrokerList) > 0 { @@ -89,6 +90,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con brokerSelector: &simpleBrokerSelector{ brokerList: config.BrokerList, }, + useMultistageEngine: config.UseMultistageEngine, } } if config.ControllerConfig != nil { @@ -98,6 +100,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con config: config.ControllerConfig, client: http.DefaultClient, }, + useMultistageEngine: config.UseMultistageEngine, } } if conn != nil { diff --git a/pinot/connectionFactory_test.go b/pinot/connectionFactory_test.go index 2e62818..d0dd7d4 100644 --- a/pinot/connectionFactory_test.go +++ b/pinot/connectionFactory_test.go @@ -9,7 +9,7 @@ import ( ) func TestPinotClients(t *testing.T) { - pinotClient1, err := NewFromZookeeper([]string{"localhost:2181"}, "", "QuickStartCluster") + pinotClient1, err := NewFromZookeeper([]string{"localhost:12181"}, "", "QuickStartCluster") assert.NotNil(t, pinotClient1) assert.NotNil(t, pinotClient1.brokerSelector) assert.NotNil(t, pinotClient1.transport) @@ -17,7 +17,7 @@ func TestPinotClients(t *testing.T) { assert.NotNil(t, err) pinotClient2, err := NewWithConfig(&ClientConfig{ ZkConfig: &ZookeeperConfig{ - ZookeeperPath: []string{"localhost:2181"}, + ZookeeperPath: []string{"localhost:12181"}, PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"), SessionTimeoutSec: defaultZkSessionTimeoutSec, }, @@ -30,7 +30,7 @@ func TestPinotClients(t *testing.T) { assert.NotNil(t, pinotClient2.transport) // Since there is no zk setup, so an error will be raised assert.NotNil(t, err) - pinotClient3, err := NewFromController("localhost:9000") + pinotClient3, err := NewFromController("localhost:19000") assert.NotNil(t, pinotClient3) assert.NotNil(t, pinotClient3.brokerSelector) assert.NotNil(t, pinotClient3.transport) @@ -39,9 +39,26 @@ func TestPinotClients(t *testing.T) { _, err = NewWithConfig(&ClientConfig{}) assert.NotNil(t, err) assert.True(t, strings.Contains(err.Error(), "please specify")) + pinotClient4, err := NewWithConfig(&ClientConfig{ + ZkConfig: &ZookeeperConfig{ + ZookeeperPath: []string{"localhost:12181"}, + PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"), + SessionTimeoutSec: defaultZkSessionTimeoutSec, + }, + ExtraHTTPHeader: map[string]string{ + "k1": "v1", + }, + UseMultistageEngine: true, + }) + assert.NotNil(t, pinotClient4) + assert.NotNil(t, pinotClient4.brokerSelector) + assert.NotNil(t, pinotClient4.transport) + assert.True(t, pinotClient4.useMultistageEngine) + // Since there is no zk setup, so an error will be raised + assert.NotNil(t, err) } -func TestPinotWithHttpTImeout(t *testing.T) { +func TestPinotWithHttpTimeout(t *testing.T) { pinotClient, err := NewWithConfig(&ClientConfig{ // Hit an unreachable port BrokerList: []string{"www.google.com:81"}, diff --git a/pinot/jsonAsyncHTTPClientTransport.go b/pinot/jsonAsyncHTTPClientTransport.go index 29ebef2..7a0049e 100644 --- a/pinot/jsonAsyncHTTPClientTransport.go +++ b/pinot/jsonAsyncHTTPClientTransport.go @@ -27,8 +27,18 @@ func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Reque url := fmt.Sprintf(getQueryTemplate(query.queryFormat, brokerAddress), brokerAddress) requestJSON := map[string]string{} requestJSON[query.queryFormat] = query.query + queryOptions := "" if query.queryFormat == "sql" { - requestJSON["queryOptions"] = "groupByMode=sql;responseFormat=sql" + queryOptions = "groupByMode=sql;responseFormat=sql" + } + if query.useMultistageEngine { + if queryOptions != "" { + queryOptions += ";" + } + queryOptions += "useMultistageEngine=true" + } + if queryOptions != "" { + requestJSON["queryOptions"] = queryOptions } if query.trace { requestJSON["trace"] = "true" diff --git a/pinot/jsonAsyncHTTPClientTransport_test.go b/pinot/jsonAsyncHTTPClientTransport_test.go index 84f3ecc..cf9e05e 100644 --- a/pinot/jsonAsyncHTTPClientTransport_test.go +++ b/pinot/jsonAsyncHTTPClientTransport_test.go @@ -51,10 +51,11 @@ func TestJsonAsyncHTTPClientTransport(t *testing.T) { }) assert.NotNil(t, err) - _, err = transport.execute("localhos\t:8000", &Request{ - queryFormat: "sql", - query: "select * from baseballStats limit 10", + _, err = transport.execute("localhost:18000", &Request{ + queryFormat: "sql", + query: "select * from baseballStats limit 10", + useMultistageEngine: true, }) assert.NotNil(t, err) - assert.True(t, strings.HasPrefix(err.Error(), "parse ")) + assert.True(t, strings.HasPrefix(err.Error(), "Post ")) } diff --git a/pinot/request.go b/pinot/request.go index 702a040..1471926 100644 --- a/pinot/request.go +++ b/pinot/request.go @@ -2,7 +2,8 @@ package pinot // Request is used in server request to host multiple pinot query types, like PQL, SQL. type Request struct { - queryFormat string - query string - trace bool + queryFormat string + query string + trace bool + useMultistageEngine bool }