From 69f938ffc4ea5f2bf4ddc52273e9ca2872b0e611 Mon Sep 17 00:00:00 2001 From: locona Date: Tue, 31 Oct 2017 13:01:47 +0900 Subject: [PATCH] First release --- .gitignore | 1 + LICENSE | 201 ++++++++++++++++++++++ README.md | 35 ++++ example/main.go | 30 ++++ presto/client_type_signature.go | 8 + presto/client_type_signature_parameter.go | 6 + presto/column.go | 17 ++ presto/config.go | 11 ++ presto/data.go | 30 ++++ presto/data_test.go | 25 +++ presto/error_location.go | 6 + presto/failer_info.go | 10 ++ presto/presto.go | 120 +++++++++++++ presto/query_error.go | 20 +++ presto/query_result.go | 53 ++++++ presto/stage_stats.go | 18 ++ presto/statement_stats.go | 17 ++ 17 files changed, 608 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 example/main.go create mode 100644 presto/client_type_signature.go create mode 100644 presto/client_type_signature_parameter.go create mode 100644 presto/column.go create mode 100644 presto/config.go create mode 100644 presto/data.go create mode 100644 presto/data_test.go create mode 100644 presto/error_location.go create mode 100644 presto/failer_info.go create mode 100644 presto/presto.go create mode 100644 presto/query_error.go create mode 100644 presto/query_result.go create mode 100644 presto/stage_stats.go create mode 100644 presto/statement_stats.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..22d0d82 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +vendor diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..12b04e3 --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +# Reckoner Presto + +## Presto Client For GO + +### Installing + +``` +go get -u github.com/3-shake/reckoner-presto/presto +``` + +### Example + +```go +import ( + "os" + + "github.com/3-shake/reckoner-presto/presto" +) + +func main() { + baseUrl := os.Getenv("PRESTO_BASE_URL") + config := presto.Config{ + Host: baseUrl, + User: "hive", + Catalog: "hive", + Schema: "xxxxxxxxxxx", + TimeZone: "UTC", + } + presto, _ := presto.New(&config) + query, _ := presto.Query("SELECT client_id FROM sample") + next, _ := query.Next() +} + +``` + diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..d9bd043 --- /dev/null +++ b/example/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "os" + + "github.com/3-shake/reckoner-presto/presto" +) + +type QueryResult *presto.QueryResult + +func main() { + baseUrl := os.Getenv("PRESTO_BASE_URL") + + config := presto.Config{ + Host: baseUrl, + User: "hive", + Catalog: "hive", + Schema: "sample", + TimeZone: "UTC", + } + presto := presto.New(&config) + query, err := presto.Query(fmt.Sprintf("SELECT * FROM %q LIMIT 1", "sample")) + if err != nil { + fmt.Println(err) + return + } + next, err := query.Next() + fmt.Println(next, err) +} diff --git a/presto/client_type_signature.go b/presto/client_type_signature.go new file mode 100644 index 0000000..caa09f9 --- /dev/null +++ b/presto/client_type_signature.go @@ -0,0 +1,8 @@ +package presto + +type ClientTypeSignature struct { + RawType string `json:"rawType"` + TypeArguments []interface{} `json:"typeArguments"` + LiteralArguments []interface{} `json:"literalArguments"` + Arguments []ClientTypeSignatureParameter `json:"arguments"` +} diff --git a/presto/client_type_signature_parameter.go b/presto/client_type_signature_parameter.go new file mode 100644 index 0000000..cb82534 --- /dev/null +++ b/presto/client_type_signature_parameter.go @@ -0,0 +1,6 @@ +package presto + +type ClientTypeSignatureParameter struct { + Kind interface{} `json:"kind"` + Value interface{} `json:"value"` +} diff --git a/presto/column.go b/presto/column.go new file mode 100644 index 0000000..be0f179 --- /dev/null +++ b/presto/column.go @@ -0,0 +1,17 @@ +package presto + +type Column struct { + Name string `json:"name"` + Type string `json:"type` + TypeSignature ClientTypeSignature `json:"typeSignature"` +} + +type Columns []Column + +func (this Columns) Names() []string { + names := make([]string, len(this)) + for idx := range this { + names[idx] = this[idx].Name + } + return names +} diff --git a/presto/config.go b/presto/config.go new file mode 100644 index 0000000..f0efc9e --- /dev/null +++ b/presto/config.go @@ -0,0 +1,11 @@ +package presto + +type Config struct { + Host string + + // HTTP-Request-Header + User string `default:"hive"` + Catalog string `default:"hive"` + Schema string `default:"default"` + TimeZone string `default:"UTC"` +} diff --git a/presto/data.go b/presto/data.go new file mode 100644 index 0000000..6fe5b0c --- /dev/null +++ b/presto/data.go @@ -0,0 +1,30 @@ +package presto + +import ( + "fmt" + "sync" +) + +type Data [][]interface{} + +func (this Data) StringSlices() [][]string { + data := make([][]string, len(this)) + var wg sync.WaitGroup + for idx := range this { + wg.Add(1) + go func(index int) { + defer wg.Done() + row := make([]string, 0, len(this[idx])) + for _, field := range this[idx] { + if field == nil { + row = append(row, "") + continue + } + row = append(row, fmt.Sprint(field)) + } + data[index] = row + }(idx) + } + wg.Wait() + return data +} diff --git a/presto/data_test.go b/presto/data_test.go new file mode 100644 index 0000000..4ff02f1 --- /dev/null +++ b/presto/data_test.go @@ -0,0 +1,25 @@ +package presto + +import ( + "fmt" + "testing" +) + +func TestStringSlices(t *testing.T) { + data := Data{ + {370125442440519680, 1508315181, 1508114292.000000}, + {370125442440519680, 1508315181, 1508114292.000000}, + {370125442440519680, 1508315181, 1508114292.000000}, + {370125442440519680, 1508315181, 1508114292.000000}, + {370125442440519680, 1508315181, 1508114292.000000}, + {370125442440519680, 1508315181, 1508114292.000000}, + {370125442440519680, 1508315181, 1508114292.000000}, + {370125442440519680, 1508315181, 1508114292.000000}, + } + + slices := data.StringSlices() + for idx := range slices { + fmt.Println(slices[idx]) + } + t.Error("########") +} diff --git a/presto/error_location.go b/presto/error_location.go new file mode 100644 index 0000000..34a7466 --- /dev/null +++ b/presto/error_location.go @@ -0,0 +1,6 @@ +package presto + +type ErrorLocation struct { + LineNumber int `json:"lineNumber"` + ColumnNumber int `json:"columnNumber"` +} diff --git a/presto/failer_info.go b/presto/failer_info.go new file mode 100644 index 0000000..3197056 --- /dev/null +++ b/presto/failer_info.go @@ -0,0 +1,10 @@ +package presto + +type FailerInfo struct { + Type string `json:"type"` + Message string `json:"message"` + Cause interface{} `json:"cause"` + Suppressed []interface{} `json:"Suppressed"` + Stack []string `json:"stack"` + ErrorLocation ErrorLocation `json:"errorLocation"` +} diff --git a/presto/presto.go b/presto/presto.go new file mode 100644 index 0000000..7d044e2 --- /dev/null +++ b/presto/presto.go @@ -0,0 +1,120 @@ +package presto + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "reflect" + "regexp" + "strings" + "time" +) + +var ( + // state + FAILED = "FAILED" +) + +type Presto struct { + URL string + DefaultHeader http.Header +} + +func New(config *Config) *Presto { + presto := &Presto{ + URL: config.Host, + DefaultHeader: make(http.Header, 4), + } + presto.SetHeader(config) + return presto +} + +func (this *Presto) SetHeader(config *Config) { + val := reflect.ValueOf(config).Elem() + + for i := 0; i < val.NumField(); i++ { + valueField := val.Field(i) + typeField := val.Type().Field(i) + tag := typeField.Tag + + key := fmt.Sprintf("X-Presto-%s", convertCamelToHyphen(typeField.Name)) + value := valueField.String() + if value == "" { + value = tag.Get("default") + } + this.DefaultHeader.Set(key, value) + } +} + +func (this *Presto) Query(query string) (*QueryResult, error) { + endpoint := fmt.Sprintf("%s/v1/statement", this.URL) + req, err := this.Request("POST", endpoint, strings.NewReader(query)) + if err != nil { + return nil, err + } + resp, err := Do(req) + if err != nil { + return nil, err + } + + var queryResult QueryResult + err = json.NewDecoder(resp.Body).Decode(&queryResult) + resp.Body.Close() + if err != nil { + return nil, err + } + if queryResult.Stats.State == FAILED { + return &QueryResult{}, queryResult.Error + } + + queryResult.Presto = this + return &queryResult, nil +} + +func (this *Presto) Request(method, endpoint string, body io.Reader) (*http.Request, error) { + parsedUrl, _ := url.Parse(endpoint) + req, err := http.NewRequest(method, parsedUrl.String(), body) + if err != nil { + return nil, err + } + for idx, _ := range this.DefaultHeader { + req.Header[idx] = this.DefaultHeader[idx] + } + return req, nil +} + +func Do(req *http.Request) (*http.Response, error) { + for { + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + switch resp.StatusCode { + case 200: + return resp, nil + case 503: + time.Sleep(1 * time.Millisecond) + continue + default: + resp.Body.Close() + return nil, errors.New("Query Error") + } + } +} + +func convertCamelToHyphen(s string) string { + camel := regexp.MustCompile("(^[^A-Z]*|[A-Z]*)([A-Z][^A-Z]+|$)") + var a []string + for _, sub := range camel.FindAllStringSubmatch(s, -1) { + if sub[1] != "" { + a = append(a, sub[1]) + } + if sub[2] != "" { + a = append(a, sub[2]) + } + } + return strings.ToLower(strings.Join(a, "-")) +} diff --git a/presto/query_error.go b/presto/query_error.go new file mode 100644 index 0000000..cff01fe --- /dev/null +++ b/presto/query_error.go @@ -0,0 +1,20 @@ +package presto + +import ( + "encoding/json" + "fmt" +) + +type QueryError struct { + Message string `json:"message"` + SqlState string `json:"sqlState"` + ErrorName string `json:"errorName"` + ErrorType string `json:"errorType"` + ErrorLocation ErrorLocation `json:"errorLocation"` + FailerInfo FailerInfo `json:"failureInfo"` +} + +func (err QueryError) Error() string { + b, _ := json.Marshal(err) + return fmt.Sprintf(string(b)) +} diff --git a/presto/query_result.go b/presto/query_result.go new file mode 100644 index 0000000..15a4b3a --- /dev/null +++ b/presto/query_result.go @@ -0,0 +1,53 @@ +package presto + +import ( + "encoding/json" + "errors" +) + +type QueryResult struct { + ID string `json:"id"` + InfoUri string `json:"infoUri"` + NextUri string `json:"nextUri"` + Columns Columns `json:"columns"` + Data Data `json:"data"` + Stats StatementStats `json:"stats"` + Error QueryError `json:"error"` + UpdateType string `json:"updateType"` + UpdateCount int `json:"updateCount"` + Presto *Presto `json:"-"` +} + +func (this *QueryResult) Next() (*QueryResult, error) { + queryResult := &QueryResult{NextUri: this.NextUri} + var data [][]interface{} + for queryResult.NextUri != "" { + req, err := this.Presto.Request("GET", queryResult.NextUri, nil) + if err != nil { + return nil, errors.New("invalid request") + } + resp, err := Do(req) + if err != nil { + return nil, err + } + var tmp QueryResult + + d := json.NewDecoder(resp.Body) + d.UseNumber() + err = d.Decode(&tmp) + defer resp.Body.Close() + if err != nil { + return nil, err + } + queryResult = &tmp + if queryResult.Stats.State == FAILED { + return &QueryResult{}, queryResult.Error + } + + if len(queryResult.Data) > 0 { + data = append(data, queryResult.Data...) + } + } + queryResult.Data = data + return queryResult, nil +} diff --git a/presto/stage_stats.go b/presto/stage_stats.go new file mode 100644 index 0000000..daafe74 --- /dev/null +++ b/presto/stage_stats.go @@ -0,0 +1,18 @@ +package presto + +type StageStats struct { + StageID string `json:"stageId"` + State string `json:"state"` + Done string `json:"done"` + Nodes string `json:"nodes"` + TotalSplits int `json:"totalSplits"` + QueuedSplits int `json:"queuedSplits"` + RunningSplits int `json:"runningSplits"` + CompleteSplits int `json:"completeSplits"` + UserMillis int `json:"userMillis"` + CpuMillis int `json:"cpuMillis"` + WallTimeMillis int `json:"wallTimeMillis"` + ProcessedRows int `json:"processedRows"` + ProcessedBytes int `json:"processedBytes"` + SubStages []StageStats `json:"subStages"` +} diff --git a/presto/statement_stats.go b/presto/statement_stats.go new file mode 100644 index 0000000..13d941d --- /dev/null +++ b/presto/statement_stats.go @@ -0,0 +1,17 @@ +package presto + +type StatementStats struct { + State string `json:"state"` + Queued bool `json:"queued"` + Nodes int `json:"nodes"` + TotalSplits int `json:"totalSplits"` + QueuedSplits int `json:"queuedSplits"` + RunningSplits int `json:"runningSplits"` + CompleteSplits int `json:"completeSplits"` + UserMillis int `json:"userMillis"` + CpuMillis int `json:"cpuMillis"` + WallTimeMillis int `json:"wallTimeMillis"` + ProcessedRows int `json:"processedRows"` + ProcessedBytes int `json:"processedBytes"` + RootState StageStats `json:"rootState"` +}