Skip to content

Commit

Permalink
Destination S3V2: Restore Thread-Safety to Unique Key Counter
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jan 9, 2025
1 parent 1ed3bd3 commit 32fb615
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.state.object_storage

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonProperty
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationStream
Expand All @@ -23,6 +24,8 @@ import java.nio.file.Paths
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
class ObjectStorageDestinationState(
Expand All @@ -37,6 +40,8 @@ class ObjectStorageDestinationState(
FINALIZED
}

@JsonIgnore private val countByKeyLock = Mutex()

companion object {
const val METADATA_GENERATION_ID_KEY = "ab-generation-id"
const val STREAM_NAMESPACE_KEY = "ab-stream-namespace"
Expand Down Expand Up @@ -121,7 +126,11 @@ class ObjectStorageDestinationState(

/** Used to guarantee the uniqueness of a key */
suspend fun ensureUnique(key: String): String {
val ordinal = countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } ?: 0L
val ordinal =
countByKeyLock.withLock {
countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) }
}
?: 0L
return if (ordinal > 0L) {
"$key-$ordinal"
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteBulkConnector {
core = 'load'
toolkits = ['load-s3', 'load-avro']
cdk = '0.254'
cdk = 'local'
}

application {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
testExecutionConcurrency=-1
JunitMethodExecutionTimeout=30 m
JunitMethodExecutionTimeout=35 m
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0-rc.4
dockerImageTag: 1.5.0-rc.5
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ class S3V2Checker<T : OutputStream>(private val timeProvider: TimeProvider) :
log.info { "Successfully wrote test file: $results" }
} finally {
s3Object?.also { s3Client.delete(it) }
val results = s3Client.list(path).toList()
log.info { "Successfully removed test tile: $results" }
log.info { "Successfully removed test file" }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout

@Timeout(30, unit = TimeUnit.MINUTES)
@Timeout(35, unit = TimeUnit.MINUTES)
abstract class S3V2WriteTest(
path: String,
expectedRecordMapper: ExpectedRecordMapper,
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.5.0-rc.5 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: transient failure due to bug in filename clash prevention |
| 1.5.0-rc.4 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: StreamLoader::close dispatched multiple times per stream |
| 1.5.0-rc.3 | 2025-01-06 | [50949](https://github.com/airbytehq/airbyte/pull/50949) | Bug fix: parquet types/values nested in union of objects do not convert properly |
| 1.5.0-rc.2 | 2025-01-02 | [50857](https://github.com/airbytehq/airbyte/pull/50857) | Migrate to Bulk Load CDK: cost reduction, perf increase, bug fix for filename clashes |
Expand Down

0 comments on commit 32fb615

Please sign in to comment.