From e7151d4d76c72a61535d4902a93ae8aad8b442d5 Mon Sep 17 00:00:00 2001 From: Duc Nguyen Date: Sun, 30 Jul 2023 23:56:19 +0700 Subject: [PATCH] Refactor code --- batch.go | 20 +++++++++++++++++--- config.go | 4 +++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/batch.go b/batch.go index 69ec82e..2ede262 100644 --- a/batch.go +++ b/batch.go @@ -2,10 +2,12 @@ package elasticsearch import ( "context" + "encoding/json" "errors" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esutil" "reflect" + "strings" ) type BatchInserter struct { @@ -38,10 +40,14 @@ func (w *BatchInserter) Write(ctx context.Context, model interface{}) ([]int, [] idValue := modelValue.Field(idIndex).String() if idValue != "" { body := BuildQueryWithoutIdFromObject(sliceValue) + jsonBody, err := json.Marshal(body) + if err != nil { + return successIndices, failureIndices, err + } er1 := bi.Add(context.Background(), esutil.BulkIndexerItem{ Action: "create", DocumentID: idValue, - Body: esutil.NewJSONReader(body), + Body: strings.NewReader(string(jsonBody)), // esutil.NewJSONReader(body), OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { successIds = append(successIds, res.DocumentID) }, @@ -89,10 +95,14 @@ func InsertMany(ctx context.Context, es *elasticsearch.Client, indexName string, idValue := modelValue.Field(idIndex).String() if idValue != "" { body := BuildQueryWithoutIdFromObject(sliceValue) + jsonBody, err := json.Marshal(body) + if err != nil { + return successIndices, failureIndices, err + } er1 := bi.Add(context.Background(), esutil.BulkIndexerItem{ Action: "create", DocumentID: idValue, - Body: esutil.NewJSONReader(body), + Body: strings.NewReader(string(jsonBody)), // esutil.NewJSONReader(body), OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { successIds = append(successIds, res.DocumentID) }, @@ -140,10 +150,14 @@ func UpsertMany(ctx context.Context, es *elasticsearch.Client, indexName string, idValue := modelValue.Field(idIndex).String() if idValue != "" { body := BuildQueryWithoutIdFromObject(sliceValue) + jsonBody, err := json.Marshal(body) + if err != nil { + return successIndices, failureIndices, err + } er1 := bi.Add(context.Background(), esutil.BulkIndexerItem{ Action: "index", DocumentID: idValue, - Body: esutil.NewJSONReader(body), + Body: strings.NewReader(string(jsonBody)), // esutil.NewJSONReader(body), OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { successIds = append(successIds, res.DocumentID) }, diff --git a/config.go b/config.go index dfce842..d981366 100644 --- a/config.go +++ b/config.go @@ -20,7 +20,7 @@ type Config struct { CloudID *string `yaml:"cloud_id" mapstructure:"cloud_id" json:"cloudID,omitempty" gorm:"column:cloudid" bson:"cloudID,omitempty" dynamodbav:"cloudID,omitempty" firestore:"cloudID,omitempty"` APIKey *string `yaml:"api_key" mapstructure:"api_key" json:"apiKey,omitempty" gorm:"column:apikey" bson:"apiKey,omitempty" dynamodbav:"apiKey,omitempty" firestore:"apiKey,omitempty"` DisableRetry *bool `yaml:"disable_retry" mapstructure:"disable_retry" json:"disableRetry,omitempty" gorm:"column:disableretry" bson:"disableRetry,omitempty" dynamodbav:"disableRetry,omitempty" firestore:"disableRetry,omitempty"` - EnableRetryOnTimeout *bool `yaml:"enable_retry_on_timeout" mapstructure:"enable_retry_on_timeout" json:"enableRetryOnTimeout,omitempty" gorm:"column:enableretryontimeout" bson:"enableRetryOnTimeout,omitempty" dynamodbav:"enableRetryOnTimeout,omitempty" firestore:"enableRetryOnTimeout,omitempty"` + // EnableRetryOnTimeout *bool `yaml:"enable_retry_on_timeout" mapstructure:"enable_retry_on_timeout" json:"enableRetryOnTimeout,omitempty" gorm:"column:enableretryontimeout" bson:"enableRetryOnTimeout,omitempty" dynamodbav:"enableRetryOnTimeout,omitempty" firestore:"enableRetryOnTimeout,omitempty"` MaxRetries *int `yaml:"max_retries" mapstructure:"max_retries" json:"maxRetries,omitempty" gorm:"column:maxretries" bson:"maxRetries,omitempty" dynamodbav:"maxRetries,omitempty" firestore:"maxRetries,omitempty"` DiscoverNodesOnStart *bool `yaml:"discover_nodes_on_start" mapstructure:"discover_nodes_on_start" json:"discoverNodesOnStart,omitempty" gorm:"column:discovernodesonstart" bson:"discoverNodesOnStart,omitempty" dynamodbav:"discoverNodesOnStart,omitempty" firestore:"discoverNodesOnStart,omitempty"` DiscoverNodesInterval *int64 `yaml:"discover_nodes_interval" mapstructure:"discover_nodes_interval" json:"discoverNodesInterval,omitempty" gorm:"column:discovernodesinterval" bson:"discoverNodesInterval,omitempty" dynamodbav:"discoverNodesInterval,omitempty" firestore:"discoverNodesInterval,omitempty"` @@ -68,9 +68,11 @@ func GetConfig(conf Config, timeouts ...time.Duration) elasticsearch.Config { if conf.DisableRetry != nil { c.DisableRetry = *conf.DisableRetry } + /* if conf.EnableRetryOnTimeout != nil { c.EnableRetryOnTimeout = *conf.EnableRetryOnTimeout } + */ if conf.MaxRetries != nil { c.MaxRetries = *conf.MaxRetries }