From 5a19d5ebc4e4beec5ccf02018f99e9247613bd63 Mon Sep 17 00:00:00 2001 From: liyang Date: Fri, 27 Sep 2024 16:01:03 +0800 Subject: [PATCH] feat: Supports gRPC write hints (#48) * feat: Supports gRPC write hints * chore: change table name * refactor: move to the context package * refactor: refactor the context package * refactor: init table * Update context/context.go Co-authored-by: zyy17 --------- Co-authored-by: zyy17 --- config.go | 4 +- context/context.go | 50 ++++++++++++++ examples/README.md | 1 + examples/hints/README.md | 58 ++++++++++++++++ examples/hints/main.go | 139 +++++++++++++++++++++++++++++++++++++++ request/request.go | 8 ++- 6 files changed, 256 insertions(+), 4 deletions(-) create mode 100644 context/context.go create mode 100644 examples/hints/README.md create mode 100644 examples/hints/main.go diff --git a/config.go b/config.go index 099e3cd..8aa6020 100644 --- a/config.go +++ b/config.go @@ -107,7 +107,7 @@ func (c *Config) WithMetricsEnabled(b bool) *Config { // WithMeterProvider provides a MeterProvider for SDK. // If metrics colleciton is not enabled, then this option has no effect. -// If metrics colleciton is enabled and this option is not provide +// If metrics colleciton is enabled and this option is not provide. // the global MeterProvider will be used. func (c *Config) WithMeterProvider(p metric.MeterProvider) *Config { c.telemetry.Metrics.MeterProvider = p @@ -122,7 +122,7 @@ func (c *Config) WithTracesEnabled(b bool) *Config { // WithTraceProvider provides a TracerProvider for SDK. // If traces colleciton is not enabled, then this option has no effect. -// If traces colleciton is enabled and this option is not provide +// If traces colleciton is enabled and this option is not provide. // the global MeterProvider will be used. func (c *Config) WithTraceProvider(p trace.TracerProvider) *Config { c.telemetry.Traces.TracerProvider = p diff --git a/context/context.go b/context/context.go new file mode 100644 index 0000000..598c58b --- /dev/null +++ b/context/context.go @@ -0,0 +1,50 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package context + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +const ( + hintPrefix = "x-greptime-hint-" +) + +type Hint struct { + Key string + Value string +} + +type Option func(ctx context.Context) context.Context + +func New(parent context.Context, opts ...Option) context.Context { + ctx := parent + for _, opt := range opts { + ctx = opt(ctx) + } + return ctx +} + +func WithHints(hints []*Hint) Option { + return func(ctx context.Context) context.Context { + md := metadata.New(nil) + for _, hint := range hints { + md.Append(hintPrefix+hint.Key, hint.Value) + } + return metadata.NewOutgoingContext(ctx, md) + } +} diff --git a/examples/README.md b/examples/README.md index d085321..314f4c4 100644 --- a/examples/README.md +++ b/examples/README.md @@ -17,6 +17,7 @@ docker run --rm -p 4000-4003:4000-4003 \ - [object](object/README.md) - [healthcheck](healthcheck/README.md) - [opentelemetry](opentelemetry/README.md) +- [hints](hints/README.md) ## Query diff --git a/examples/hints/README.md b/examples/hints/README.md new file mode 100644 index 0000000..42d3698 --- /dev/null +++ b/examples/hints/README.md @@ -0,0 +1,58 @@ +# Write Hints + +## Insert + +```go +go run main.go +``` + +Output: + +```log +2024/09/24 01:16:03 create table, name: 'monitor_table_with_hints' +2024/09/24 01:16:03 affected rows: 3 +``` + +## Query + +Your can using [MySQL Client](https://docs.greptime.com/user-guide/protocols/mysql) to query the data from GreptimeDB. + +```shell +$ mysql -h 127.0.0.1 -P 4002 public + +mysql> select * from monitor_table_with_hints; ++------+-------+-------------+----------------------------+ +| id | host | temperature | timestamp | ++------+-------+-------------+----------------------------+ +| 1 | hello | 1.1 | 2024-09-23 17:16:03.476898 | +| 2 | hello | 2.2 | 2024-09-23 17:16:03.476898 | +| 3 | hello | 3.3 | 2024-09-23 17:16:03.476898 | ++------+-------+-------------+----------------------------+ +3 rows in set (0.03 sec) +``` + +You can view hints using `show create table` command: + +```mysql +mysql> show create table monitor_table_with_hints; ++------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Table | Create Table | ++------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| monitor_table_with_hints | CREATE TABLE IF NOT EXISTS `monitor_table_with_hints` ( + `id` BIGINT NULL, + `host` STRING NULL, + `temperature` DOUBLE NULL, + `timestamp` TIMESTAMP(6) NOT NULL, + TIME INDEX (`timestamp`), + PRIMARY KEY (`id`) +) + +ENGINE=mito +WITH( + append_mode = 'false', + merge_mode = 'last_non_null', + ttl = '3day' +) | ++------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +1 row in set (0.02 sec) +``` diff --git a/examples/hints/main.go b/examples/hints/main.go new file mode 100644 index 0000000..d2fe3a4 --- /dev/null +++ b/examples/hints/main.go @@ -0,0 +1,139 @@ +// Copyright 2024 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "log" + "time" + + greptime "github.com/GreptimeTeam/greptimedb-ingester-go" + + ingesterContext "github.com/GreptimeTeam/greptimedb-ingester-go/context" + "github.com/GreptimeTeam/greptimedb-ingester-go/table" + "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" +) + +const ( + // The GreptimeDB address. + host = "127.0.0.1" + + // The database name. + database = "public" +) + +type client struct { + client *greptime.Client +} + +func newClient() (*client, error) { + cfg := greptime.NewConfig(host).WithDatabase(database) + gtClient, err := greptime.NewClient(cfg) + if err != nil { + return nil, err + } + + c := &client{ + client: gtClient, + } + + return c, nil +} + +func initData() (*table.Table, error) { + time1 := time.Now() + time2 := time.Now() + time3 := time.Now() + + itbl, err := table.New("monitor_table_with_hints") + if err != nil { + return nil, err + } + // add column at first. This is to define the schema of the table. + if err := itbl.AddTagColumn("id", types.INT64); err != nil { + return nil, err + } + if err := itbl.AddFieldColumn("host", types.STRING); err != nil { + return nil, err + } + if err := itbl.AddFieldColumn("temperature", types.FLOAT); err != nil { + return nil, err + } + if err := itbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MICROSECOND); err != nil { + return nil, err + } + + if err := itbl.AddRow(1, "hello", 1.1, time1); err != nil { + return nil, err + } + if err := itbl.AddRow(2, "hello", 2.2, time2); err != nil { + return nil, err + } + if err := itbl.AddRow(3, "hello", 3.3, time3); err != nil { + return nil, err + } + + return itbl, nil +} + +func (c client) write(data *table.Table) error { + hints := []*ingesterContext.Hint{ + { + Key: "ttl", + Value: "3d", + }, + { + Key: "merge_mode", + Value: "last_non_null", + }, + { + Key: "append_mode", + Value: "false", + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + + resp, err := c.client.Write(ingesterContext.New(ctx, ingesterContext.WithHints(hints)), data) + if err != nil { + return err + } + + tableName, err := data.GetName() + if err != nil { + return err + } + + log.Printf("create table, name: '%s'", tableName) + log.Printf("affected rows: %d\n", resp.GetAffectedRows().GetValue()) + return nil +} + +func main() { + data, err := initData() + if err != nil { + log.Fatalf("failed to init data: %v:", err) + } + + c, err := newClient() + if err != nil { + log.Fatalf("failed to new client: %v:", err) + } + + if err = c.write(data); err != nil { + log.Fatalf("failed to write data: %v:", err) + } +} diff --git a/request/request.go b/request/request.go index 686b676..1d5e239 100644 --- a/request/request.go +++ b/request/request.go @@ -76,7 +76,9 @@ func (r *Request) Build() (*gpb.GreptimeRequest, error) { insertReqs = append(insertReqs, req) } req := &gpb.GreptimeRequest_RowInserts{ - RowInserts: &gpb.RowInsertRequests{Inserts: insertReqs}, + RowInserts: &gpb.RowInsertRequests{ + Inserts: insertReqs, + }, } return &gpb.GreptimeRequest{ Header: header, @@ -93,7 +95,9 @@ func (r *Request) Build() (*gpb.GreptimeRequest, error) { } req := &gpb.GreptimeRequest_RowDeletes{ - RowDeletes: &gpb.RowDeleteRequests{Deletes: deleteReqs}, + RowDeletes: &gpb.RowDeleteRequests{ + Deletes: deleteReqs, + }, } return &gpb.GreptimeRequest{ Header: header,