Skip to content

Commit

Permalink
Support multistage query engine
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Feb 3, 2024
1 parent 7f7e7b2 commit ceb5b73
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 49 deletions.
30 changes: 21 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,6 @@ go build ./examples/json-batch-quickstart
./json-batch-quickstart
```

## Pinot Live Demo cluster

Build and run the example application to query from Pinot Batch Quickstart

```
go build ./examples/pinot-live-demo
./pinot-live-demo
```

# Usage

## Create a Pinot Connection
Expand Down Expand Up @@ -176,6 +167,27 @@ if err != nil {
log.Infof("Query Stats: response time - %d ms, scanned docs - %d, total docs - %d", brokerResp.TimeUsedMs, brokerResp.NumDocsScanned, brokerResp.TotalDocs)
```

## Query Pinot with Multi-Stage Engine

Please see this [example](https://github.com/startreedata/pinot-client-go/blob/master/examples/multistage-quickstart/main.go) for your reference.

How to run it:

```
go build ./examples/multistage-quickstart
./multistage-quickstart
```

Code snippet:

```
pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster")
if err != nil {
log.Error(err)
}
pinotClient.UseMultistageEngine(true)
```

## Response Format

The query response is defined by the following struct:
Expand Down
72 changes: 72 additions & 0 deletions examples/multistage-quickstart/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"encoding/json"
"fmt"

pinot "github.com/startreedata/pinot-client-go/pinot"

log "github.com/sirupsen/logrus"
)

// main connects to a local Pinot quickstart cluster through Zookeeper, turns
// on the multistage query engine for the connection and runs a handful of
// sample SQL queries against the baseballStats table, printing each response.
func main() {
	pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster")
	if err != nil {
		// Nothing below can work without a usable connection, so stop here
		// instead of logging and carrying on with a broken client.
		log.Fatal(err)
	}
	pinotClient.UseMultistageEngine(true)
	table := "baseballStats"
	pinotQueries := []string{
		"select * from baseballStats limit 10",
		"select count(*) as cnt from baseballStats limit 1",
		"select count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats limit 1",
		"select teamID, count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10",
		"select distinctCount(league) as unique_league_cnt from baseballStats limit 10",
	}

	log.Infof("Querying SQL")
	for _, query := range pinotQueries {
		log.Infof("Trying to query Pinot: %v", query)
		brokerResp, err := pinotClient.ExecuteSQL(table, query)
		if err != nil {
			// A failed query yields no response to print; report it and
			// move on to the next query rather than dereferencing nil.
			log.Error(err)
			continue
		}
		printBrokerResp(brokerResp)
	}
}

// printBrokerResp logs the query statistics of brokerResp followed by the
// first populated payload, checked in this order: exceptions, result table,
// aggregation results, selection results.
//
// A nil brokerResp is reported and skipped, so callers that pass through the
// result of a failed query do not trigger a nil-pointer panic.
func printBrokerResp(brokerResp *pinot.BrokerResponse) {
	if brokerResp == nil {
		log.Warn("No broker response to print")
		return
	}
	log.Infof("Query Stats: response time - %d ms, scanned docs - %d, total docs - %d", brokerResp.TimeUsedMs, brokerResp.NumDocsScanned, brokerResp.TotalDocs)
	// len of a nil slice is 0, so no separate nil check is needed.
	if len(brokerResp.Exceptions) > 0 {
		logJSON("brokerResp.Exceptions", brokerResp.Exceptions)
		return
	}
	if brokerResp.ResultTable != nil {
		logJSON("brokerResp.ResultTable", brokerResp.ResultTable)
		// Render the table as tab-separated text: one header line with
		// "name(type)" per column, then one line per row.
		line := ""
		for c := 0; c < brokerResp.ResultTable.GetColumnCount(); c++ {
			line += fmt.Sprintf("%s(%s)\t", brokerResp.ResultTable.GetColumnName(c), brokerResp.ResultTable.GetColumnDataType(c))
		}
		line += "\n"
		for r := 0; r < brokerResp.ResultTable.GetRowCount(); r++ {
			for c := 0; c < brokerResp.ResultTable.GetColumnCount(); c++ {
				line += fmt.Sprintf("%v\t", brokerResp.ResultTable.Get(r, c))
			}
			line += "\n"
		}
		log.Infof("ResultTable:\n%s", line)
		return
	}
	if brokerResp.AggregationResults != nil {
		logJSON("brokerResp.AggregationResults", brokerResp.AggregationResults)
		return
	}
	if brokerResp.SelectionResults != nil {
		logJSON("brokerResp.SelectionResults", brokerResp.SelectionResults)
		return
	}
}

