Skip to content

Commit

Permalink
feat: support delete record (#29)
Browse files Browse the repository at this point in the history
* feat: support delete record

* feat: replace WriteOp by Operation

* fix code comment mistakes
  • Loading branch information
JetSquirrel authored Mar 18, 2024
1 parent 08930f5 commit 99663b9
Show file tree
Hide file tree
Showing 7 changed files with 520 additions and 46 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,34 @@ err := tbl.AddRow(2, "127.0.0.2", time.Now())
resp, err := cli.Write(context.Background(), tbl)
```

##### Delete in GreptimeDB

```go
dtbl, err := table.New("<table_name>")
dtbl.AddTagColumn("id", types.INT64)
dtbl.AddFieldColumn("host", types.STRING)
dtbl.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)

// timestamp is the time you want to delete row
err := dtbl.AddRow(1, "127.0.0.1",timestamp)

affected, err := cli.Delete(context.Background(),dtbl)
```

##### Stream Write into GreptimeDB

```go
err := cli.StreamWrite(context.Background(), tbl)
...
affected, err := cli.CloseStream(ctx)
```
##### Stream Delete in GreptimeDB

```go
err := cli.StreamDelete(context.Background(), tbl)
...
affected, err := cli.CloseStream(ctx)
```

#### ORM style

Expand Down Expand Up @@ -149,6 +170,14 @@ monitors := []Monitor{
resp, err := cli.WriteObject(context.Background(), monitors)
```

##### DeleteObject in GreptimeDB

```go
deleteMonitors := monitors[:1]

affected, err := cli.DeleteObject(context.Background(), deleteMonitors)
```

##### Stream WriteObject into GreptimeDB

```go
Expand All @@ -157,6 +186,16 @@ err := cli.StreamWriteObject(context.Background(), monitors)
affected, err := cli.CloseStream(ctx)
```

##### Stream DeleteObject in GreptimeDB

```go
deleteMonitors := monitors[:1]

err := cli.StreamDeleteObject(context.Background(), deleteMonitors)
...
affected, err := cli.CloseStream(ctx)
```

## Datatypes supported

The **GreptimeDB** column is for the datatypes supported in library, and the **Go** column is the matched Go type.
Expand Down
116 changes: 91 additions & 25 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/GreptimeTeam/greptimedb-ingester-go/request/header"
"github.com/GreptimeTeam/greptimedb-ingester-go/schema"
"github.com/GreptimeTeam/greptimedb-ingester-go/table"
"github.com/GreptimeTeam/greptimedb-ingester-go/table/types"
)

// Client helps to write data into GreptimeDB. A Client is safe for concurrent
Expand All @@ -47,6 +48,17 @@ func NewClient(cfg *Config) (*Client, error) {
return &Client{cfg: cfg, client: client}, nil
}

// submit is create the request and send it to GreptimeDB.
// It is can set up the Operation as [INSERT,DELETE]
func (c *Client) submit(ctx context.Context, operation types.Operation, tables ...*table.Table) (*gpb.GreptimeResponse, error) {
header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, operation, tables...).Build()
if err != nil {
return nil, err
}
return c.client.Handle(ctx, request_)
}

// Write is to write the data into GreptimeDB via explicit schema.
//
// tbl, err := table.New(<tableName>)
Expand All @@ -56,19 +68,31 @@ func NewClient(cfg *Config) (*Client, error) {
// tbl.AddFieldColumn("field1", types.STRING)
// tbl.AddFieldColumn("field2", types.FLOAT64)
// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)
//
// timestamp := time.Now()
// // you can add multiple row(s). This is the real data.
// tbl.AddRow(1, "hello", 1.1, time.Now())
// tbl.AddRow(1, "hello", 1.1, timestamp)
//
// // write data into GreptimeDB
// resp, err := client.Write(context.Background(), tbl)
func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*gpb.GreptimeResponse, error) {
header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
if err != nil {
return nil, err
}
return c.client.Handle(ctx, request_)
return c.submit(ctx, types.INSERT, tables...)
}

