From 3137fbd024d21cc61b42d3017510ab59b285eef4 Mon Sep 17 00:00:00 2001 From: Nick Z <2420177+nickzelei@users.noreply.github.com> Date: Wed, 30 Oct 2024 16:25:43 -0700 Subject: [PATCH] updates s3 method to share batching config code --- .../activities/gen-benthos-configs/sync.go | 35 +++++-------------- 1 file changed, 8 insertions(+), 27 deletions(-) diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go index 12c6d457bc..93925a4362 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/sync.go @@ -671,28 +671,9 @@ func (b *benthosBuilder) getAwsS3SyncBenthosOutput( `records-${!count("files")}-${!timestamp_unix_nano()}.jsonl.gz`, ) - maxInFlight := 64 - if destinationOptions.GetMaxInFlight() > 0 { - maxInFlight = int(destinationOptions.GetMaxInFlight()) - } - - batchCount := 100 - batchPeriod := "5s" - batchConfig := destinationOptions.GetBatch() - if batchConfig != nil { - batchCount = int(batchConfig.GetCount()) - - if batchConfig.GetPeriod() != "" { - _, err := time.ParseDuration(batchConfig.GetPeriod()) - if err != nil { - return nil, fmt.Errorf("unable to parse batch period for s3 destination config: %w", err) - } - } - batchPeriod = batchConfig.GetPeriod() - } - - if batchCount == 0 && batchPeriod == "" { - return nil, fmt.Errorf("must have at least one batch policy configured. Cannot disable both period and count") + batchingConfig, err := getParsedBatchingConfig(destinationOptions) + if err != nil { + return nil, err } timeout := "" @@ -714,14 +695,14 @@ func (b *benthosBuilder) getAwsS3SyncBenthosOutput( { AwsS3: &neosync_benthos.AwsS3Insert{ Bucket: connection.AwsS3Config.Bucket, - MaxInFlight: maxInFlight, + MaxInFlight: int(batchingConfig.MaxInFlight), Timeout: timeout, StorageClass: storageClass, Path: strings.Join(s3pathpieces, "/"), ContentType: "application/gzip", Batching: &neosync_benthos.Batching{ - Count: batchCount, - Period: batchPeriod, + Count: batchingConfig.BatchCount, + Period: batchingConfig.BatchPeriod, Processors: []*neosync_benthos.BatchProcessor{ {Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}}, {Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}}, @@ -736,8 +717,8 @@ func (b *benthosBuilder) getAwsS3SyncBenthosOutput( {Error: &neosync_benthos.ErrorOutputConfig{ ErrorMsg: `${! meta("fallback_error")}`, Batching: &neosync_benthos.Batching{ - Period: batchPeriod, - Count: batchCount, + Period: batchingConfig.BatchPeriod, + Count: batchingConfig.BatchCount, }, }}, },