// logJSON logs v as JSON under the given label, or logs the marshalling error
// instead of silently printing an empty payload.
func logJSON(label string, v interface{}) {
	jsonBytes, err := json.Marshal(v)
	if err != nil {
		log.Errorf("Failed to marshal %s: %v", label, err)
		return
	}
	log.Infof("%s:\n%s\n", label, jsonBytes)
}
69 changes: 56 additions & 13 deletions integration-tests/batch_quickstart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,30 @@ var (
brokerPort = getEnv("BROKER_PORT", "8000")
)

func getPinotClientFromZookeeper() *pinot.Connection {
func getPinotClientFromZookeeper(useMultistageEngine bool) *pinot.Connection {
pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:" + zookeeperPort}, "", "QuickStartCluster")
if err != nil {
log.Fatalln(err)
}
pinotClient.UseMultistageEngine(useMultistageEngine)
return pinotClient
}

func getPinotClientFromController() *pinot.Connection {
func getPinotClientFromController(useMultistageEngine bool) *pinot.Connection {
pinotClient, err := pinot.NewFromController("localhost:" + controllerPort)
if err != nil {
log.Fatalln(err)
}
pinotClient.UseMultistageEngine(useMultistageEngine)
return pinotClient
}

func getPinotClientFromBroker() *pinot.Connection {
func getPinotClientFromBroker(useMultistageEngine bool) *pinot.Connection {
pinotClient, err := pinot.NewFromBrokerList([]string{"localhost:" + brokerPort})
if err != nil {
log.Fatalln(err)
}
pinotClient.UseMultistageEngine(useMultistageEngine)
return pinotClient
}

Expand All @@ -69,27 +72,56 @@ func getCustomHTTPClient() *http.Client {
return httpClient
}

func getPinotClientFromZookeeperAndCustomHTTPClient() *pinot.Connection {
func getPinotClientFromZookeeperAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection {
pinotClient, err := pinot.NewFromZookeeperAndClient([]string{"localhost:" + zookeeperPort}, "", "QuickStartCluster", getCustomHTTPClient())
if err != nil {
log.Fatalln(err)
}
pinotClient.UseMultistageEngine(useMultistageEngine)
return pinotClient
}

func getPinotClientFromControllerAndCustomHTTPClient() *pinot.Connection {
func getPinotClientFromControllerAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection {
pinotClient, err := pinot.NewFromControllerAndClient("localhost:"+controllerPort, getCustomHTTPClient())
if err != nil {
log.Fatalln(err)
}
pinotClient.UseMultistageEngine(useMultistageEngine)
return pinotClient
}

func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection {
func getPinotClientFromBrokerAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection {
pinotClient, err := pinot.NewFromBrokerListAndClient([]string{"localhost:" + brokerPort}, getCustomHTTPClient())
if err != nil {
log.Fatalln(err)
}
pinotClient.UseMultistageEngine(useMultistageEngine)
return pinotClient
}

// getPinotClientFromConfig builds a connection from an explicit ClientConfig
// that targets the local broker with a 1.5s HTTP timeout and no extra
// headers, then applies the requested multistage engine setting.
// Any construction error aborts the test process.
func getPinotClientFromConfig(useMultistageEngine bool) *pinot.Connection {
	cfg := &pinot.ClientConfig{
		BrokerList:      []string{"localhost:" + brokerPort},
		HTTPTimeout:     1500 * time.Millisecond,
		ExtraHTTPHeader: map[string]string{},
	}
	conn, err := pinot.NewWithConfig(cfg)
	if err != nil {
		log.Fatalln(err)
	}
	conn.UseMultistageEngine(useMultistageEngine)
	return conn
}

// getPinotClientFromConfigAndCustomHTTPClient builds a connection from an
// explicit ClientConfig (local broker, 1.5s HTTP timeout, no extra headers)
// using the custom HTTP client from getCustomHTTPClient, then applies the
// requested multistage engine setting.
// Any construction error aborts the test process.
func getPinotClientFromConfigAndCustomHTTPClient(useMultistageEngine bool) *pinot.Connection {
	cfg := &pinot.ClientConfig{
		BrokerList:      []string{"localhost:" + brokerPort},
		HTTPTimeout:     1500 * time.Millisecond,
		ExtraHTTPHeader: map[string]string{},
	}
	conn, err := pinot.NewWithConfigAndClient(cfg, getCustomHTTPClient())
	if err != nil {
		log.Fatalln(err)
	}
	conn.UseMultistageEngine(useMultistageEngine)
	return conn
}

Expand All @@ -98,12 +130,23 @@ func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection {
// You can change the ports by setting the environment variables ZOOKEEPER_PORT, CONTROLLER_PORT, and BROKER_PORT.
func TestSendingQueriesToPinot(t *testing.T) {
pinotClients := []*pinot.Connection{
getPinotClientFromZookeeper(),
getPinotClientFromController(),
getPinotClientFromBroker(),
getPinotClientFromZookeeperAndCustomHTTPClient(),
getPinotClientFromControllerAndCustomHTTPClient(),
getPinotClientFromBrokerAndCustomHTTPClient(),
getPinotClientFromZookeeper(false),
getPinotClientFromController(false),
getPinotClientFromBroker(false),
getPinotClientFromConfig(false),
getPinotClientFromZookeeperAndCustomHTTPClient(false),
getPinotClientFromControllerAndCustomHTTPClient(false),
getPinotClientFromBrokerAndCustomHTTPClient(false),
getPinotClientFromConfigAndCustomHTTPClient(false),

getPinotClientFromZookeeper(true),
getPinotClientFromController(true),
getPinotClientFromBroker(true),
getPinotClientFromConfig(true),
getPinotClientFromZookeeperAndCustomHTTPClient(true),
getPinotClientFromControllerAndCustomHTTPClient(true),
getPinotClientFromBrokerAndCustomHTTPClient(true),
getPinotClientFromConfigAndCustomHTTPClient(true),
}

table := "baseballStats"
Expand All @@ -113,7 +156,7 @@ func TestSendingQueriesToPinot(t *testing.T) {

log.Printf("Querying SQL")
for _, query := range pinotQueries {
for i := 0; i < 100; i++ {
for i := 0; i < 200; i++ {
log.Printf("Trying to query Pinot: %v\n", query)
brokerResp, err := pinotClients[i%len(pinotClients)].ExecuteSQL(table, query)
assert.Nil(t, err)
Expand Down
2 changes: 2 additions & 0 deletions pinot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type ClientConfig struct {
BrokerList []string
// HTTP request timeout in your broker query for API requests
HTTPTimeout time.Duration
// UseMultistageEngine is a flag to enable multistage query execution engine
UseMultistageEngine bool
}

// ZookeeperConfig describes how to config Pinot Zookeeper connection
Expand Down
19 changes: 13 additions & 6 deletions pinot/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ import (

// Connection to Pinot, normally created through calls to the {@link ConnectionFactory}.
type Connection struct {
transport clientTransport
brokerSelector brokerSelector
trace bool
transport clientTransport
brokerSelector brokerSelector
trace bool
useMultistageEngine bool
}

// UseMultistageEngine enables or disables the multistage query engine flag
// for this connection.
//
// The value is stored on the connection and copied into the Request that
// ExecuteSQL hands to the transport, so it takes effect for queries issued
// after this call.
func (c *Connection) UseMultistageEngine(useMultistageEngine bool) {
c.useMultistageEngine = useMultistageEngine
}

// ExecuteSQL for a given table
Expand All @@ -19,9 +25,10 @@ func (c *Connection) ExecuteSQL(table string, query string) (*BrokerResponse, er
return nil, err
}
brokerResp, err := c.transport.execute(brokerAddress, &Request{
queryFormat: "sql",
query: query,
trace: c.trace,
queryFormat: "sql",
query: query,
trace: c.trace,
useMultistageEngine: c.useMultistageEngine,
})
if err != nil {
log.Errorf("Caught exception to execute SQL query %s, Error: %v\n", query, err)
Expand Down
3 changes: 3 additions & 0 deletions pinot/connectionFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con
brokerSelector: &dynamicBrokerSelector{
zkConfig: config.ZkConfig,
},
useMultistageEngine: config.UseMultistageEngine,
}
}
if config.BrokerList != nil && len(config.BrokerList) > 0 {
Expand All @@ -89,6 +90,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con
brokerSelector: &simpleBrokerSelector{
brokerList: config.BrokerList,
},
useMultistageEngine: config.UseMultistageEngine,
}
}
if config.ControllerConfig != nil {
Expand All @@ -98,6 +100,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con
config: config.ControllerConfig,
client: http.DefaultClient,
},
useMultistageEngine: config.UseMultistageEngine,
}
}
if conn != nil {
Expand Down
42 changes: 38 additions & 4 deletions pinot/connectionFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
)

func TestPinotClients(t *testing.T) {
pinotClient1, err := NewFromZookeeper([]string{"localhost:2181"}, "", "QuickStartCluster")
pinotClient1, err := NewFromZookeeper([]string{"localhost:12181"}, "", "QuickStartCluster")
assert.NotNil(t, pinotClient1)
assert.NotNil(t, pinotClient1.brokerSelector)
assert.NotNil(t, pinotClient1.transport)
// Since there is no zk setup, so an error will be raised
assert.NotNil(t, err)
pinotClient2, err := NewWithConfig(&ClientConfig{
ZkConfig: &ZookeeperConfig{
ZookeeperPath: []string{"localhost:2181"},
ZookeeperPath: []string{"localhost:12181"},
PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"),
SessionTimeoutSec: defaultZkSessionTimeoutSec,
},
Expand All @@ -30,7 +30,7 @@ func TestPinotClients(t *testing.T) {
assert.NotNil(t, pinotClient2.transport)
// Since there is no zk setup, so an error will be raised
assert.NotNil(t, err)
pinotClient3, err := NewFromController("localhost:9000")
pinotClient3, err := NewFromController("localhost:19000")
assert.NotNil(t, pinotClient3)
assert.NotNil(t, pinotClient3.brokerSelector)
assert.NotNil(t, pinotClient3.transport)
Expand All @@ -39,9 +39,43 @@ func TestPinotClients(t *testing.T) {
_, err = NewWithConfig(&ClientConfig{})
assert.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "please specify"))
pinotClient4, err := NewWithConfig(&ClientConfig{
ZkConfig: &ZookeeperConfig{
ZookeeperPath: []string{"localhost:12181"},
PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"),
SessionTimeoutSec: defaultZkSessionTimeoutSec,
},
ExtraHTTPHeader: map[string]string{
"k1": "v1",
},
UseMultistageEngine: true,
})
assert.NotNil(t, pinotClient4)
assert.NotNil(t, pinotClient4.brokerSelector)
assert.NotNil(t, pinotClient4.transport)
assert.True(t, pinotClient4.useMultistageEngine)
// Since there is no zk setup, so an error will be raised
assert.NotNil(t, err)
pinotClient5, err := NewWithConfig(&ClientConfig{
ZkConfig: &ZookeeperConfig{
ZookeeperPath: []string{"localhost:12181"},
PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"),
SessionTimeoutSec: defaultZkSessionTimeoutSec,
},
ExtraHTTPHeader: map[string]string{
"k1": "v1",
},
})
pinotClient5.UseMultistageEngine(true)
assert.NotNil(t, pinotClient5)
assert.NotNil(t, pinotClient5.brokerSelector)
assert.NotNil(t, pinotClient5.transport)
assert.True(t, pinotClient5.useMultistageEngine)
// Since there is no zk setup, so an error will be raised
assert.NotNil(t, err)
}

func TestPinotWithHttpTImeout(t *testing.T) {
func TestPinotWithHttpTimeout(t *testing.T) {
pinotClient, err := NewWithConfig(&ClientConfig{
// Hit an unreachable port
BrokerList: []string{"www.google.com:81"},
Expand Down
Loading

0 comments on commit ceb5b73

Please sign in to comment.