Skip to content

Commit

Permalink
Bulk Load CDK: Obj Storage Kit: Fix: Nested gen map (#51015)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Jan 9, 2025
1 parent b23448f commit 4e089d3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import kotlinx.coroutines.sync.withLock
class ObjectStorageDestinationState(
// (State -> (GenerationId -> (Key -> PartNumber)))
@JsonProperty("generations_by_state")
var generationMap: ConcurrentHashMap<State, ConcurrentHashMap<Long, MutableMap<String, Long>>> =
var generationMap:
ConcurrentHashMap<State, ConcurrentHashMap<Long, ConcurrentHashMap<String, Long>>> =
ConcurrentHashMap(),
@JsonProperty("count_by_key") var countByKey: MutableMap<String, Long> = mutableMapOf()
) : DestinationState {
Expand Down Expand Up @@ -188,37 +189,39 @@ class ObjectStorageFallbackPersister(
"Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata"
}
val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList()

/* Initialize the unique key counts. */
val countByKey = mutableMapOf<String, Long>()
matches.forEach {
val key = it.path.replace(Regex("-[0-9]+$"), "")
val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0
countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) }
}
matches
.groupBy {
client
.getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY]
?.toLong()
?: 0L
}
.mapValues { (_, matches) ->
matches.associate { it.path to (it.partNumber ?: 0L) }.toMutableMap()
}
.toMutableMap()
.let {
val generationSizes = it.map { gen -> gen.key to gen.value.size }
log.info {
"Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})"
}
return ObjectStorageDestinationState(
ConcurrentHashMap(
mutableMapOf(
ObjectStorageDestinationState.State.FINALIZED to ConcurrentHashMap(it)
)
),
countByKey

/* Build (generationId -> (key -> fileNumber)). */
val generationIdToKeyAndFileNumber =
ConcurrentHashMap(
matches
.groupBy {
client
.getMetadata(it.path)[
ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY]
?.toLong()
?: 0L
}
.mapValues { (_, matches) ->
ConcurrentHashMap(matches.associate { it.path to (it.partNumber ?: 0L) })
}
)

return ObjectStorageDestinationState(
ConcurrentHashMap(
mapOf(
ObjectStorageDestinationState.State.FINALIZED to generationIdToKeyAndFileNumber
)
}
),
countByKey
)
}

override suspend fun persist(stream: DestinationStream, state: ObjectStorageDestinationState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0-rc.5
dockerImageTag: 1.5.0-rc.6
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.5.0-rc.6 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: transient failure due to bug in generation tracker |
| 1.5.0-rc.5 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: transient failure due to bug in filename clash prevention |
| 1.5.0-rc.4 | 2025-01-06 | [50954](https://github.com/airbytehq/airbyte/pull/50954) | Bug fix: StreamLoader::close dispatched multiple times per stream |
| 1.5.0-rc.3 | 2025-01-06 | [50949](https://github.com/airbytehq/airbyte/pull/50949) | Bug fix: parquet types/values nested in union of objects do not convert properly |
Expand Down

0 comments on commit 4e089d3

Please sign in to comment.