From ceb5b735ee4b16288d6c2e2ce2561de3bb36a174 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Sat, 3 Feb 2024 00:10:26 -0800 Subject: [PATCH] Support multistage query engine --- README.md | 30 ++++++--- examples/multistage-quickstart/main.go | 72 ++++++++++++++++++++++ integration-tests/batch_quickstart_test.go | 69 +++++++++++++++++---- pinot/config.go | 2 + pinot/connection.go | 19 ++++-- pinot/connectionFactory.go | 3 + pinot/connectionFactory_test.go | 42 +++++++++++-- pinot/jsonAsyncHTTPClientTransport.go | 12 +++- pinot/jsonAsyncHTTPClientTransport_test.go | 9 +-- pinot/request.go | 7 ++- scripts/start-pinot-quickstart.sh | 30 ++++++--- 11 files changed, 246 insertions(+), 49 deletions(-) create mode 100644 examples/multistage-quickstart/main.go diff --git a/README.md b/README.md index 965a4a3..e13b878 100644 --- a/README.md +++ b/README.md @@ -55,15 +55,6 @@ go build ./examples/json-batch-quickstart ./json-batch-quickstart ``` -## Pinot Live Demo cluster - -Build and run the example application to query from Pinot Batch Quickstart - -``` -go build ./examples/pinot-live-demo -./pinot-live-demo -``` - # Usage ## Create a Pinot Connection @@ -176,6 +167,27 @@ if err != nil { log.Infof("Query Stats: response time - %d ms, scanned docs - %d, total docs - %d", brokerResp.TimeUsedMs, brokerResp.NumDocsScanned, brokerResp.TotalDocs) ``` +## Query Pinot with Multi-Stage Engine + +Please see this [example](https://github.com/startreedata/pinot-client-go/blob/master/examples/multistage-quickstart/main.go) for your reference. + +How to run it: + +``` +go build ./examples/multistage-quickstart +./multistage-quickstart +``` + +Code snippet: + +``` +pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster") +if err != nil { + log.Error(err) +} +pinotClient.UseMultistageEngine(true) +``` + ## Response Format Query Response is defined as the struct of following: diff --git a/examples/multistage-quickstart/main.go b/examples/multistage-quickstart/main.go new file mode 100644 index 0000000..4f2979c --- /dev/null +++ b/examples/multistage-quickstart/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "encoding/json" + "fmt" + + pinot "github.com/startreedata/pinot-client-go/pinot" + + log "github.com/sirupsen/logrus" +) + +func main() { + pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster") + if err != nil { + log.Error(err) + } + pinotClient.UseMultistageEngine(true) + table := "baseballStats" + pinotQueries := []string{ + "select * from baseballStats limit 10", + "select count(*) as cnt from baseballStats limit 1", + "select count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats limit 1", + "select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10", + "select distinctCount(league) as unique_league_cnt from baseballStats limit 10", + } + + log.Infof("Querying SQL") + for _, query := range pinotQueries { + log.Infof("Trying to query Pinot: %v", query) + brokerResp, err := pinotClient.ExecuteSQL(table, query) + if err != nil { + log.Error(err) + } + printBrokerResp(brokerResp) + } +} + +func printBrokerResp(brokerResp *pinot.BrokerResponse) { + log.Infof("Query Stats: response time - %d ms, scanned docs - %d, total docs - %d", brokerResp.TimeUsedMs, brokerResp.NumDocsScanned, brokerResp.TotalDocs) + if brokerResp.Exceptions != nil && len(brokerResp.Exceptions) > 0 { + jsonBytes, _ := json.Marshal(brokerResp.Exceptions) + log.Infof("brokerResp.Exceptions:\n%s\n", jsonBytes) + return + } + if brokerResp.ResultTable != nil { + jsonBytes, _ := json.Marshal(brokerResp.ResultTable) + log.Infof("brokerResp.ResultTable:\n%s\n", jsonBytes) + line := "" + for c := 0; c < brokerResp.ResultTable.GetColumnCount(); c++ { + line += fmt.Sprintf("%s(%s)\t", brokerResp.ResultTable.GetColumnName(c), brokerResp.ResultTable.GetColumnDataType(c)) + } + line += "\n" + for r := 0; r < brokerResp.ResultTable.GetRowCount(); r++ { + for c := 0; c < brokerResp.ResultTable.GetColumnCount(); c++ { + line += fmt.Sprintf("%v\t", brokerResp.ResultTable.Get(r, c)) + } + line += "\n" + } + log.Infof("ResultTable:\n%s", line) + return + } + if brokerResp.AggregationResults != nil { + jsonBytes, _ := json.Marshal(brokerResp.AggregationResults) + log.Infof("brokerResp.AggregationResults:\n%s\n", jsonBytes) + return + } + if brokerResp.SelectionResults != nil { + jsonBytes, _ := json.Marshal(brokerResp.SelectionResults) + log.Infof("brokerResp.SelectionResults:\n%s\n", jsonBytes) + return + } +} diff --git a/integration-tests/batch_quickstart_test.go b/integration-tests/batch_quickstart_test.go index c0f0d08..1175948 100644 --- a/integration-tests/batch_quickstart_test.go +++ b/integration-tests/batch_quickstart_test.go @@ -28,27 +28,30 @@ var ( brokerPort = getEnv("BROKER_PORT", "8000") ) -func getPinotClientFromZookeeper() *pinot.Connection { +func getPinotClientFromZookeeper(useMultistageEngine bool) *pinot.Connection { pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:" + zookeeperPort}, "", "QuickStartCluster") if err != nil { log.Fatalln(err) } + pinotClient.UseMultistageEngine(useMultistageEngine) return pinotClient } -func getPinotClientFromController() *pinot.Connection { +func getPinotClientFromController(useMultistageEngine bool) *pinot.Connection { pinotClient, err := pinot.NewFromController("localhost:" + controllerPort) if err != nil { log.Fatalln(err) } + pinotClient.UseMultistageEngine(useMultistageEngine) return pinotClient } -func getPinotClientFromBroker() *pinot.Connection { +func getPinotClientFromBroker(useMultistageEngine bool) *pinot.Connection { pinotClient, err := pinot.NewFromBrokerList([]string{"localhost:" + brokerPort}) if err != nil { log.Fatalln(err) } + pinotClient.UseMultistageEngine(useMultistageEngine) return pinotClient } @@ -69,27 +72,56 @@ func getCustomHTTPClient() *http.Client { return httpClient } -func getPinotClientFromZookeeperAndCustomHTTPClient() *pinot.Connection { +func getPinotClientFromZookeeperAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection { pinotClient, err := pinot.NewFromZookeeperAndClient([]string{"localhost:" + zookeeperPort}, "", "QuickStartCluster", getCustomHTTPClient()) if err != nil { log.Fatalln(err) } + pinotClient.UseMultistageEngine(useMultistageEngine) return pinotClient } -func getPinotClientFromControllerAndCustomHTTPClient() *pinot.Connection { +func getPinotClientFromControllerAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection { pinotClient, err := pinot.NewFromControllerAndClient("localhost:"+controllerPort, getCustomHTTPClient()) if err != nil { log.Fatalln(err) } + pinotClient.UseMultistageEngine(useMultistageEngine) return pinotClient } -func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection { +func getPinotClientFromBrokerAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection { pinotClient, err := pinot.NewFromBrokerListAndClient([]string{"localhost:" + brokerPort}, getCustomHTTPClient()) if err != nil { log.Fatalln(err) } + pinotClient.UseMultistageEngine(useMultistageEngine) + return pinotClient +} + +func getPinotClientFromConfig(useMultistageEngine bool) *pinot.Connection { + pinotClient, err := pinot.NewWithConfig(&pinot.ClientConfig{ + BrokerList: []string{"localhost:" + brokerPort}, + HTTPTimeout: 1500 * time.Millisecond, + ExtraHTTPHeader: map[string]string{}, + }) + if err != nil { + log.Fatalln(err) + } + pinotClient.UseMultistageEngine(useMultistageEngine) + return pinotClient +} + +func getPinotClientFromConfigAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection { + pinotClient, err := pinot.NewWithConfigAndClient(&pinot.ClientConfig{ + BrokerList: []string{"localhost:" + brokerPort}, + HTTPTimeout: 1500 * time.Millisecond, + ExtraHTTPHeader: map[string]string{}, + }, getCustomHTTPClient()) + if err != nil { + log.Fatalln(err) + } + pinotClient.UseMultistageEngine(useMultistageEngine) return pinotClient } @@ -98,12 +130,23 @@ func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection { // You can change the ports by setting the environment variables ZOOKEEPER_PORT, CONTROLLER_PORT, and BROKER_PORT. func TestSendingQueriesToPinot(t *testing.T) { pinotClients := []*pinot.Connection{ - getPinotClientFromZookeeper(), - getPinotClientFromController(), - getPinotClientFromBroker(), - getPinotClientFromZookeeperAndCustomHTTPClient(), - getPinotClientFromControllerAndCustomHTTPClient(), - getPinotClientFromBrokerAndCustomHTTPClient(), + getPinotClientFromZookeeper(false), + getPinotClientFromController(false), + getPinotClientFromBroker(false), + getPinotClientFromConfig(false), + getPinotClientFromZookeeperAndCustomHTTPClient(false), + getPinotClientFromControllerAndCustomHTTPClient(false), + getPinotClientFromBrokerAndCustomHTTPClient(false), + getPinotClientFromConfigAndCustomHTTPClient(false), + + getPinotClientFromZookeeper(true), + getPinotClientFromController(true), + getPinotClientFromBroker(true), + getPinotClientFromConfig(true), + getPinotClientFromZookeeperAndCustomHTTPClient(true), + getPinotClientFromControllerAndCustomHTTPClient(true), + getPinotClientFromBrokerAndCustomHTTPClient(true), + getPinotClientFromConfigAndCustomHTTPClient(true), } table := "baseballStats" @@ -113,7 +156,7 @@ func TestSendingQueriesToPinot(t *testing.T) { log.Printf("Querying SQL") for _, query := range pinotQueries { - for i := 0; i < 100; i++ { + for i := 0; i < 200; i++ { log.Printf("Trying to query Pinot: %v\n", query) brokerResp, err := pinotClients[i%len(pinotClients)].ExecuteSQL(table, query) assert.Nil(t, err) diff --git a/pinot/config.go b/pinot/config.go index 765cd40..fcf00b2 100644 --- a/pinot/config.go +++ b/pinot/config.go @@ -14,6 +14,8 @@ type ClientConfig struct { BrokerList []string // HTTP request timeout in your broker query for API requests HTTPTimeout time.Duration + // UseMultistageEngine is a flag to enable multistage query execution engine + UseMultistageEngine bool } // ZookeeperConfig describes how to config Pinot Zookeeper connection diff --git a/pinot/connection.go b/pinot/connection.go index 5fae778..7f3976c 100644 --- a/pinot/connection.go +++ b/pinot/connection.go @@ -6,9 +6,15 @@ import ( // Connection to Pinot, normally created through calls to the {@link ConnectionFactory}. type Connection struct { - transport clientTransport - brokerSelector brokerSelector - trace bool + transport clientTransport + brokerSelector brokerSelector + trace bool + useMultistageEngine bool +} + +// UseMultistageEngine for the connection +func (c *Connection) UseMultistageEngine(useMultistageEngine bool) { + c.useMultistageEngine = useMultistageEngine } // ExecuteSQL for a given table @@ -19,9 +25,10 @@ func (c *Connection) ExecuteSQL(table string, query string) (*BrokerResponse, er return nil, err } brokerResp, err := c.transport.execute(brokerAddress, &Request{ - queryFormat: "sql", - query: query, - trace: c.trace, + queryFormat: "sql", + query: query, + trace: c.trace, + useMultistageEngine: c.useMultistageEngine, }) if err != nil { log.Errorf("Caught exception to execute SQL query %s, Error: %v\n", query, err) diff --git a/pinot/connectionFactory.go b/pinot/connectionFactory.go index efa8e0d..68ab802 100644 --- a/pinot/connectionFactory.go +++ b/pinot/connectionFactory.go @@ -81,6 +81,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con brokerSelector: &dynamicBrokerSelector{ zkConfig: config.ZkConfig, }, + useMultistageEngine: config.UseMultistageEngine, } } if config.BrokerList != nil && len(config.BrokerList) > 0 { @@ -89,6 +90,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con brokerSelector: &simpleBrokerSelector{ brokerList: config.BrokerList, }, + useMultistageEngine: config.UseMultistageEngine, } } if config.ControllerConfig != nil { @@ -98,6 +100,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con config: config.ControllerConfig, client: http.DefaultClient, }, + useMultistageEngine: config.UseMultistageEngine, } } if conn != nil { diff --git a/pinot/connectionFactory_test.go b/pinot/connectionFactory_test.go index 2e62818..e7a3d09 100644 --- a/pinot/connectionFactory_test.go +++ b/pinot/connectionFactory_test.go @@ -9,7 +9,7 @@ import ( ) func TestPinotClients(t *testing.T) { - pinotClient1, err := NewFromZookeeper([]string{"localhost:2181"}, "", "QuickStartCluster") + pinotClient1, err := NewFromZookeeper([]string{"localhost:12181"}, "", "QuickStartCluster") assert.NotNil(t, pinotClient1) assert.NotNil(t, pinotClient1.brokerSelector) assert.NotNil(t, pinotClient1.transport) @@ -17,7 +17,7 @@ func TestPinotClients(t *testing.T) { assert.NotNil(t, err) pinotClient2, err := NewWithConfig(&ClientConfig{ ZkConfig: &ZookeeperConfig{ - ZookeeperPath: []string{"localhost:2181"}, + ZookeeperPath: []string{"localhost:12181"}, PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"), SessionTimeoutSec: defaultZkSessionTimeoutSec, }, @@ -30,7 +30,7 @@ func TestPinotClients(t *testing.T) { assert.NotNil(t, pinotClient2.transport) // Since there is no zk setup, so an error will be raised assert.NotNil(t, err) - pinotClient3, err := NewFromController("localhost:9000") + pinotClient3, err := NewFromController("localhost:19000") assert.NotNil(t, pinotClient3) assert.NotNil(t, pinotClient3.brokerSelector) assert.NotNil(t, pinotClient3.transport) @@ -39,9 +39,43 @@ func TestPinotClients(t *testing.T) { _, err = NewWithConfig(&ClientConfig{}) assert.NotNil(t, err) assert.True(t, strings.Contains(err.Error(), "please specify")) + pinotClient4, err := NewWithConfig(&ClientConfig{ + ZkConfig: &ZookeeperConfig{ + ZookeeperPath: []string{"localhost:12181"}, + PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"), + SessionTimeoutSec: defaultZkSessionTimeoutSec, + }, + ExtraHTTPHeader: map[string]string{ + "k1": "v1", + }, + UseMultistageEngine: true, + }) + assert.NotNil(t, pinotClient4) + assert.NotNil(t, pinotClient4.brokerSelector) + assert.NotNil(t, pinotClient4.transport) + assert.True(t, pinotClient4.useMultistageEngine) + // Since there is no zk setup, so an error will be raised + assert.NotNil(t, err) + pinotClient5, err := NewWithConfig(&ClientConfig{ + ZkConfig: &ZookeeperConfig{ + ZookeeperPath: []string{"localhost:12181"}, + PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"), + SessionTimeoutSec: defaultZkSessionTimeoutSec, + }, + ExtraHTTPHeader: map[string]string{ + "k1": "v1", + }, + }) + pinotClient5.UseMultistageEngine(true) + assert.NotNil(t, pinotClient5) + assert.NotNil(t, pinotClient5.brokerSelector) + assert.NotNil(t, pinotClient5.transport) + assert.True(t, pinotClient5.useMultistageEngine) + // Since there is no zk setup, so an error will be raised + assert.NotNil(t, err) } -func TestPinotWithHttpTImeout(t *testing.T) { +func TestPinotWithHttpTimeout(t *testing.T) { pinotClient, err := NewWithConfig(&ClientConfig{ // Hit an unreachable port BrokerList: []string{"www.google.com:81"}, diff --git a/pinot/jsonAsyncHTTPClientTransport.go b/pinot/jsonAsyncHTTPClientTransport.go index 29ebef2..7a0049e 100644 --- a/pinot/jsonAsyncHTTPClientTransport.go +++ b/pinot/jsonAsyncHTTPClientTransport.go @@ -27,8 +27,18 @@ func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Reque url := fmt.Sprintf(getQueryTemplate(query.queryFormat, brokerAddress), brokerAddress) requestJSON := map[string]string{} requestJSON[query.queryFormat] = query.query + queryOptions := "" if query.queryFormat == "sql" { - requestJSON["queryOptions"] = "groupByMode=sql;responseFormat=sql" + queryOptions = "groupByMode=sql;responseFormat=sql" + } + if query.useMultistageEngine { + if queryOptions != "" { + queryOptions += ";" + } + queryOptions += "useMultistageEngine=true" + } + if queryOptions != "" { + requestJSON["queryOptions"] = queryOptions } if query.trace { requestJSON["trace"] = "true" diff --git a/pinot/jsonAsyncHTTPClientTransport_test.go b/pinot/jsonAsyncHTTPClientTransport_test.go index 84f3ecc..cf9e05e 100644 --- a/pinot/jsonAsyncHTTPClientTransport_test.go +++ b/pinot/jsonAsyncHTTPClientTransport_test.go @@ -51,10 +51,11 @@ func TestJsonAsyncHTTPClientTransport(t *testing.T) { }) assert.NotNil(t, err) - _, err = transport.execute("localhos\t:8000", &Request{ - queryFormat: "sql", - query: "select * from baseballStats limit 10", + _, err = transport.execute("localhost:18000", &Request{ + queryFormat: "sql", + query: "select * from baseballStats limit 10", + useMultistageEngine: true, }) assert.NotNil(t, err) - assert.True(t, strings.HasPrefix(err.Error(), "parse ")) + assert.True(t, strings.HasPrefix(err.Error(), "Post ")) } diff --git a/pinot/request.go b/pinot/request.go index 702a040..1471926 100644 --- a/pinot/request.go +++ b/pinot/request.go @@ -2,7 +2,8 @@ package pinot // Request is used in server request to host multiple pinot query types, like PQL, SQL. type Request struct { - queryFormat string - query string - trace bool + queryFormat string + query string + trace bool + useMultistageEngine bool } diff --git a/scripts/start-pinot-quickstart.sh b/scripts/start-pinot-quickstart.sh index 60b7998..7dfcbce 100755 --- a/scripts/start-pinot-quickstart.sh +++ b/scripts/start-pinot-quickstart.sh @@ -15,17 +15,29 @@ if [ -z "${PINOT_HOME}" ]; then PINOT_HOME="/tmp/pinot" fi +# Set the broker port +if [ -z "${BROKER_PORT_FORWARD}" ]; then + echo "BROKER_PORT_FORWARD is not set. Using default port 8000" + BROKER_PORT_FORWARD="8000" +fi + # Create the destination directory mkdir -p "${PINOT_HOME}" -# Download the Pinot package -curl -L "${DOWNLOAD_URL}" -o "${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin.tar.gz" +# Check if the directory exists +if [ -d "${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin" ]; then + echo "Pinot package already exists in ${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin" +else + # Download the Pinot package + curl -L "${DOWNLOAD_URL}" -o "${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin.tar.gz" -# Extract the downloaded package -tar -xzf "${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin.tar.gz" -C "${PINOT_HOME}" + # Extract the downloaded package + tar -xzf "${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin.tar.gz" -C "${PINOT_HOME}" + + # Remove the downloaded package + rm "${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin.tar.gz" +fi -# Remove the downloaded package -rm "${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin.tar.gz" # Start the Pinot cluster ${PINOT_HOME}/apache-pinot-${PINOT_VERSION}-bin/bin/pinot-admin.sh QuickStart -type MULTI_STAGE & @@ -40,7 +52,7 @@ jps -lvm echo "Ensure Pinot cluster started correctly" -# Wait at most 5 minutes to reach the desired state +# Wait at most 10 minutes to reach the desired state for i in $(seq 1 150) do SUCCEED_TABLE=0 @@ -64,11 +76,11 @@ do if [ "${SUCCEED_TABLE}" -eq 6 ]; then break fi - sleep 2 + sleep 4 done if [ "${SUCCEED_TABLE}" -lt 6 ]; then - echo 'Quickstart failed: Cannot confirmed count-star result from quickstart table in 5 minutes' + echo 'Quickstart failed: Cannot confirmed count-star result from quickstart table in 10 minutes' exit 1 fi echo "Pinot cluster started correctly"