diff --git a/README.md b/README.md index 1a19312..667f7c3 100644 --- a/README.md +++ b/README.md @@ -26,10 +26,25 @@ Initiate a Config for Client ```go cfg := greptime.NewConfig(""). + WithPort(4001). WithAuth("", ""). WithDatabase("") ``` +#### Options + +##### Secure + +```go +cfg.WithInsecure(false) // default insecure=true +``` + +##### keepalive + +```go +cfg.WithKeepalive(time.Second*30, time.Second*5) // keepalive isn't enabled by default +``` + ### Client ```go @@ -40,12 +55,12 @@ cli, err := greptime.NewClient(cfg) - you can Insert data into GreptimeDB via different style: - - [Table style](#with-table) - - [ORM style](#with-struct-tag) + - [Table style](#table-style) + - [ORM style](#orm-style) - streaming insert is to Send data into GreptimeDB without waiting for response. -#### With Table +#### Table style you can define schema via Table and Column, and then AddRow to include the real data you want to write. @@ -82,7 +97,7 @@ err := cli.StreamWrite(context.Background(), tbl) affected, err := cli.CloseStream(ctx) ``` -#### With Struct Tag +#### ORM style If you prefer ORM style, and define column-field relationship via struct field tag, you can try the following way. diff --git a/client.go b/client.go index 91d387a..ef9cfd4 100644 --- a/client.go +++ b/client.go @@ -38,7 +38,7 @@ type Client struct { // NewClient helps to create the greptimedb client, which will be responsible write data into GreptimeDB. func NewClient(cfg *Config) (*Client, error) { - conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...) + conn, err := grpc.Dial(cfg.endpoint(), cfg.build()...) if err != nil { return nil, err } diff --git a/config.go b/config.go index 8b2527e..6cc5d24 100644 --- a/config.go +++ b/config.go @@ -17,6 +17,10 @@ package greptime import ( "fmt" "time" + + "google.golang.org/grpc" + + "github.com/GreptimeTeam/greptimedb-ingester-go/options" ) // Config is to define how the Client behaves. @@ -27,8 +31,6 @@ import ( // you can find them in GreptimeCloud service detail page. // - Database is the default database the client will operate on. // But you can change the database in InsertRequest or QueryRequest. -// - DialOptions and CallOptions are for gRPC service. -// You can specify them or leave them empty. type Config struct { Host string // no scheme or port included. example: 127.0.0.1 Port int // default: 4001 @@ -36,8 +38,8 @@ type Config struct { Password string Database string // the default database - keepaliveInterval time.Duration - keepaliveTimeout time.Duration + tls *options.TlsOption + options []grpc.DialOption } // NewConfig helps to init Config with host only @@ -45,6 +47,10 @@ func NewConfig(host string) *Config { return &Config{ Host: host, Port: 4001, + + options: []grpc.DialOption{ + options.NewUserAgentOption(version).Build(), + }, } } @@ -60,37 +66,50 @@ func (c *Config) WithDatabase(database string) *Config { return c } -// WithAuth helps to specify the Basic Auth username and password +// WithAuth helps to specify the Basic Auth username and password. +// Leave them empty if you are in local environment. func (c *Config) WithAuth(username, password string) *Config { c.Username = username c.Password = password return c } -func (c *Config) WithKeepalive(interval, timeout time.Duration) *Config { - c.keepaliveInterval = interval - c.keepaliveTimeout = timeout +// WithKeepalive helps to set the keepalive option. +// - time. After a duration of this time if the client doesn't see any activity it +// pings the server to see if the transport is still alive. +// If set below 10s, a minimum value of 10s will be used instead. +// - timeout. After having pinged for keepalive check, the client waits for a duration +// of Timeout and if no activity is seen even after that the connection is closed. +func (c *Config) WithKeepalive(time, timeout time.Duration) *Config { + keepalive := options.NewKeepaliveOption(time, timeout).Build() + c.options = append(c.options, keepalive) return c } -func (c *Config) GetEndpoint() string { - return fmt.Sprintf("%s:%d", c.Host, c.Port) +// TODO(yuanbohan): support more tls options +func (c *Config) WithInsecure(insecure bool) *Config { + opt := options.NewTlsOption(insecure) + c.tls = &opt + return c } -func (c *Config) Options() *Options { - if c.keepaliveInterval == 0 && c.keepaliveTimeout == 0 { - return nil - } - - keepalive := NewKeepaliveOptions() +// WithDialOption helps to specify the dial option +// which has not been supported by ingester sdk yet. +func (c *Config) WithDialOption(opt grpc.DialOption) *Config { + c.options = append(c.options, opt) + return c +} - if c.keepaliveInterval != 0 { - keepalive.WithInterval(c.keepaliveInterval) - } +func (c *Config) endpoint() string { + return fmt.Sprintf("%s:%d", c.Host, c.Port) +} - if c.keepaliveTimeout != 0 { - keepalive.WithTimeout(c.keepaliveTimeout) +func (c *Config) build() []grpc.DialOption { + if c.tls == nil { + opt := options.NewTlsOption(true) + c.tls = &opt } - return NewOptions(keepalive) + c.options = append(c.options, c.tls.Build()) + return c.options } diff --git a/examples/object/main.go b/examples/object/main.go index 496bccb..69a14e8 100644 --- a/examples/object/main.go +++ b/examples/object/main.go @@ -74,7 +74,10 @@ func data() []Monitor { } func writeObject() { - resp, err := client.WriteObject(context.Background(), data()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + + resp, err := client.WriteObject(ctx, data()) if err != nil { log.Fatal(err) } @@ -82,7 +85,9 @@ func writeObject() { } func streamWriteObject() { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + if err := client.StreamWriteObject(ctx, data()); err != nil { log.Println(err) } diff --git a/examples/table/main.go b/examples/table/main.go index c77cec9..977a509 100644 --- a/examples/table/main.go +++ b/examples/table/main.go @@ -69,7 +69,9 @@ func data() *table.Table { } func write() { - resp, err := client.Write(context.Background(), data()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + resp, err := client.Write(ctx, data()) if err != nil { log.Println(err) } @@ -77,7 +79,8 @@ func write() { } func streamWrite() { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() if err := client.StreamWrite(ctx, data()); err != nil { log.Println(err) } diff --git a/options.go b/options.go deleted file mode 100644 index 2a19f9c..0000000 --- a/options.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2024 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package greptime - -import ( - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" -) - -var ( - uaOpt = grpc.WithUserAgent("greptimedb-ingester-go/" + Version) - - // TODO(yuanbohan): SecurityOptions - insecureOpt = grpc.WithTransportCredentials(insecure.NewCredentials()) - - defaultKeepaliveInterval = 30 * time.Second - defaultKeepaliveTimeout = 5 * time.Second -) - -type Options struct { - keepalive *KeepaliveOption -} - -func NewOptions(keepalive *KeepaliveOption) *Options { - return &Options{ - keepalive: keepalive, - } -} - -func (o *Options) WithKeepalive(keepalive *KeepaliveOption) *Options { - o.keepalive = keepalive - return o -} - -func (o *Options) Build() []grpc.DialOption { - options := []grpc.DialOption{uaOpt, insecureOpt} - - if o == nil { - return options - } - - if opt := o.keepalive.Build(); opt != nil { - options = append(options, *opt) - } - - return options -} - -type KeepaliveOption struct { - Interval time.Duration // default value is 30 seconds. - Timeout time.Duration // default value is 5 seconds. -} - -func NewKeepaliveOptions() *KeepaliveOption { - return &KeepaliveOption{ - Interval: defaultKeepaliveInterval, - Timeout: defaultKeepaliveTimeout, - } -} - -func (o *KeepaliveOption) WithInterval(d time.Duration) *KeepaliveOption { - o.Interval = d - return o -} - -func (o *KeepaliveOption) WithTimeout(d time.Duration) *KeepaliveOption { - o.Timeout = d - return o -} - -func (o *KeepaliveOption) Build() *grpc.DialOption { - if o.Interval == 0 && o.Timeout == 0 { - return nil - } - - param := keepalive.ClientParameters{PermitWithoutStream: true} - if o.Interval != 0 { - param.Time = o.Interval - } - if o.Timeout != 0 { - param.Timeout = o.Timeout - } - option := grpc.WithKeepaliveParams(param) - - return &option -} diff --git a/options/keepalive.go b/options/keepalive.go new file mode 100644 index 0000000..1993c62 --- /dev/null +++ b/options/keepalive.go @@ -0,0 +1,55 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package options + +import ( + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +var ( + defaultKeepaliveTime = time.Second * 30 + defaultKeepaliveTimeout = time.Second * 5 +) + +type KeepaliveOption struct { + time time.Duration + timeout time.Duration +} + +func NewKeepaliveOption(time, timeout time.Duration) KeepaliveOption { + return KeepaliveOption{ + time: time, + timeout: timeout, + } +} + +func (opt KeepaliveOption) Build() grpc.DialOption { + param := keepalive.ClientParameters{ + PermitWithoutStream: true, + Time: defaultKeepaliveTime, + Timeout: defaultKeepaliveTimeout, + } + + if opt.time != 0 { + param.Time = opt.time + } + if opt.timeout != 0 { + param.Timeout = opt.timeout + } + return grpc.WithKeepaliveParams(param) +} diff --git a/options/tls.go b/options/tls.go new file mode 100644 index 0000000..e334390 --- /dev/null +++ b/options/tls.go @@ -0,0 +1,45 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package options + +import ( + "crypto/tls" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type TlsOption struct { + InsecureSkipVerify bool + + // TODO(yuanbohan): support cert path + // ServerCertPath string // + // ClientKeyPath, ClientCertPath string // mTLS +} + +func NewTlsOption(InsecureSkipVerify bool) TlsOption { + return TlsOption{InsecureSkipVerify: InsecureSkipVerify} +} + +func (opt TlsOption) Build() grpc.DialOption { + if opt.InsecureSkipVerify { + return grpc.WithTransportCredentials(insecure.NewCredentials()) + } else { + // TODO(yuanbohan): setting for cert or key + tls := tls.Config{InsecureSkipVerify: opt.InsecureSkipVerify} + return grpc.WithTransportCredentials(credentials.NewTLS(&tls)) + } +} diff --git a/options/ua.go b/options/ua.go new file mode 100644 index 0000000..613aebd --- /dev/null +++ b/options/ua.go @@ -0,0 +1,31 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package options + +import ( + "google.golang.org/grpc" +) + +type UserAgentOption struct { + version string +} + +func NewUserAgentOption(version string) UserAgentOption { + return UserAgentOption{version: version} +} + +func (opt UserAgentOption) Build() grpc.DialOption { + return grpc.WithUserAgent("greptimedb-ingester-go/" + opt.version) +} diff --git a/version.go b/version.go index 7fac8f1..cfda937 100644 --- a/version.go +++ b/version.go @@ -14,4 +14,4 @@ package greptime -const Version = "v0.3.0" // THIS MUST BE THE SAME AS THE VERSION in GitHub release +const version = "v0.4.0"