diff --git a/README.md b/README.md index 667f7c3..31b69d0 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,20 @@ err := tbl.AddRow(2, "127.0.0.2", time.Now()) resp, err := cli.Write(context.Background(), tbl) ``` +##### Delete in GreptimeDB + +```go +dtbl, err := table.New("") +dtbl.AddTagColumn("id", types.INT64) +dtbl.AddFieldColumn("host", types.STRING) +dtbl.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND) + +// timestamp is the time you want to delete row +err := dtbl.AddRow(1, "127.0.0.1",timestamp) + +affected, err := cli.Delete(context.Background(),dtbl) +``` + ##### Stream Write into GreptimeDB ```go @@ -96,6 +110,13 @@ err := cli.StreamWrite(context.Background(), tbl) ... affected, err := cli.CloseStream(ctx) ``` +##### Stream Delete in GreptimeDB + +```go +err := cli.StreamDelete(context.Background(), tbl) +... +affected, err := cli.CloseStream(ctx) +``` #### ORM style @@ -149,6 +170,14 @@ monitors := []Monitor{ resp, err := cli.WriteObject(context.Background(), monitors) ``` +##### DeleteObject in GreptimeDB + +```go +deleteMonitors := monitors[:1] + +affected, err := cli.DeleteObject(context.Background(), deleteMonitors) +``` + ##### Stream WriteObject into GreptimeDB ```go @@ -157,6 +186,16 @@ err := cli.StreamWriteObject(context.Background(), monitors) affected, err := cli.CloseStream(ctx) ``` +##### Stream DeleteObject in GreptimeDB + +```go +deleteMonitors := monitors[:1] + +err := cli.StreamDeleteObject(context.Background(), deleteMonitors) +... +affected, err := cli.CloseStream(ctx) +``` + ## Datatypes supported The **GreptimeDB** column is for the datatypes supported in library, and the **Go** column is the matched Go type. diff --git a/client.go b/client.go index ef9cfd4..6a9ca80 100644 --- a/client.go +++ b/client.go @@ -24,6 +24,7 @@ import ( "github.com/GreptimeTeam/greptimedb-ingester-go/request/header" "github.com/GreptimeTeam/greptimedb-ingester-go/schema" "github.com/GreptimeTeam/greptimedb-ingester-go/table" + "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" ) // Client helps to write data into GreptimeDB. A Client is safe for concurrent @@ -47,6 +48,17 @@ func NewClient(cfg *Config) (*Client, error) { return &Client{cfg: cfg, client: client}, nil } +// submit is create the request and send it to GreptimeDB. +// It is can set up the Operation as [INSERT,DELETE] +func (c *Client) submit(ctx context.Context, operation types.Operation, tables ...*table.Table) (*gpb.GreptimeResponse, error) { + header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) + request_, err := request.New(header_, operation, tables...).Build() + if err != nil { + return nil, err + } + return c.client.Handle(ctx, request_) +} + // Write is to write the data into GreptimeDB via explicit schema. // // tbl, err := table.New() @@ -56,19 +68,31 @@ func NewClient(cfg *Config) (*Client, error) { // tbl.AddFieldColumn("field1", types.STRING) // tbl.AddFieldColumn("field2", types.FLOAT64) // tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND) -// +// timestamp := time.Now() // // you can add multiple row(s). This is the real data. -// tbl.AddRow(1, "hello", 1.1, time.Now()) +// tbl.AddRow(1, "hello", 1.1, timestamp) // // // write data into GreptimeDB // resp, err := client.Write(context.Background(), tbl) func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*gpb.GreptimeResponse, error) { - header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) - request_, err := request.New(header_, tables...).Build() - if err != nil { - return nil, err - } - return c.client.Handle(ctx, request_) + return c.submit(ctx, types.INSERT, tables...) +} + +// Delete is to delete the data from GreptimeDB via explicit schema. +// +// tbl, err := table.New() +// +// // add column at first. This is to define the schema of the table. +// tbl.AddTagColumn("tag1", types.INT64) +// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND) +// +// // you can add multiple row(s). This is the real data. +// tbl.AddRow("tag1", timestamp) +// +// // delete the data from GreptimeDB +// resp, err := client.Delete(context.Background() tbl) +func (c *Client) Delete(ctx context.Context, tables ...*table.Table) (*gpb.GreptimeResponse, error) { + return c.submit(ctx, types.DELETE, tables...) } // WriteObject is like [Write] to write the data into GreptimeDB, but schema is defined in the struct tag. @@ -115,7 +139,35 @@ func (c *Client) WriteObject(ctx context.Context, obj any) (*gpb.GreptimeRespons return nil, err } - return c.Write(ctx, tbl) + return c.submit(ctx, types.INSERT, tbl) +} + +// DeleteObject is like [Delete] to delete the data from GreptimeDB, but schema is defined in the struct tag. +// resp, err := client.WriteObject(context.Background(), deleteMonitors) +func (c *Client) DeleteObject(ctx context.Context, obj any) (*gpb.GreptimeResponse, error) { + tbl, err := schema.Parse(obj) + if err != nil { + return nil, err + } + + return c.submit(ctx, types.DELETE, tbl) +} + +func (c *Client) streamSubimt(ctx context.Context, operation types.Operation, tables ...*table.Table) error { + if c.stream == nil { + stream, err := c.client.HandleRequests(ctx) + if err != nil { + return err + } + c.stream = stream + } + + header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) + request_, err := request.New(header_, operation, tables...).Build() + if err != nil { + return err + } + return c.stream.Send(request_) } // StreamWrite is to send the data into GreptimeDB via explicit schema. @@ -127,27 +179,31 @@ func (c *Client) WriteObject(ctx context.Context, obj any) (*gpb.GreptimeRespons // tbl.AddFieldColumn("field1", types.STRING) // tbl.AddFieldColumn("field2", types.FLOAT64) // tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND) -// +// timestamp = time.Now() // // you can add multiple row(s). This is the real data. -// tbl.AddRow(1, "hello", 1.1, time.Now()) +// tbl.AddRow(1, "hello", 1.1, timestamp) // // // send data into GreptimeDB // resp, err := client.StreamWrite(context.Background(), tbl) func (c *Client) StreamWrite(ctx context.Context, tables ...*table.Table) error { - if c.stream == nil { - stream, err := c.client.HandleRequests(ctx) - if err != nil { - return err - } - c.stream = stream - } + return c.streamSubimt(ctx, types.INSERT, tables...) +} - header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) - request_, err := request.New(header_, tables...).Build() - if err != nil { - return err - } - return c.stream.Send(request_) +// StreamDelete is to delete the data from GreptimeDB via explicit schema. +// +// tbl, err := table.New() +// +// // add column at first. This is to define the schema of the table. +// tbl.AddTagColumn("tag1", types.INT64) +// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND) +// +// // you can add multiple row(s). This is the real data. +// tbl.AddRow("tag1", timestamp) +// +// // delete the data from GreptimeDB +// resp, err := client.StreamWrite(context.Background(), tbl) +func (c *Client) StreamDelete(ctx context.Context, tables ...*table.Table) error { + return c.streamSubimt(ctx, types.DELETE, tables...) } // StreamWriteObject is like [StreamWrite] to send the data into GreptimeDB, but schema is defined in the struct tag. @@ -193,7 +249,17 @@ func (c *Client) StreamWriteObject(ctx context.Context, body any) error { if err != nil { return err } - return c.StreamWrite(ctx, tbl) + return c.streamSubimt(ctx, types.INSERT, tbl) +} + +// StreamDeleteObject is like [StreamDelete] to Delete the data from GreptimeDB, but schema is defined in the struct tag. +// resp, err := client.StreamWriteObject(context.Background(), deleteMonitors) +func (c *Client) StreamDeleteObject(ctx context.Context, body any) error { + tbl, err := schema.Parse(body) + if err != nil { + return err + } + return c.streamSubimt(ctx, types.DELETE, tbl) } // CloseStream closes the stream. Once we’ve finished writing our client’s requests to the stream diff --git a/client_test.go b/client_test.go index 82460bb..63d8551 100644 --- a/client_test.go +++ b/client_test.go @@ -189,7 +189,7 @@ func newMysql() *Mysql { func init() { repo := "greptime/greptimedb" - tag := "v0.6.0" + tag := "v0.7.0" pool, err := dockertest.NewPool("") if err != nil { @@ -307,6 +307,100 @@ func TestWriteMonitors(t *testing.T) { } } +func TestDeleteMonitors(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + ts3 := time.Now().Add(-3 * time.Minute).UnixMilli() + time3 := time.UnixMilli(ts3).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.3", + Memory: 3, + Cpu: 3.0, + Temperature: -3, + Ts: time3, + Running: true, + }, + } + + table, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, table.AddTagColumn("id", types.INT64)) + assert.Nil(t, table.AddTagColumn("host", types.STRING)) + assert.Nil(t, table.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, table.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + for _, monitor := range monitors { + err := table.AddRow(monitor.ID, monitor.Host, + monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running, + monitor.Ts) + assert.Nil(t, err) + } + + resp, err := cli.Write(context.Background(), table) + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(len(monitors)), resp.GetAffectedRows().GetValue()) + + dtable, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, dtable.AddTagColumn("id", types.INT64)) + assert.Nil(t, dtable.AddTagColumn("host", types.STRING)) + assert.Nil(t, dtable.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + deleteMonitors := monitors[:1] + for _, monitor := range deleteMonitors { + err := dtable.AddRow(monitor.ID, monitor.Host, monitor.Ts) + assert.Nil(t, err) + } + + resp, err = cli.Delete(context.Background(), dtable) + + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(len(deleteMonitors)), resp.GetAffectedRows().GetValue()) + + monitors = monitors[1:] + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) + assert.Nil(t, err) + + assert.Equal(t, len(monitors), len(monitors_)) + + for i, monitor_ := range monitors_ { + assert.Equal(t, monitors[i], monitor_) + } +} + func TestCreateMonitors(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) @@ -352,6 +446,71 @@ func TestCreateMonitors(t *testing.T) { } } +func TestDeleteObjMonitors(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + ts3 := time.Now().Add(-3 * time.Minute).UnixMilli() + time3 := time.UnixMilli(ts3).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.3", + Memory: 3, + Cpu: 3.0, + Temperature: -3, + Ts: time3, + Running: true, + }, + } + + resp, err := cli.WriteObject(context.Background(), monitors) + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(len(monitors)), resp.GetAffectedRows().GetValue()) + + deleteMonitors := monitors[:1] + resp, err = cli.DeleteObject(context.Background(), deleteMonitors) + + assert.Nil(t, err) + assert.Zero(t, resp.GetHeader().GetStatus().GetStatusCode()) + assert.Empty(t, resp.GetHeader().GetStatus().GetErrMsg()) + assert.Equal(t, uint32(len(deleteMonitors)), resp.GetAffectedRows().GetValue()) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) + assert.Nil(t, err) + + monitors = monitors[1:] + assert.Equal(t, len(monitors), len(monitors_)) + + for i, monitor_ := range monitors_ { + assert.Equal(t, monitors[i], monitor_) + } +} + func TestInsertMonitorWithNilFields(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) @@ -578,6 +737,102 @@ func TestStreamWrite(t *testing.T) { } } +func TestStreamDelete(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + ts3 := time.Now().Add(-3 * time.Minute).UnixMilli() + time3 := time.UnixMilli(ts3).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.3", + Memory: 3, + Cpu: 3.0, + Temperature: -3, + Ts: time3, + Running: true, + }, + } + + table, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, table.AddTagColumn("id", types.INT64)) + assert.Nil(t, table.AddTagColumn("host", types.STRING)) + assert.Nil(t, table.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, table.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + for _, monitor := range monitors { + err := table.AddRow(monitor.ID, monitor.Host, + monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running, + monitor.Ts) + assert.Nil(t, err) + } + + err = cli.StreamWrite(context.Background(), table) + assert.Nil(t, err) + affected, err := cli.CloseStream(context.Background()) + assert.EqualValues(t, uint(len(monitors)), affected.GetValue()) + assert.Nil(t, err) + + // test stream delete after wirted data points + dtable, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, dtable.AddTagColumn("id", types.INT64)) + assert.Nil(t, dtable.AddTagColumn("host", types.STRING)) + assert.Nil(t, dtable.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + deleteMonitors := monitors[:1] + + for _, monitor := range deleteMonitors { + err := dtable.AddRow(monitor.ID, monitor.Host, monitor.Ts) + assert.Nil(t, err) + } + err = cli.StreamDelete(context.Background(), dtable) + assert.Nil(t, err) + affected, err = cli.CloseStream(context.Background()) + + assert.EqualValues(t, uint(len(deleteMonitors)), affected.GetValue()) + assert.Nil(t, err) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) + assert.Nil(t, err) + + monitors = monitors[1:] + assert.Equal(t, len(monitors), len(monitors_)) + + for i, monitor_ := range monitors_ { + assert.Equal(t, monitors[i], monitor_) + } +} + func TestStreamCreate(t *testing.T) { loc, err := time.LoadLocation(timezone) assert.Nil(t, err) @@ -610,7 +865,7 @@ func TestStreamCreate(t *testing.T) { err = cli.StreamWriteObject(context.Background(), monitors) assert.Nil(t, err) affected, err := cli.CloseStream(context.Background()) - assert.EqualValues(t, 2, affected.GetValue()) + assert.EqualValues(t, uint32(len(monitors)), affected.GetValue()) assert.Nil(t, err) monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) @@ -622,3 +877,66 @@ func TestStreamCreate(t *testing.T) { assert.Equal(t, monitors[i], monitor_) } } + +func TestStreamDeleteObj(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + ts3 := time.Now().Add(-3 * time.Minute).UnixMilli() + time3 := time.UnixMilli(ts3).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.3", + Memory: 3, + Cpu: 3.0, + Temperature: -3, + Ts: time3, + Running: true, + }, + } + + err = cli.StreamWriteObject(context.Background(), monitors) + assert.Nil(t, err) + affected, err := cli.CloseStream(context.Background()) + assert.EqualValues(t, uint32(len(monitors)), affected.GetValue()) + assert.Nil(t, err) + + deleteMonitors := monitors[:1] + err = cli.StreamDeleteObject(context.Background(), deleteMonitors) + assert.Nil(t, err) + affected, err = cli.CloseStream(context.Background()) + assert.EqualValues(t, uint32(len(deleteMonitors)), affected.GetValue()) + assert.Nil(t, err) + + monitors = monitors[1:] + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) + assert.Nil(t, err) + assert.Equal(t, len(monitors), len(monitors_)) + + for i, monitor_ := range monitors_ { + assert.Equal(t, monitors[i], monitor_) + } +} diff --git a/errs/error.go b/errs/error.go index 2926341..ac992e5 100644 --- a/errs/error.go +++ b/errs/error.go @@ -24,4 +24,5 @@ var ( ErrEmptyTableName = errors.New("name of table should not be empty") ErrEmptyTable = errors.New("please add at least one row") ErrEmptyColumn = errors.New("column not set, please call AddColumn first") + ErrInvalidOperation = errors.New("invalid operation") ) diff --git a/request/request.go b/request/request.go index 43aef3b..d46366d 100644 --- a/request/request.go +++ b/request/request.go @@ -20,17 +20,20 @@ import ( "github.com/GreptimeTeam/greptimedb-ingester-go/errs" "github.com/GreptimeTeam/greptimedb-ingester-go/request/header" "github.com/GreptimeTeam/greptimedb-ingester-go/table" + "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" ) type Request struct { - header *header.Header - tables []*table.Table + header *header.Header + tables []*table.Table + operation types.Operation } -func New(header *header.Header, tables ...*table.Table) *Request { +func New(header *header.Header, operation types.Operation, tables ...*table.Table) *Request { return &Request{ - header: header, - tables: tables, + header: header, + tables: tables, + operation: operation, } } @@ -62,18 +65,40 @@ func (r *Request) Build() (*gpb.GreptimeRequest, error) { return nil, err } - reqs := make([]*gpb.RowInsertRequest, 0, len(r.tables)) - for _, table := range r.tables { - req, err := table.ToRequest() - if err != nil { - return nil, err + switch r.operation { + case types.INSERT: + insertReqs := make([]*gpb.RowInsertRequest, 0, len(r.tables)) + for _, table := range r.tables { + req, err := table.ToInsertRequest() + if err != nil { + return nil, err + } + insertReqs = append(insertReqs, req) + } + req := &gpb.GreptimeRequest_RowInserts{ + RowInserts: &gpb.RowInsertRequests{Inserts: insertReqs}, + } + return &gpb.GreptimeRequest{ + Header: header, + Request: req, + }, nil + case types.DELETE: + deleteReqs := make([]*gpb.RowDeleteRequest, 0, len(r.tables)) + for _, table := range r.tables { + req, err := table.ToDeleteRequest() + if err != nil { + return nil, err + } + deleteReqs = append(deleteReqs, req) } - reqs = append(reqs, req) - } - req := &gpb.GreptimeRequest_RowInserts{ - RowInserts: &gpb.RowInsertRequests{Inserts: reqs}, + req := &gpb.GreptimeRequest_RowDeletes{ + RowDeletes: &gpb.RowDeleteRequests{Deletes: deleteReqs}, + } + return &gpb.GreptimeRequest{ + Header: header, + Request: req, + }, nil } - return &gpb.GreptimeRequest{Header: header, Request: req}, nil - + return nil, errs.ErrInvalidOperation } diff --git a/table/table.go b/table/table.go index c292b72..892d3dc 100644 --- a/table/table.go +++ b/table/table.go @@ -204,7 +204,7 @@ func (t *Table) sanitate_if_needed(name string) (string, error) { return name, nil } -func (t *Table) ToRequest() (*gpb.RowInsertRequest, error) { +func (t *Table) ToInsertRequest() (*gpb.RowInsertRequest, error) { if t.IsEmpty() { return nil, errs.ErrEmptyTable } @@ -214,9 +214,24 @@ func (t *Table) ToRequest() (*gpb.RowInsertRequest, error) { return nil, err } - req := &gpb.RowInsertRequest{ + return &gpb.RowInsertRequest{ TableName: name, Rows: t.GetRows(), + }, nil +} + +func (t *Table) ToDeleteRequest() (*gpb.RowDeleteRequest, error) { + if t.IsEmpty() { + return nil, errs.ErrEmptyTable + } + + name, err := t.GetName() + if err != nil { + return nil, err } - return req, nil + + return &gpb.RowDeleteRequest{ + TableName: name, + Rows: t.GetRows(), + }, nil } diff --git a/table/types/types.go b/table/types/types.go index 6afae79..2106921 100644 --- a/table/types/types.go +++ b/table/types/types.go @@ -247,3 +247,13 @@ func ConvertType(type_ ColumnType) (gpb.ColumnDataType, error) { } } + +// Operation is the type of write operation +// current supported [Insert, Delete] +// TODO: [Update] +type Operation uint + +const ( + INSERT Operation = iota + DELETE +)