Skip to content

Commit

Permalink
Support multistage query engine
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Feb 3, 2024
1 parent 7f7e7b2 commit 5e20192
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 15 deletions.
28 changes: 28 additions & 0 deletions integration-tests/batch_quickstart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,32 @@ func getPinotClientFromBrokerAndCustomHTTPClient() *pinot.Connection {
return pinotClient
}

// getPinotClientFromConfig creates a Pinot connection from an explicit
// ClientConfig that points at the local broker (localhost:brokerPort) and
// turns on the multistage query engine. Any construction error is reported
// through log.Fatalln.
func getPinotClientFromConfig() *pinot.Connection {
	cfg := &pinot.ClientConfig{
		BrokerList:          []string{"localhost:" + brokerPort},
		HTTPTimeout:         1500 * time.Millisecond,
		ExtraHTTPHeader:     map[string]string{},
		UseMultistageEngine: true,
	}

	conn, err := pinot.NewWithConfig(cfg)
	if err != nil {
		log.Fatalln(err)
	}
	return conn
}

// getPinotClientFromConfigAndCustomHTTPClient creates a Pinot connection from
// an explicit ClientConfig that points at the local broker
// (localhost:brokerPort) with the multistage query engine turned on, using the
// HTTP client returned by getCustomHTTPClient. Any construction error is
// reported through log.Fatalln.
func getPinotClientFromConfigAndCustomHTTPClient() *pinot.Connection {
	cfg := &pinot.ClientConfig{
		BrokerList:          []string{"localhost:" + brokerPort},
		HTTPTimeout:         1500 * time.Millisecond,
		ExtraHTTPHeader:     map[string]string{},
		UseMultistageEngine: true,
	}

	conn, err := pinot.NewWithConfigAndClient(cfg, getCustomHTTPClient())
	if err != nil {
		log.Fatalln(err)
	}
	return conn
}

// TestSendingQueriesToPinot tests sending queries to Pinot using different Pinot clients.
// This test requires a Pinot cluster running locally from the binary distribution, not in Docker.
// You can change the ports by setting the environment variables ZOOKEEPER_PORT, CONTROLLER_PORT, and BROKER_PORT.
Expand All @@ -101,9 +127,11 @@ func TestSendingQueriesToPinot(t *testing.T) {
getPinotClientFromZookeeper(),
getPinotClientFromController(),
getPinotClientFromBroker(),
getPinotClientFromConfig(),
getPinotClientFromZookeeperAndCustomHTTPClient(),
getPinotClientFromControllerAndCustomHTTPClient(),
getPinotClientFromBrokerAndCustomHTTPClient(),
getPinotClientFromConfigAndCustomHTTPClient(),
}

table := "baseballStats"
Expand Down
2 changes: 2 additions & 0 deletions pinot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type ClientConfig struct {
BrokerList []string
// HTTPTimeout is the HTTP request timeout for broker query API requests
HTTPTimeout time.Duration
// UseMultistageEngine is a flag that enables the multistage query execution engine
UseMultistageEngine bool
}

// ZookeeperConfig describes how to config Pinot Zookeeper connection
Expand Down
14 changes: 8 additions & 6 deletions pinot/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

// Connection is a connection to Pinot, normally created through the constructors in connectionFactory.go (e.g. NewWithConfig).
type Connection struct {
transport clientTransport
brokerSelector brokerSelector
trace bool
transport clientTransport
brokerSelector brokerSelector
trace bool
useMultistageEngine bool
}

