Skip to content

Commit

Permalink
feat: merge client with streamClient (#23)
Browse files Browse the repository at this point in the history
* merge client with streamClient

* version and docs
  • Loading branch information
yuanbohan authored Feb 26, 2024
1 parent e963b15 commit 2da0829
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 423 deletions.
74 changes: 34 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@

Provide API to insert data into GreptimeDB.

## Basic Example

- [schema](examples/schema/main.go)
- [tag](examples/tag/main.go)

## How To Use

### Installation
Expand All @@ -19,39 +14,29 @@ Provide API to insert data into GreptimeDB.
go get -u github.com/GreptimeTeam/greptimedb-ingester-go
```

### Example

#### Config

Initiate a Config for Client or StreamClient
### Import

```go
cfg := config.New("<host>").
WithAuth("<username>", "<password>").
WithDatabase(database)
```

##### Options

- keepalive
import greptime "github.com/GreptimeTeam/greptimedb-ingester-go"

```go
cfg = cfg.WithKeepalive(30*time.Second, 5*time.Second)
```

#### Client or StreamClient
### Example

- Client
#### Config

Initiate a Config for Client

```go
cli, err := client.New(cfg)
cfg := greptime.NewConfig("<host>").
WithAuth("<username>", "<password>").
WithDatabase("<database>")
```

- StreamClient
#### Client

```go

stream, err := client.NewStreamClient(cfg)
cli, err := greptime.NewClient(cfg)
```

#### Insert & StreamInsert
Expand Down Expand Up @@ -81,14 +66,14 @@ The **GreptimeDB** column is for the datatypes supported in library, and the **G
| BOOLEAN, BOOL | bool | |
| STRING | string | |
| BINARY, BYTES | []byte | |
| DATE | Int or time.Time | the day elapsed since 1970-1-1 |
| DATETIME | Int or time.Time | the millisecond elapsed since 1970-1-1 |
| TIMESTAMP_SECOND | Int or time.Time | |
| TIMESTAMP_MILLISECOND, TIMESTAMP | Int or time.Time | |
| TIMESTAMP_MICROSECOND | Int or time.Time | |
| TIMESTAMP_NANOSECOND | Int or time.Time | |
| DATE | *Int* or time.Time | the day elapsed since 1970-1-1 |
| DATETIME | *Int* or time.Time | the millisecond elapsed since 1970-1-1 |
| TIMESTAMP_SECOND | *Int* or time.Time | |
| TIMESTAMP_MILLISECOND, TIMESTAMP | *Int* or time.Time | |
| TIMESTAMP_MICROSECOND | *Int* or time.Time | |
| TIMESTAMP_NANOSECOND | *Int* or time.Time | |

NOTE: Int is for all of Integer and Unsigned Integer in Go
NOTE: *Int* is for all of Integer and Unsigned Integer in Go

##### With Schema predefined

Expand All @@ -97,6 +82,11 @@ you can define schema via Table and Column, and then AddRow to include the real
###### define table schema, and add rows

```go
import(
"github.com/GreptimeTeam/greptimedb-ingester-go/table"
"github.com/GreptimeTeam/greptimedb-ingester-go/table/types"
)

tbl, err := table.New("<table_name>")

tbl.AddTagColumn("id", types.INT64)
Expand All @@ -108,16 +98,18 @@ err := tbl.AddRow(2, "127.0.0.2", time.Now())
...
```

###### Client Write into GreptimeDB
###### Write into GreptimeDB

```go
resp, err := cli.Write(context.Background(), tbl)
```

###### StreamClient Send into GreptimeDB
###### Stream Write into GreptimeDB

```go
err := streamClient.Send(context.Background(), tbl)
err := cli.StreamWrite(context.Background(), tbl)
...
affected, err := cli.CloseStream(ctx)
```

##### With Struct Tag
Expand All @@ -130,7 +122,7 @@ If you prefer ORM style, and define column-field relationship via struct field t
- `tag`, `field`, `timestamp` is for [SemanticType][data-model], and the value is ignored
- `column` is to define the column name
- `type` is to define the data type. if type is timestamp, `precision` is supported
- the metadata separator is `;`, and the key value separator is `:`
- the metadata separator is `;` and the key value separator is `:`

type supported is the same as described [Datatypes supported](#datatypes-supported), and case insensitive

Expand Down Expand Up @@ -166,16 +158,18 @@ monitors := []Monitor{
}
```

###### Client Create into GreptimeDB
###### Create into GreptimeDB

```go
resp, err := cli.Create(context.Background(), monitors)
```

###### StreamClient Create into GreptimeDB
###### Stream Create into GreptimeDB

```go
err := streamClient.Create(context.Background(), monitors)
err := cli.StreamCreate(context.Background(), monitors)
...
affected, err := cli.CloseStream(ctx)
```

#### Query
Expand Down
102 changes: 97 additions & 5 deletions client/client.go → client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package client
package greptime

import (
"context"

gpb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
"google.golang.org/grpc"

"github.com/GreptimeTeam/greptimedb-ingester-go/config"
"github.com/GreptimeTeam/greptimedb-ingester-go/request"
"github.com/GreptimeTeam/greptimedb-ingester-go/request/header"
"github.com/GreptimeTeam/greptimedb-ingester-go/schema"
Expand All @@ -30,13 +29,15 @@ import (
// Client helps to write data into GreptimeDB. A Client is safe for concurrent
// use by multiple goroutines,you can have one Client instance in your application.
type Client struct {
cfg *config.Config
cfg *Config

client gpb.GreptimeDatabaseClient

stream gpb.GreptimeDatabase_HandleRequestsClient
}

// New helps to create the greptimedb client, which will be responsible write data into GreptimeDB.
func New(cfg *config.Config) (*Client, error) {
// NewClient helps to create the greptimedb client, which will be responsible write data into GreptimeDB.
func NewClient(cfg *Config) (*Client, error) {
conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -116,3 +117,94 @@ func (c *Client) Create(ctx context.Context, body any) (*gpb.GreptimeResponse, e

return c.Write(ctx, tbl)
}

// StreamWrite is to send the data into GreptimeDB via explicit schema.
//
// tbl, err := table.New(<tableName>)
//
// // add column at first. This is to define the schema of the table.
// tbl.AddTagColumn("tag1", types.INT64)
// tbl.AddFieldColumn("field1", types.STRING)
// tbl.AddFieldColumn("field2", types.FLOAT64)
// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)
//
// // you can add multiple row(s). This is the real data.
// tbl.AddRow(1, "hello", 1.1, time.Now())
//
// // send data into GreptimeDB
// resp, err := client.StreamWrite(context.Background(), tbl)
func (c *Client) StreamWrite(ctx context.Context, tables ...*table.Table) error {
if c.stream == nil {
stream, err := c.client.HandleRequests(ctx)
if err != nil {
return err
}
c.stream = stream
}

header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
if err != nil {
return err
}
return c.stream.Send(request_)
}

// StreamCreate is like [StreamWrite] to send the data into GreptimeDB, but schema is defined in the struct tag.
//
// type monitor struct {
// ID int64 `greptime:"tag;column:id;type:int64"`
// Host string `greptime:"tag;column:host;type:string"`
// Memory uint64 `greptime:"field;column:memory;type:uint64"`
// Cpu float64 `greptime:"field;column:cpu;type:float64"`
// Temperature int64 `greptime:"field;column:temperature;type:int64"`
// Running bool `greptime:"field;column:running;type:boolean"`
// Ts time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
// }
//
// func (monitor) TableName() string {
// return monitorTableName
// }
//
// monitors := []monitor{
// {
// ID: randomId(),
// Host: "127.0.0.1",
// Memory: 1,
// Cpu: 1.0,
// Temperature: -1,
// Ts: time1,
// Running: true,
// },
// {
// ID: randomId(),
// Host: "127.0.0.2",
// Memory: 2,
// Cpu: 2.0,
// Temperature: -2,
// Ts: time2,
// Running: true,
// },
// }
//
// resp, err := client.StreamCreate(context.Background(), monitors)
func (c *Client) StreamCreate(ctx context.Context, body any) error {
tbl, err := schema.Parse(body)
if err != nil {
return err
}
return c.StreamWrite(ctx, tbl)
}

// CloseStream closes the stream. Once we’ve finished writing our client’s requests to the stream
// using client.StreamWrite or client.StreamCreate, we need to call client.CloseStream to let
// GreptimeDB know that we’ve finished writing and are expecting to receive a response.
func (c *Client) CloseStream(ctx context.Context) (*gpb.AffectedRows, error) {
resp, err := c.stream.CloseAndRecv()
if err != nil {
return nil, err
}

c.stream = nil
return resp.GetAffectedRows(), nil
}
Loading

0 comments on commit 2da0829

Please sign in to comment.