// Delete is to delete the data from GreptimeDB via explicit schema.
//
// tbl, err := table.New(<tableName>)
//
// // add column at first. This is to define the schema of the table.
// tbl.AddTagColumn("tag1", types.INT64)
// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)
//
// // you can add multiple row(s). This is the real data.
// tbl.AddRow("tag1", timestamp)
//
// // delete the data from GreptimeDB
// resp, err := client.Delete(context.Background() tbl)
func (c *Client) Delete(ctx context.Context, tables ...*table.Table) (*gpb.GreptimeResponse, error) {
return c.submit(ctx, types.DELETE, tables...)
}

// WriteObject is like [Write] to write the data into GreptimeDB, but schema is defined in the struct tag.
Expand Down Expand Up @@ -115,7 +139,35 @@ func (c *Client) WriteObject(ctx context.Context, obj any) (*gpb.GreptimeRespons
return nil, err
}

return c.Write(ctx, tbl)
return c.submit(ctx, types.INSERT, tbl)
}

// DeleteObject is like [Delete] to delete the data from GreptimeDB, but schema is defined in the struct tag.
// resp, err := client.WriteObject(context.Background(), deleteMonitors)
func (c *Client) DeleteObject(ctx context.Context, obj any) (*gpb.GreptimeResponse, error) {
tbl, err := schema.Parse(obj)
if err != nil {
return nil, err
}

return c.submit(ctx, types.DELETE, tbl)
}

func (c *Client) streamSubimt(ctx context.Context, operation types.Operation, tables ...*table.Table) error {
if c.stream == nil {
stream, err := c.client.HandleRequests(ctx)
if err != nil {
return err
}
c.stream = stream
}

header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, operation, tables...).Build()
if err != nil {
return err
}
return c.stream.Send(request_)
}

// StreamWrite is to send the data into GreptimeDB via explicit schema.
Expand All @@ -127,27 +179,31 @@ func (c *Client) WriteObject(ctx context.Context, obj any) (*gpb.GreptimeRespons
// tbl.AddFieldColumn("field1", types.STRING)
// tbl.AddFieldColumn("field2", types.FLOAT64)
// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)
//
// timestamp = time.Now()
// // you can add multiple row(s). This is the real data.
// tbl.AddRow(1, "hello", 1.1, time.Now())
// tbl.AddRow(1, "hello", 1.1, timestamp)
//
// // send data into GreptimeDB
// resp, err := client.StreamWrite(context.Background(), tbl)
func (c *Client) StreamWrite(ctx context.Context, tables ...*table.Table) error {
if c.stream == nil {
stream, err := c.client.HandleRequests(ctx)
if err != nil {
return err
}
c.stream = stream
}
return c.streamSubimt(ctx, types.INSERT, tables...)
}

header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
if err != nil {
return err
}
return c.stream.Send(request_)
// StreamDelete is to delete the data from GreptimeDB via explicit schema.
//
// tbl, err := table.New(<tableName>)
//
// // add column at first. This is to define the schema of the table.
// tbl.AddTagColumn("tag1", types.INT64)
// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)
//
// // you can add multiple row(s). This is the real data.
// tbl.AddRow("tag1", timestamp)
//
// // delete the data from GreptimeDB
// resp, err := client.StreamWrite(context.Background(), tbl)
func (c *Client) StreamDelete(ctx context.Context, tables ...*table.Table) error {
return c.streamSubimt(ctx, types.DELETE, tables...)
}

// StreamWriteObject is like [StreamWrite] to send the data into GreptimeDB, but schema is defined in the struct tag.
Expand Down Expand Up @@ -193,7 +249,17 @@ func (c *Client) StreamWriteObject(ctx context.Context, body any) error {
if err != nil {
return err
}
return c.StreamWrite(ctx, tbl)
return c.streamSubimt(ctx, types.INSERT, tbl)
}

// StreamDeleteObject is like [StreamDelete] to Delete the data from GreptimeDB, but schema is defined in the struct tag.
// resp, err := client.StreamWriteObject(context.Background(), deleteMonitors)
func (c *Client) StreamDeleteObject(ctx context.Context, body any) error {
tbl, err := schema.Parse(body)
if err != nil {
return err
}
return c.streamSubimt(ctx, types.DELETE, tbl)
}

// CloseStream closes the stream. Once we’ve finished writing our client’s requests to the stream
Expand Down
Loading

0 comments on commit 99663b9

Please sign in to comment.