From e8bf09d4d52f2b80c638801b8979e4269f4cb3f5 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 9 Jan 2025 09:51:30 -0800 Subject: [PATCH] Destination S3V2: Restore Thread-Safety to Unique Key Counter (#50989) --- .../ObjectStorageDestinationStateManager.kt | 11 ++++++++++- .../connectors/destination-s3/build.gradle | 2 +- .../connectors/destination-s3/gradle.properties | 2 +- .../connectors/destination-s3/metadata.yaml | 2 +- .../destination-s3/src/main/kotlin/S3V2Checker.kt | 3 +-- .../integrations/destination/s3_v2/S3V2WriteTest.kt | 2 +- docs/integrations/destinations/s3.md | 1 + 7 files changed, 16 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index da8b5df5f871..2a6dd6e296b0 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.load.state.object_storage +import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.annotation.JsonProperty import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream @@ -23,6 +24,8 @@ import java.nio.file.Paths import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") class ObjectStorageDestinationState( @@ -37,6 +40,8 @@ class ObjectStorageDestinationState( FINALIZED } + @JsonIgnore private val countByKeyLock = Mutex() + companion object { const val METADATA_GENERATION_ID_KEY = "ab-generation-id" const val STREAM_NAMESPACE_KEY = "ab-stream-namespace" @@ -121,7 +126,11 @@ class ObjectStorageDestinationState( /** Used to guarantee the uniqueness of a key */ suspend fun ensureUnique(key: String): String { - val ordinal = countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } ?: 0L + val ordinal = + countByKeyLock.withLock { + countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } + } + ?: 0L return if (ordinal > 0L) { "$key-$ordinal" } else { diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index 5b83c46c6525..3fd620b31e5a 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteBulkConnector { core = 'load' toolkits = ['load-s3', 'load-avro'] - cdk = '0.254' + cdk = 'local' } application { diff --git a/airbyte-integrations/connectors/destination-s3/gradle.properties b/airbyte-integrations/connectors/destination-s3/gradle.properties index 07ec789b84c6..530093261232 100644 --- a/airbyte-integrations/connectors/destination-s3/gradle.properties +++ b/airbyte-integrations/connectors/destination-s3/gradle.properties @@ -1,2 +1,2 @@ testExecutionConcurrency=-1 -JunitMethodExecutionTimeout=30 m +JunitMethodExecutionTimeout=35 m diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 04640037c2df..78e246bb1cf0 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.5.0-rc.4 + dockerImageTag: 1.5.0-rc.5 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt index c445eb199086..46f5cf926a20 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt @@ -50,8 +50,7 @@ class S3V2Checker(private val timeProvider: TimeProvider) : log.info { "Successfully wrote test file: $results" } } finally { s3Object?.also { s3Client.delete(it) } - val results = s3Client.list(path).toList() - log.info { "Successfully removed test tile: $results" } + log.info { "Successfully removed test file" } } } } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 05d5480ac016..1b30d96c0267 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -17,7 +17,7 @@ import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout -@Timeout(30, unit = TimeUnit.MINUTES) +@Timeout(35, unit = TimeUnit.MINUTES) abstract class S3V2WriteTest( path: String, expectedRecordMapper: ExpectedRecordMapper, diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 77525f24e443..0e566ffdfec8 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:-----------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.5.0-rc.5 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: transient failure due to bug in filename clash prevention | | 1.5.0-rc.4 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: StreamLoader::close dispatched multiple times per stream | | 1.5.0-rc.3 | 2025-01-06 | [50949](https://github.com/airbytehq/airbyte/pull/50949) | Bug fix: parquet types/values nested in union of objects do not convert properly | | 1.5.0-rc.2 | 2025-01-02 | [50857](https://github.com/airbytehq/airbyte/pull/50857) | Migrate to Bulk Load CDK: cost reduction, perf increase, bug fix for filename clashes |