Skip to content

Commit

Permalink
[Iceberg] Add support for RBAC (assume aws role) auth (#50957)
Browse files: browse the repository at this point in the history
Co-authored-by: Octavia Squidington III <[email protected]>
Co-authored-by: Edward Gao <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2025
1 parent 3269b52 commit 1ed3bd3
Show file tree
Hide file tree
Showing 14 changed files with 251 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ abstract class IntegrationTest(
/** See [RecordDiffer.nullEqualsUnset]. */
val nullEqualsUnset: Boolean = false,
val configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
val envVars: Map<String, String> = emptyMap(),
) {
// Intentionally don't inject the actual destination process - we need a full factory
// because some tests want to run multiple syncs, so we need to run the destination
Expand Down Expand Up @@ -130,15 +131,13 @@ abstract class IntegrationTest(
messages: List<InputMessage>,
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
): List<AirbyteMessage> =
runSync(
configContents,
DestinationCatalog(listOf(stream)),
messages,
streamStatus,
useFileTransfer,
envVars
)

/**
Expand Down Expand Up @@ -173,7 +172,6 @@ abstract class IntegrationTest(
*/
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
): List<AirbyteMessage> {
val destination =
destinationProcessFactory.createDestinationProcess(
Expand Down Expand Up @@ -217,7 +215,6 @@ abstract class IntegrationTest(
inputStateMessage: StreamCheckpoint,
allowGracefulShutdown: Boolean,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
): AirbyteStateMessage {
val destination =
destinationProcessFactory.createDestinationProcess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ class DockerizedDestination(
val containerName = "$shortImageName-$command-$randomSuffix"
logger.info { "Creating docker container $containerName" }
logger.info { "File transfer ${if (useFileTransfer) "is " else "isn't"} enabled" }
val additionalEnvEntries = envVars.flatMap { (key, value) -> listOf("-e", "$key=$value") }
logger.info { "Env vars: $envVars loaded" }
val additionalEnvEntries =
envVars.flatMap { (key, value) ->
logger.info { "Env vars: $key loaded" }
listOf("-e", "$key=$value")
}

val cmd: MutableList<String> =
(listOf(
"docker",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ class NonDockerizedDestination(
private val file = File("/tmp/test_file")

init {
envVars.forEach { (key, value) -> IntegrationTest.nonDockerMockEnvVars.set(key, value) }
logger.info { "Env vars: $envVars loaded" }
envVars.forEach { (key, value) ->
IntegrationTest.nonDockerMockEnvVars.set(key, value)
logger.info { "Env vars: $key loaded" }
}

if (useFileTransfer) {
IntegrationTest.nonDockerMockEnvVars.set("USE_FILE_TRANSFER", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ abstract class BasicFunctionalityIntegrationTest(
val promoteUnionToObject: Boolean,
val preserveUndeclaredFields: Boolean,
val supportFileTransfer: Boolean,
val envVars: Map<String, String>,
/**
* Whether the destination commits new data when it receives a non-`COMPLETE` stream status. For
* example:
Expand All @@ -130,14 +129,16 @@ abstract class BasicFunctionalityIntegrationTest(
val nullUnknownTypes: Boolean = false,
nullEqualsUnset: Boolean = false,
configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
envVars: Map<String, String> = emptyMap(),
) :
IntegrationTest(
dataDumper = dataDumper,
destinationCleaner = destinationCleaner,
recordMangler = recordMangler,
nameMapper = nameMapper,
nullEqualsUnset = nullEqualsUnset,
configUpdater = configUpdater
configUpdater = configUpdater,
envVars = envVars,
) {
val parsedConfig =
ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents))
Expand Down Expand Up @@ -181,7 +182,6 @@ abstract class BasicFunctionalityIntegrationTest(
sourceRecordCount = 1,
)
),
envVars = envVars,
)

val stateMessages = messages.filter { it.type == AirbyteMessage.Type.STATE }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDescription
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.load.command.aws.AWSArnRoleConfiguration
import io.airbyte.cdk.load.command.aws.AWSArnRoleConfigurationProvider
import io.airbyte.cdk.load.command.aws.AWSArnRoleSpecification

/**
* Interface defining the specifications for configuring an Iceberg catalog.
Expand Down Expand Up @@ -72,7 +75,10 @@ interface IcebergCatalogSpecifications {
val catalogConfiguration =
when (catalogType) {
is GlueCatalogSpecification ->
GlueCatalogConfiguration((catalogType as GlueCatalogSpecification).glueId)
GlueCatalogConfiguration(
(catalogType as GlueCatalogSpecification).glueId,
(catalogType as GlueCatalogSpecification).toAWSArnRoleConfiguration(),
)
is NessieCatalogSpecification ->
NessieCatalogConfiguration(
(catalogType as NessieCatalogSpecification).serverUri,
Expand Down Expand Up @@ -184,7 +190,8 @@ class GlueCatalogSpecification(
@JsonProperty("glue_id")
@JsonSchemaInject(json = """{"order":1}""")
val glueId: String,
) : CatalogType(catalogType)
override val roleArn: String? = null,
) : CatalogType(catalogType), AWSArnRoleSpecification

/**
* Represents a unified Iceberg catalog configuration.
Expand Down Expand Up @@ -228,8 +235,9 @@ sealed interface CatalogConfiguration
data class GlueCatalogConfiguration(
@JsonSchemaTitle("AWS Account ID")
@JsonPropertyDescription("The AWS Account ID associated with the Glue service.")
val glueId: String
) : CatalogConfiguration
val glueId: String,
override val awsArnRoleConfiguration: AWSArnRoleConfiguration,
) : CatalogConfiguration, AWSArnRoleConfigurationProvider

/**
* Nessie catalog configuration details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,19 @@ data:
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-ICEBERG_V2_S3_GLUE_ASSUME_ROLE_CONFIG
fileName: glue_assume_role.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
- name: SECRET_DESTINATION-ICEBERG_V2_S3_GLUE_ASSUME_ROLE_SYSTEM_AWS_CONFIG
fileName: glue_aws_assume_role.json
secretStore:
type: GSM
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.6
dockerImageTag: 0.2.7
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.apache.iceberg.Schema
class IcebergV2Writer(
private val icebergTableWriterFactory: IcebergTableWriterFactory,
private val icebergConfiguration: IcebergV2Configuration,
private val icebergUtil: IcebergUtil,
private val icebergUtil: IcebergUtil
) : DestinationWriter {

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
Expand Down
Loading

0 comments on commit 1ed3bd3

Please sign in to comment.