Skip to content

Commit

Permalink
updates s3 method to share batching config code
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Oct 30, 2024
1 parent 9c84cd7 commit 3137fbd
Showing 1 changed file with 8 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,28 +671,9 @@ func (b *benthosBuilder) getAwsS3SyncBenthosOutput(
`records-${!count("files")}-${!timestamp_unix_nano()}.jsonl.gz`,
)

maxInFlight := 64
if destinationOptions.GetMaxInFlight() > 0 {
maxInFlight = int(destinationOptions.GetMaxInFlight())
}

batchCount := 100
batchPeriod := "5s"
batchConfig := destinationOptions.GetBatch()
if batchConfig != nil {
batchCount = int(batchConfig.GetCount())

if batchConfig.GetPeriod() != "" {
_, err := time.ParseDuration(batchConfig.GetPeriod())
if err != nil {
return nil, fmt.Errorf("unable to parse batch period for s3 destination config: %w", err)
}
}
batchPeriod = batchConfig.GetPeriod()
}

if batchCount == 0 && batchPeriod == "" {
return nil, fmt.Errorf("must have at least one batch policy configured. Cannot disable both period and count")
batchingConfig, err := getParsedBatchingConfig(destinationOptions)
if err != nil {
return nil, err
}

timeout := ""
Expand All @@ -714,14 +695,14 @@ func (b *benthosBuilder) getAwsS3SyncBenthosOutput(
{
AwsS3: &neosync_benthos.AwsS3Insert{
Bucket: connection.AwsS3Config.Bucket,
MaxInFlight: maxInFlight,
MaxInFlight: int(batchingConfig.MaxInFlight),
Timeout: timeout,
StorageClass: storageClass,
Path: strings.Join(s3pathpieces, "/"),
ContentType: "application/gzip",
Batching: &neosync_benthos.Batching{
Count: batchCount,
Period: batchPeriod,
Count: batchingConfig.BatchCount,
Period: batchingConfig.BatchPeriod,
Processors: []*neosync_benthos.BatchProcessor{
{Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}},
{Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}},
Expand All @@ -736,8 +717,8 @@ func (b *benthosBuilder) getAwsS3SyncBenthosOutput(
{Error: &neosync_benthos.ErrorOutputConfig{
ErrorMsg: `${! meta("fallback_error")}`,
Batching: &neosync_benthos.Batching{
Period: batchPeriod,
Count: batchCount,
Period: batchingConfig.BatchPeriod,
Count: batchingConfig.BatchCount,
},
}},
},
Expand Down

0 comments on commit 3137fbd

Please sign in to comment.