Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jul 30, 2023
1 parent 7587f5c commit e7151d4
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
20 changes: 17 additions & 3 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package elasticsearch

import (
"context"
"encoding/json"
"errors"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"reflect"
"strings"
)

type BatchInserter struct {
Expand Down Expand Up @@ -38,10 +40,14 @@ func (w *BatchInserter) Write(ctx context.Context, model interface{}) ([]int, []
idValue := modelValue.Field(idIndex).String()
if idValue != "" {
body := BuildQueryWithoutIdFromObject(sliceValue)
jsonBody, err := json.Marshal(body)
if err != nil {
return successIndices, failureIndices, err
}
er1 := bi.Add(context.Background(), esutil.BulkIndexerItem{
Action: "create",
DocumentID: idValue,
Body: esutil.NewJSONReader(body),
Body: strings.NewReader(string(jsonBody)), // esutil.NewJSONReader(body),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
successIds = append(successIds, res.DocumentID)
},
Expand Down Expand Up @@ -89,10 +95,14 @@ func InsertMany(ctx context.Context, es *elasticsearch.Client, indexName string,
idValue := modelValue.Field(idIndex).String()
if idValue != "" {
body := BuildQueryWithoutIdFromObject(sliceValue)
jsonBody, err := json.Marshal(body)
if err != nil {
return successIndices, failureIndices, err
}
er1 := bi.Add(context.Background(), esutil.BulkIndexerItem{
Action: "create",
DocumentID: idValue,
Body: esutil.NewJSONReader(body),
Body: strings.NewReader(string(jsonBody)), // esutil.NewJSONReader(body),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
successIds = append(successIds, res.DocumentID)
},
Expand Down Expand Up @@ -140,10 +150,14 @@ func UpsertMany(ctx context.Context, es *elasticsearch.Client, indexName string,
idValue := modelValue.Field(idIndex).String()
if idValue != "" {
body := BuildQueryWithoutIdFromObject(sliceValue)
jsonBody, err := json.Marshal(body)
if err != nil {
return successIndices, failureIndices, err
}
er1 := bi.Add(context.Background(), esutil.BulkIndexerItem{
Action: "index",
DocumentID: idValue,
Body: esutil.NewJSONReader(body),
Body: strings.NewReader(string(jsonBody)), // esutil.NewJSONReader(body),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
successIds = append(successIds, res.DocumentID)
},
Expand Down
4 changes: 3 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Config struct {
CloudID *string `yaml:"cloud_id" mapstructure:"cloud_id" json:"cloudID,omitempty" gorm:"column:cloudid" bson:"cloudID,omitempty" dynamodbav:"cloudID,omitempty" firestore:"cloudID,omitempty"`
APIKey *string `yaml:"api_key" mapstructure:"api_key" json:"apiKey,omitempty" gorm:"column:apikey" bson:"apiKey,omitempty" dynamodbav:"apiKey,omitempty" firestore:"apiKey,omitempty"`
DisableRetry *bool `yaml:"disable_retry" mapstructure:"disable_retry" json:"disableRetry,omitempty" gorm:"column:disableretry" bson:"disableRetry,omitempty" dynamodbav:"disableRetry,omitempty" firestore:"disableRetry,omitempty"`
EnableRetryOnTimeout *bool `yaml:"enable_retry_on_timeout" mapstructure:"enable_retry_on_timeout" json:"enableRetryOnTimeout,omitempty" gorm:"column:enableretryontimeout" bson:"enableRetryOnTimeout,omitempty" dynamodbav:"enableRetryOnTimeout,omitempty" firestore:"enableRetryOnTimeout,omitempty"`
// EnableRetryOnTimeout *bool `yaml:"enable_retry_on_timeout" mapstructure:"enable_retry_on_timeout" json:"enableRetryOnTimeout,omitempty" gorm:"column:enableretryontimeout" bson:"enableRetryOnTimeout,omitempty" dynamodbav:"enableRetryOnTimeout,omitempty" firestore:"enableRetryOnTimeout,omitempty"`
MaxRetries *int `yaml:"max_retries" mapstructure:"max_retries" json:"maxRetries,omitempty" gorm:"column:maxretries" bson:"maxRetries,omitempty" dynamodbav:"maxRetries,omitempty" firestore:"maxRetries,omitempty"`
DiscoverNodesOnStart *bool `yaml:"discover_nodes_on_start" mapstructure:"discover_nodes_on_start" json:"discoverNodesOnStart,omitempty" gorm:"column:discovernodesonstart" bson:"discoverNodesOnStart,omitempty" dynamodbav:"discoverNodesOnStart,omitempty" firestore:"discoverNodesOnStart,omitempty"`
DiscoverNodesInterval *int64 `yaml:"discover_nodes_interval" mapstructure:"discover_nodes_interval" json:"discoverNodesInterval,omitempty" gorm:"column:discovernodesinterval" bson:"discoverNodesInterval,omitempty" dynamodbav:"discoverNodesInterval,omitempty" firestore:"discoverNodesInterval,omitempty"`
Expand Down Expand Up @@ -68,9 +68,11 @@ func GetConfig(conf Config, timeouts ...time.Duration) elasticsearch.Config {
if conf.DisableRetry != nil {
c.DisableRetry = *conf.DisableRetry
}
/*
if conf.EnableRetryOnTimeout != nil {
c.EnableRetryOnTimeout = *conf.EnableRetryOnTimeout
}
*/
if conf.MaxRetries != nil {
c.MaxRetries = *conf.MaxRetries
}
Expand Down

0 comments on commit e7151d4

Please sign in to comment.