// ExecuteSQL for a given table
Expand All @@ -19,9 +20,10 @@ func (c *Connection) ExecuteSQL(table string, query string) (*BrokerResponse, er
return nil, err
}
brokerResp, err := c.transport.execute(brokerAddress, &Request{
queryFormat: "sql",
query: query,
trace: c.trace,
queryFormat: "sql",
query: query,
trace: c.trace,
useMultistageEngine: c.useMultistageEngine,
})
if err != nil {
log.Errorf("Caught exception to execute SQL query %s, Error: %v\n", query, err)
Expand Down
3 changes: 3 additions & 0 deletions pinot/connectionFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con
brokerSelector: &dynamicBrokerSelector{
zkConfig: config.ZkConfig,
},
useMultistageEngine: config.UseMultistageEngine,
}
}
if config.BrokerList != nil && len(config.BrokerList) > 0 {
Expand All @@ -89,6 +90,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con
brokerSelector: &simpleBrokerSelector{
brokerList: config.BrokerList,
},
useMultistageEngine: config.UseMultistageEngine,
}
}
if config.ControllerConfig != nil {
Expand All @@ -98,6 +100,7 @@ func NewWithConfigAndClient(config *ClientConfig, httpClient *http.Client) (*Con
config: config.ControllerConfig,
client: http.DefaultClient,
},
useMultistageEngine: config.UseMultistageEngine,
}
}
if conn != nil {
Expand Down
19 changes: 18 additions & 1 deletion pinot/connectionFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,26 @@ func TestPinotClients(t *testing.T) {
_, err = NewWithConfig(&ClientConfig{})
assert.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "please specify"))
pinotClient4, err := NewWithConfig(&ClientConfig{
ZkConfig: &ZookeeperConfig{
ZookeeperPath: []string{"localhost:2181"},
PathPrefix: strings.Join([]string{"/", "QuickStartCluster"}, "/"),
SessionTimeoutSec: defaultZkSessionTimeoutSec,
},
ExtraHTTPHeader: map[string]string{
"k1": "v1",
},
UseMultistageEngine: true,
})
assert.NotNil(t, pinotClient4)
assert.NotNil(t, pinotClient4.brokerSelector)
assert.NotNil(t, pinotClient4.transport)
assert.True(t, pinotClient4.useMultistageEngine)
// Since there is no ZooKeeper setup, an error will be raised
assert.NotNil(t, err)
}

func TestPinotWithHttpTImeout(t *testing.T) {
func TestPinotWithHttpTimeout(t *testing.T) {
pinotClient, err := NewWithConfig(&ClientConfig{
// Hit an unreachable port
BrokerList: []string{"www.google.com:81"},
Expand Down
12 changes: 11 additions & 1 deletion pinot/jsonAsyncHTTPClientTransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,18 @@ func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Reque
url := fmt.Sprintf(getQueryTemplate(query.queryFormat, brokerAddress), brokerAddress)
requestJSON := map[string]string{}
requestJSON[query.queryFormat] = query.query
queryOptions := ""
if query.queryFormat == "sql" {
requestJSON["queryOptions"] = "groupByMode=sql;responseFormat=sql"
queryOptions = "groupByMode=sql;responseFormat=sql"
}
if query.useMultistageEngine {
if queryOptions != "" {
queryOptions += ";"
}
queryOptions += "useMultistageEngine=true"
}
if queryOptions != "" {
requestJSON["queryOptions"] = queryOptions
}
if query.trace {
requestJSON["trace"] = "true"
Expand Down
9 changes: 5 additions & 4 deletions pinot/jsonAsyncHTTPClientTransport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func TestJsonAsyncHTTPClientTransport(t *testing.T) {
})
assert.NotNil(t, err)

_, err = transport.execute("localhos\t:8000", &Request{
queryFormat: "sql",
query: "select * from baseballStats limit 10",
_, err = transport.execute("localhost:8000", &Request{
queryFormat: "sql",
query: "select * from baseballStats limit 10",
useMultistageEngine: true,
})
assert.NotNil(t, err)
assert.True(t, strings.HasPrefix(err.Error(), "parse "))
assert.True(t, strings.HasPrefix(err.Error(), "Post "))
}
7 changes: 4 additions & 3 deletions pinot/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package pinot

// Request is used in server request to host multiple pinot query types, like PQL, SQL.
type Request struct {
queryFormat string
query string
trace bool
queryFormat string
query string
trace bool
useMultistageEngine bool
}

0 comments on commit 5e20192

Please sign in to comment.