From 64fb7c44139e4b0f1f2590f049766da82721ed7b Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Thu, 9 Jan 2025 10:33:14 -0800 Subject: [PATCH] Bulk Load CDK: Obj Storage Kit: Fix: Nested gen map --- .../ObjectStorageDestinationStateManager.kt | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 2a6dd6e296b08..5e8b1d4fd0981 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -31,7 +31,7 @@ import kotlinx.coroutines.sync.withLock class ObjectStorageDestinationState( // (State -> (GenerationId -> (Key -> PartNumber))) @JsonProperty("generations_by_state") - var generationMap: ConcurrentHashMap>> = + var generationMap: ConcurrentHashMap>> = ConcurrentHashMap(), @JsonProperty("count_by_key") var countByKey: MutableMap = mutableMapOf() ) : DestinationState { @@ -188,13 +188,17 @@ class ObjectStorageFallbackPersister( "Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata" } val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList() + + /* Initialize the unique key counts. */ val countByKey = mutableMapOf() matches.forEach { val key = it.path.replace(Regex("-[0-9]+$"), "") val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0 countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) } } - matches + + /* Build (generationId -> (key -> fileNumber)). */ + val generationIdToKeyAndFileNumber = ConcurrentHashMap(matches .groupBy { client .getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY] @@ -202,24 +206,14 @@ class ObjectStorageFallbackPersister( ?: 0L } .mapValues { (_, matches) -> - matches.associate { it.path to (it.partNumber ?: 0L) }.toMutableMap() - } - .toMutableMap() - .let { - val generationSizes = it.map { gen -> gen.key to gen.value.size } - log.info { - "Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})" - } - return ObjectStorageDestinationState( - ConcurrentHashMap( - mutableMapOf( - ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it) - ) - ), + ConcurrentHashMap(matches.associate { it.path to (it.partNumber ?: 0L) }) + }) + + return ObjectStorageDestinationState( + ConcurrentHashMap(mapOf(ObjectStorageDestinationState.State.FINALIZED to generationIdToKeyAndFileNumber)), countByKey ) } - } override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) { // No-op; state is persisted when the generation id is set on the object metadata