Skip to content

Commit

Permalink
Bulk Load CDK: Obj Storage Kit: Fix: Nested gen map
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Jan 9, 2025
1 parent e8bf09d commit af06723
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import kotlinx.coroutines.sync.withLock
class ObjectStorageDestinationState(
// (State -> (GenerationId -> (Key -> PartNumber)))
@JsonProperty("generations_by_state")
var generationMap: ConcurrentHashMap<State, ConcurrentHashMap<Long, MutableMap<String, Long>>> =
var generationMap:
ConcurrentHashMap<State, ConcurrentHashMap<Long, ConcurrentHashMap<String, Long>>> =
ConcurrentHashMap(),
@JsonProperty("count_by_key") var countByKey: MutableMap<String, Long> = mutableMapOf()
) : DestinationState {
Expand Down Expand Up @@ -188,37 +189,39 @@ class ObjectStorageFallbackPersister(
"Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata"
}
val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList()

/* Initialize the unique key counts. */
val countByKey = mutableMapOf<String, Long>()
matches.forEach {
val key = it.path.replace(Regex("-[0-9]+$"), "")
val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0
countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) }
}
matches
.groupBy {
client
.getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY]
?.toLong()
?: 0L
}
.mapValues { (_, matches) ->
matches.associate { it.path to (it.partNumber ?: 0L) }.toMutableMap()
}
.toMutableMap()
.let {
val generationSizes = it.map { gen -> gen.key to gen.value.size }
log.info {
"Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})"
}
return ObjectStorageDestinationState(
ConcurrentHashMap(
mutableMapOf(
ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it)
)
),
countByKey

/* Build (generationId -> (key -> fileNumber)). */
val generationIdToKeyAndFileNumber =
ConcurrentHashMap(
matches
.groupBy {
client
.getMetadata(it.path)[
ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY]
?.toLong()
?: 0L
}
.mapValues { (_, matches) ->
ConcurrentHashMap(matches.associate { it.path to (it.partNumber ?: 0L) })
}
)

return ObjectStorageDestinationState(
ConcurrentHashMap(
mapOf(
ObjectStorageDestinationState.State.FINALIZED to generationIdToKeyAndFileNumber
)
}
),
countByKey
)
}

override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0-rc.5
dockerImageTag: 1.5.0-rc.6
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down

0 comments on commit af06723

Please sign in to comment.