From af06723f3c0bac73c9e688d9a2721a832f6ede6a Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 9 Jan 2025 10:33:14 -0800 Subject: [PATCH] Bulk Load CDK: Obj Storage Kit: Fix: Nested gen map --- .../ObjectStorageDestinationStateManager.kt | 53 ++++++++++--------- .../connectors/destination-s3/metadata.yaml | 2 +- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 2a6dd6e296b0..a8d608b16b59 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -31,7 +31,8 @@ import kotlinx.coroutines.sync.withLock class ObjectStorageDestinationState( // (State -> (GenerationId -> (Key -> PartNumber))) @JsonProperty("generations_by_state") - var generationMap: ConcurrentHashMap>> = + var generationMap: + ConcurrentHashMap>> = ConcurrentHashMap(), @JsonProperty("count_by_key") var countByKey: MutableMap = mutableMapOf() ) : DestinationState { @@ -188,37 +189,39 @@ class ObjectStorageFallbackPersister( "Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata" } val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList() + + /* Initialize the unique key counts. */ val countByKey = mutableMapOf() matches.forEach { val key = it.path.replace(Regex("-[0-9]+$"), "") val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0 countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) } } - matches - .groupBy { - client - .getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY] - ?.toLong() - ?: 0L - } - .mapValues { (_, matches) -> - matches.associate { it.path to (it.partNumber ?: 0L) }.toMutableMap() - } - .toMutableMap() - .let { - val generationSizes = it.map { gen -> gen.key to gen.value.size } - log.info { - "Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})" - } - return ObjectStorageDestinationState( - ConcurrentHashMap( - mutableMapOf( - ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it) - ) - ), - countByKey + + /* Build (generationId -> (key -> fileNumber)). */ + val generationIdToKeyAndFileNumber = + ConcurrentHashMap( + matches + .groupBy { + client + .getMetadata(it.path)[ + ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY] + ?.toLong() + ?: 0L + } + .mapValues { (_, matches) -> + ConcurrentHashMap(matches.associate { it.path to (it.partNumber ?: 0L) }) + } + ) + + return ObjectStorageDestinationState( + ConcurrentHashMap( + mapOf( + ObjectStorageDestinationState.State.FINALIZED to generationIdToKeyAndFileNumber ) - } + ), + countByKey + ) } override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) { diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 78e246bb1cf0..b9a9262120a8 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.5.0-rc.5 + dockerImageTag: 1.5.0-rc.6 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg