Skip to content

Commit

Permalink
Bulk Load CDK: Obj Storage Kit: Fix: Nested gen map
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jan 9, 2025
1 parent e8bf09d commit 64fb7c4
Showing 1 changed file with 11 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import kotlinx.coroutines.sync.withLock
class ObjectStorageDestinationState(
// (State -> (GenerationId -> (Key -> PartNumber)))
@JsonProperty("generations_by_state")
var generationMap: ConcurrentHashMap<State, ConcurrentHashMap<Long, MutableMap<String, Long>>> =
var generationMap: ConcurrentHashMap<State, ConcurrentHashMap<Long, ConcurrentHashMap<String, Long>>> =
ConcurrentHashMap(),
@JsonProperty("count_by_key") var countByKey: MutableMap<String, Long> = mutableMapOf()
) : DestinationState {
Expand Down Expand Up @@ -188,38 +188,32 @@ class ObjectStorageFallbackPersister(
"Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata"
}
val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList()

/* Initialize the unique key counts. */
val countByKey = mutableMapOf<String, Long>()
matches.forEach {
val key = it.path.replace(Regex("-[0-9]+$"), "")
val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0
countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) }
}
matches

/* Build (generationId -> (key -> fileNumber)). */
val generationIdToKeyAndFileNumber = ConcurrentHashMap(matches
.groupBy {
client
.getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY]
?.toLong()
?: 0L
}
.mapValues { (_, matches) ->
matches.associate { it.path to (it.partNumber ?: 0L) }.toMutableMap()
}
.toMutableMap()
.let {
val generationSizes = it.map { gen -> gen.key to gen.value.size }
log.info {
"Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})"
}
return ObjectStorageDestinationState(
ConcurrentHashMap(
mutableMapOf(
ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it)
)
),
ConcurrentHashMap(matches.associate { it.path to (it.partNumber ?: 0L) })
})

return ObjectStorageDestinationState(
ConcurrentHashMap(mapOf(ObjectStorageDestinationState.State.FINALIZED to generationIdToKeyAndFileNumber)),
countByKey
)
}
}

override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) {
// No-op; state is persisted when the generation id is set on the object metadata
Expand Down

0 comments on commit 64fb7c4

Please sign in to comment.