From 83a93e7537980d7aafb85e135aede45df82e7c56 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Wed, 8 Jan 2025 13:46:12 -0800 Subject: [PATCH] Destination S3V2: Restore Thread-Safety to Unique Key Counter --- .../ObjectStorageDestinationStateManager.kt | 13 ++++++++----- .../destination-s3/src/main/kotlin/S3V2Checker.kt | 3 +-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index da8b5df5f871..9d0202a804a9 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -21,6 +21,7 @@ import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.nio.file.Paths import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.toList @@ -30,7 +31,8 @@ class ObjectStorageDestinationState( @JsonProperty("generations_by_state") var generationMap: ConcurrentHashMap>> = ConcurrentHashMap(), - @JsonProperty("count_by_key") var countByKey: MutableMap = mutableMapOf() + @JsonProperty("count_by_key") + var countByKey: ConcurrentHashMap = ConcurrentHashMap() ) : DestinationState { enum class State { STAGED, @@ -121,9 +123,10 @@ class ObjectStorageDestinationState( /** Used to guarantee the uniqueness of a key */ suspend fun ensureUnique(key: String): String { - val ordinal = countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } ?: 0L - return if (ordinal > 0L) { - "$key-$ordinal" + val counter = countByKey.getOrPut(key) { AtomicLong(0L) } + val count = counter.incrementAndGet() + return if (count > 1L) { + "$key-$count" } else { key } @@ -207,7 +210,7 @@ class ObjectStorageFallbackPersister( ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it) ) ), - countByKey + ConcurrentHashMap(countByKey.mapValues { (_, v) -> AtomicLong(v) }) ) } } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt index c445eb199086..46f5cf926a20 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt +++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Checker.kt @@ -50,8 +50,7 @@ class S3V2Checker(private val timeProvider: TimeProvider) : log.info { "Successfully wrote test file: $results" } } finally { s3Object?.also { s3Client.delete(it) } - val results = s3Client.list(path).toList() - log.info { "Successfully removed test tile: $results" } + log.info { "Successfully removed test file" } } } }