From 7fd6556a30e005db8be16c0fdec7ed05d5fea5de Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Wed, 8 Jan 2025 14:55:30 -0800 Subject: [PATCH] revert to using access lock --- .../ObjectStorageDestinationStateManager.kt | 22 ++++++++++++------- .../connectors/destination-s3/build.gradle | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 9d0202a804a9..2a6dd6e296b0 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.load.state.object_storage +import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.annotation.JsonProperty import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream @@ -21,9 +22,10 @@ import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.nio.file.Paths import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicLong import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") class ObjectStorageDestinationState( @@ -31,14 +33,15 @@ class ObjectStorageDestinationState( @JsonProperty("generations_by_state") var generationMap: ConcurrentHashMap>> = ConcurrentHashMap(), - @JsonProperty("count_by_key") - var countByKey: ConcurrentHashMap = ConcurrentHashMap() + @JsonProperty("count_by_key") var countByKey: MutableMap = mutableMapOf() ) : DestinationState { enum class State { STAGED, FINALIZED } + @JsonIgnore private val countByKeyLock = Mutex() + companion object { const val METADATA_GENERATION_ID_KEY = "ab-generation-id" const val STREAM_NAMESPACE_KEY = "ab-stream-namespace" @@ -123,10 +126,13 @@ class ObjectStorageDestinationState( /** Used to guarantee the uniqueness of a key */ suspend fun ensureUnique(key: String): String { - val counter = countByKey.getOrPut(key) { AtomicLong(0L) } - val count = counter.incrementAndGet() - return if (count > 1L) { - "$key-$count" + val ordinal = + countByKeyLock.withLock { + countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } + } + ?: 0L + return if (ordinal > 0L) { + "$key-$ordinal" } else { key } @@ -210,7 +216,7 @@ class ObjectStorageFallbackPersister( ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it) ) ), - ConcurrentHashMap(countByKey.mapValues { (_, v) -> AtomicLong(v) }) + countByKey ) } } diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index 5b83c46c6525..3fd620b31e5a 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteBulkConnector { core = 'load' toolkits = ['load-s3', 'load-avro'] - cdk = '0.254' + cdk = 'local' } application {