Skip to content

Commit

Permalink
Destination S3V2: Restore Thread-Safety to Unique Key Counter
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jan 8, 2025
1 parent b8f4274 commit 83a93e7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList

Expand All @@ -30,7 +31,8 @@ class ObjectStorageDestinationState(
@JsonProperty("generations_by_state")
var generationMap: ConcurrentHashMap<State, ConcurrentHashMap<Long, MutableMap<String, Long>>> =
ConcurrentHashMap(),
@JsonProperty("count_by_key") var countByKey: MutableMap<String, Long> = mutableMapOf()
@JsonProperty("count_by_key")
var countByKey: ConcurrentHashMap<String, AtomicLong> = ConcurrentHashMap()
) : DestinationState {
enum class State {
STAGED,
Expand Down Expand Up @@ -121,9 +123,10 @@ class ObjectStorageDestinationState(

/** Used to guarantee the uniqueness of a key */
suspend fun ensureUnique(key: String): String {
val ordinal = countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } ?: 0L
return if (ordinal > 0L) {
"$key-$ordinal"
val counter = countByKey.getOrPut(key) { AtomicLong(0L) }
val count = counter.incrementAndGet()
return if (count > 1L) {
"$key-$count"
} else {
key
}
Expand Down Expand Up @@ -207,7 +210,7 @@ class ObjectStorageFallbackPersister(
ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it)
)
),
countByKey
ConcurrentHashMap(countByKey.mapValues { (_, v) -> AtomicLong(v) })
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ class S3V2Checker<T : OutputStream>(private val timeProvider: TimeProvider) :
log.info { "Successfully wrote test file: $results" }
} finally {
s3Object?.also { s3Client.delete(it) }
val results = s3Client.list(path).toList()
log.info { "Successfully removed test tile: $results" }
log.info { "Successfully removed test file" }
}
}
}
Expand Down

0 comments on commit 83a93e7

Please sign in to comment.