Skip to content

Commit

Permalink
revert to using access lock
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jan 8, 2025
1 parent af7d487 commit bb92632
Showing 1 changed file with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.state.object_storage

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonProperty
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationStream
Expand All @@ -21,24 +22,27 @@ import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
class ObjectStorageDestinationState(
// (State -> (GenerationId -> (Key -> PartNumber)))
@JsonProperty("generations_by_state")
var generationMap: ConcurrentHashMap<State, ConcurrentHashMap<Long, MutableMap<String, Long>>> =
ConcurrentHashMap(),
@JsonProperty("count_by_key")
var countByKey: ConcurrentHashMap<String, AtomicLong> = ConcurrentHashMap()
@JsonProperty("count_by_key") var countByKey: MutableMap<String, Long> = mutableMapOf()
) : DestinationState {
enum class State {
STAGED,
FINALIZED
}

@JsonIgnore
private val countByKeyLock = Mutex()

companion object {
const val METADATA_GENERATION_ID_KEY = "ab-generation-id"
const val STREAM_NAMESPACE_KEY = "ab-stream-namespace"
Expand Down Expand Up @@ -123,10 +127,9 @@ class ObjectStorageDestinationState(

/** Used to guarantee the uniqueness of a key */
suspend fun ensureUnique(key: String): String {
val counter = countByKey.getOrPut(key) { AtomicLong(0L) }
val count = counter.incrementAndGet()
return if (count > 1L) {
"$key-$count"
val ordinal = countByKeyLock.withLock { countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } } ?: 0L
return if (ordinal > 0L) {
"$key-$ordinal"
} else {
key
}
Expand Down Expand Up @@ -210,7 +213,7 @@ class ObjectStorageFallbackPersister(
ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it)
)
),
ConcurrentHashMap(countByKey.mapValues { (_, v) -> AtomicLong(v) })
countByKey
)
}
}
Expand Down

0 comments on commit bb92632

Please sign in to comment.