diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 2a6dd6e296b0..a8d608b16b59 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -31,7 +31,8 @@ import kotlinx.coroutines.sync.withLock class ObjectStorageDestinationState( // (State -> (GenerationId -> (Key -> PartNumber))) @JsonProperty("generations_by_state") - var generationMap: ConcurrentHashMap>> = + var generationMap: + ConcurrentHashMap>> = ConcurrentHashMap(), @JsonProperty("count_by_key") var countByKey: MutableMap = mutableMapOf() ) : DestinationState { @@ -188,37 +189,39 @@ class ObjectStorageFallbackPersister( "Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata" } val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList() + + /* Initialize the unique key counts. */ val countByKey = mutableMapOf() matches.forEach { val key = it.path.replace(Regex("-[0-9]+$"), "") val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0 countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) } } - matches - .groupBy { - client - .getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY] - ?.toLong() - ?: 0L - } - .mapValues { (_, matches) -> - matches.associate { it.path to (it.partNumber ?: 0L) }.toMutableMap() - } - .toMutableMap() - .let { - val generationSizes = it.map { gen -> gen.key to gen.value.size } - log.info { - "Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})" - } - return ObjectStorageDestinationState( - ConcurrentHashMap( - mutableMapOf( - ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it) - ) - ), - countByKey + + /* Build (generationId -> (key -> fileNumber)). */ + val generationIdToKeyAndFileNumber = + ConcurrentHashMap( + matches + .groupBy { + client + .getMetadata(it.path)[ + ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY] + ?.toLong() + ?: 0L + } + .mapValues { (_, matches) -> + ConcurrentHashMap(matches.associate { it.path to (it.partNumber ?: 0L) }) + } + ) + + return ObjectStorageDestinationState( + ConcurrentHashMap( + mapOf( + ObjectStorageDestinationState.State.FINALIZED to generationIdToKeyAndFileNumber ) - } + ), + countByKey + ) } override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) { diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 78e246bb1cf0..b9a9262120a8 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.5.0-rc.5 + dockerImageTag: 1.5.0-rc.6 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 0e566ffdfec8..b1ca0c234240 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:-----------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.5.0-rc.6 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: transient failure due to bug in generation tracker | | 1.5.0-rc.5 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: transient failure due to bug in filename clash prevention | | 1.5.0-rc.4 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: StreamLoader::close dispatched multiple times per stream | | 1.5.0-rc.3 | 2025-01-06 | [50949](https://github.com/airbytehq/airbyte/pull/50949) | Bug fix: parquet types/values nested in union of objects do not convert properly |