Skip to content

Commit

Permalink
feat: v3 write, plus some other general fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
soudrug authored and vlastahajek committed Dec 30, 2022
1 parent 486a4ad commit 91f06df
Show file tree
Hide file tree
Showing 42 changed files with 2,781 additions and 591 deletions.
16 changes: 15 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
# API Skeleton
# TODO
## Items
- Write
- Async examples
- default tags
- E2E tests
- support for enterpise
- write consistency
- ignore some errors
- Readme
- Examples
- Copyright


## API Skeleton

Bellow is the skeleton for the new client V3 API with parts that are not implemented yet.

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.17
require (
github.com/deepmap/oapi-codegen v1.12.4
github.com/google/go-cmp v0.5.9
github.com/influxdata/line-protocol/v2 v2.2.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
)
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d/go.
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs=
github.com/deepmap/oapi-codegen v1.12.4 h1:pPmn6qI9MuOtCz82WY2Xaw46EQjgvxednXXrP7g5Q2s=
github.com/deepmap/oapi-codegen v1.12.4/go.mod h1:3lgHGMu6myQ2vqbbTXH2H1o4eXFTGnFiDaOaKKl5yas=
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/getkin/kin-openapi v0.107.0/go.mod h1:9Dhr+FasATJZjS4iOLvB0hkaxgYdulrNYm2e9epLWOo=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.8.1/go.mod h1:ji8BvRH1azfM+SYow9zQ6SZMvR8qOMZHmsCuWR9tTTk=
Expand All @@ -28,13 +32,21 @@ github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo=
github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY=
github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY=
github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE=
github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM=
github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
Expand Down
20 changes: 10 additions & 10 deletions influxclient/authorizations.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ func newAuthorizationsAPI(client *model.Client) *AuthorizationsAPI {

// Find returns all authorizations matching the given filter.
// Supported filters:
// - OrgName
// - OrgID
// - UserName
// - UserID
// - OrgName
// - OrgID
// - UserName
// - UserID
func (a *AuthorizationsAPI) Find(ctx context.Context, filter *Filter) ([]model.Authorization, error) {
return a.getAuthorizations(ctx, filter)
}

// FindOne returns one authorizationsmatching the given filter.
// Supported filters:
// - OrgName
// - OrgID
// - UserName
// - UserID
// - OrgName
// - OrgID
// - UserName
// - UserID
func (a *AuthorizationsAPI) FindOne(ctx context.Context, filter *Filter) (*model.Authorization, error) {
authorizations, err := a.getAuthorizations(ctx, filter)
if err != nil {
Expand All @@ -60,8 +60,8 @@ func (a *AuthorizationsAPI) Create(ctx context.Context, auth *model.Authorizatio
}
params := &model.PostAuthorizationsAllParams{
Body: model.PostAuthorizationsJSONRequestBody{
OrgID: auth.OrgID,
UserID: auth.UserID,
OrgID: auth.OrgID,
UserID: auth.UserID,
Permissions: auth.Permissions,
},
}
Expand Down
5 changes: 3 additions & 2 deletions influxclient/authorizations_e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build e2e
// +build e2e

// Copyright 2021 InfluxData, Inc. All rights reserved.
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestAuthorizationsAPI(t *testing.T) {
}

auth, err := authAPI.Create(ctx, &model.Authorization{
OrgID: org.Id,
OrgID: org.Id,
Permissions: &permissions,
})
require.NoError(t, err)
Expand Down Expand Up @@ -183,7 +184,7 @@ func TestAuthorizationsAPI_failing(t *testing.T) {
}

auth, err = authAPI.Create(ctx, &model.Authorization{
OrgID: &notExistingID,
OrgID: &notExistingID,
Permissions: &permissions,
})
assert.Error(t, err)
Expand Down
25 changes: 13 additions & 12 deletions influxclient/buckets_e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build e2e
// +build e2e

// Copyright 2020-2021 InfluxData, Inc. All rights reserved.
Expand Down Expand Up @@ -49,7 +50,7 @@ func TestBucketsAPI(t *testing.T) {
// test find existing bucket by name
bucket, err := bucketsAPI.FindOne(ctx, &Filter{
OrgName: orgName,
Name: bucketName,
Name: bucketName,
})
require.Nil(t, err, err)
require.NotNil(t, bucket)
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestBucketsAPI(t *testing.T) {
name := "bucket-x"
b, err := bucketsAPI.Create(ctx, &model.Bucket{
OrgID: org.Id,
Name: name,
Name: name,
RetentionRules: model.RetentionRules{
{
EverySeconds: 3600 * 12,
Expand All @@ -103,7 +104,7 @@ func TestBucketsAPI(t *testing.T) {
defer bucketsAPI.Delete(ctx, safeId(b.Id))
assert.Equal(t, name, b.Name)
assert.Len(t, b.RetentionRules, 1)
assert.Equal(t, b.RetentionRules[0].EverySeconds, int64(3600 * 12))
assert.Equal(t, b.RetentionRules[0].EverySeconds, int64(3600*12))

// Test update
desc := "bucket description"
Expand All @@ -123,15 +124,15 @@ func TestBucketsAPI(t *testing.T) {
rpx := "0"
schemaType := model.SchemaTypeImplicit
bx, err := bucketsAPI.Create(ctx, &model.Bucket{
OrgID: org.Id,
Name: namex,
OrgID: org.Id,
Name: namex,
Description: &descx,
RetentionRules: model.RetentionRules{
{
EverySeconds: 3600 * 12,
},
},
Rp: &rpx,
Rp: &rpx,
SchemaType: &schemaType,
})
require.Nil(t, err, err)
Expand All @@ -140,7 +141,7 @@ func TestBucketsAPI(t *testing.T) {
assert.Equal(t, namex, bx.Name)
assert.Equal(t, descx, *bx.Description)
assert.Len(t, bx.RetentionRules, 1)
assert.Equal(t, int64(3600 * 12), bx.RetentionRules[0].EverySeconds)
assert.Equal(t, int64(3600*12), bx.RetentionRules[0].EverySeconds)
//assert.NotNil(t, bx.SchemaType, "%v", bx.SchemaType)
assert.Equal(t, rpx, *bx.Rp)

Expand All @@ -151,7 +152,7 @@ func TestBucketsAPI(t *testing.T) {
require.Nil(t, err, err)
require.NotNil(t, b)

/* TODO UsersAPI does not support these in v3
/* TODO UsersAPI does not support these in v3
// Test owners
userOwner, err := client.UsersAPI().CreateUserWithName(ctx, "bucket-owner")
require.Nil(t, err, err)
Expand Down Expand Up @@ -216,12 +217,12 @@ func TestBucketsAPI(t *testing.T) {
err = client.UsersAPI().DeleteUser(ctx, userMember)
assert.Nil(t, err, err)
*/
*/

// attempt to create bucket with existing name should fail
bucket, err = bucketsAPI.Create(ctx, &model.Bucket{
OrgID: org.Id,
Name: b.Name,
Name: b.Name,
})
assert.Error(t, err)
assert.Nil(t, bucket)
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestBucketsAPI_paging(t *testing.T) {
name := fmt.Sprintf("bucket-%03d", i)
b, err := bucketsAPI.Create(ctx, &model.Bucket{
OrgID: org.Id,
Name: name,
Name: name,
})
require.Nil(t, err, err)
require.NotNil(t, b)
Expand Down Expand Up @@ -306,7 +307,7 @@ func TestBucketsAPI_paging(t *testing.T) {
// test filtering buckets by org name
buckets, err = bucketsAPI.Find(ctx, &Filter{
OrgName: org.Name,
Limit: 100,
Limit: 100,
})
require.Nil(t, err, err)
require.NotNil(t, buckets)
Expand Down
55 changes: 32 additions & 23 deletions influxclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,16 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb-client-go/influxclient/model"
)

const (
// DefaultBatchSize default batch size used if not set otherwise.
DefaultBatchSize = 5000
)

// Params holds the parameters for creating a new client.
// The only mandatory field is ServerURL. AuthToken is also important
// if authentication was not done outside this client.
Expand All @@ -39,13 +34,9 @@ type Params struct {
AuthToken string

// Organization is name or ID of organization where data (buckets, users, tasks, etc.) belongs to
// Optional for InfluxDB Cloud
Organization string

// BatchSize holds the default batch size used by PointWriter.
// If it's zero, DefaultBatchSize will be used.
// Note that this can be overridden with PointWriter.SetBatchSize.
BatchSize int

// HTTPClient is used to make API requests.
//
// This can be used to specify a custom TLS configuration
Expand All @@ -54,6 +45,8 @@ type Params struct {
//
// It HTTPClient is nil, http.DefaultClient will be used.
HTTPClient *http.Client
// Write Params
WriteParams WriteParams
}

// Client implements an InfluxDB client.
Expand Down Expand Up @@ -112,6 +105,9 @@ func New(params Params) (*Client, error) {
if err != nil {
return nil, fmt.Errorf("error parsing server URL: %w", err)
}
if params.WriteParams.MaxBatchBytes == 0 {
c.params.WriteParams = DefaultWriteParams
}
// Create API client
c.apiClient, err = model.NewClient(c.apiURL.String(), &apiCallDelegate{c})
if err != nil {
Expand All @@ -133,19 +129,19 @@ func (c *Client) APIClient() *model.Client {
// DeleteParams holds options for DeletePoints.
type DeleteParams struct {
// Bucket holds bucket name.
Bucket string
Bucket string
// BucketID holds bucket ID.
BucketID string
BucketID string
// Org holds organization name.
Org string
Org string
// OrgID holds organization ID.
OrgID string
OrgID string
// Predicate is an expression in delete predicate syntax.
Predicate string
// Start is the earliest time to delete from.
Start time.Time
Start time.Time
// Stop is the latest time to delete from.
Stop time.Time
Stop time.Time
}

// DeletePoints deletes data from a bucket.
Expand All @@ -168,8 +164,8 @@ func (c *Client) DeletePoints(ctx context.Context, params *DeleteParams) error {
postParams := model.PostDeleteAllParams{
Body: model.PostDeleteJSONRequestBody{
Predicate: &params.Predicate,
Start: params.Start,
Stop: params.Stop,
Start: params.Start,
Stop: params.Stop,
},
}
if params.Bucket != "" {
Expand Down Expand Up @@ -273,8 +269,14 @@ func (c *Client) resolveHTTPError(r *http.Response) error {
}

httpError.StatusCode = r.StatusCode
if v := r.Header.Get("Retry-After"); v != "" {
r, err := strconv.ParseInt(v, 10, 32)
if err == nil {
httpError.RetryAfter = int(r)
}
}

body, err := ioutil.ReadAll(r.Body)
body, err := io.ReadAll(r.Body)
if err != nil {
httpError.Message = fmt.Sprintf("cannot read error response:: %v", err)
}
Expand Down Expand Up @@ -306,9 +308,9 @@ func (d *apiCallDelegate) Do(req *http.Request) (*http.Response, error) {
req.URL.RawQuery = ""
return d.c.makeAPICall(req.Context(), httpParams{
endpointURL: req.URL,
headers: req.Header,
httpMethod: req.Method,
body: req.Body,
headers: req.Header,
httpMethod: req.Method,
body: req.Body,
queryParams: queryParams,
})
}
Expand Down Expand Up @@ -348,3 +350,10 @@ func (c *Client) TasksAPI() *TasksAPI {
func (c *Client) UsersAPI() *UsersAPI {
return newUsersAPI(c.apiClient)
}

// Close closes all idle connections.
func (c *Client) Close() error {
c.params.HTTPClient.CloseIdleConnections()
// Support closer interface
return nil
}
Loading

0 comments on commit 91f06df

Please sign in to comment.