diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index e5482dee7874..aa8644d94007 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -56,6 +56,7 @@ abstract class IntegrationTest( /** See [RecordDiffer.nullEqualsUnset]. */ val nullEqualsUnset: Boolean = false, val configUpdater: ConfigurationUpdater = FakeConfigurationUpdater, + val envVars: Map = emptyMap(), ) { // Intentionally don't inject the actual destination process - we need a full factory // because some tests want to run multiple syncs, so we need to run the destination @@ -130,7 +131,6 @@ abstract class IntegrationTest( messages: List, streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE, useFileTransfer: Boolean = false, - envVars: Map = emptyMap(), ): List = runSync( configContents, @@ -138,7 +138,6 @@ abstract class IntegrationTest( messages, streamStatus, useFileTransfer, - envVars ) /** @@ -173,7 +172,6 @@ abstract class IntegrationTest( */ streamStatus: AirbyteStreamStatus? 
= AirbyteStreamStatus.COMPLETE, useFileTransfer: Boolean = false, - envVars: Map = emptyMap(), ): List { val destination = destinationProcessFactory.createDestinationProcess( @@ -217,7 +215,6 @@ abstract class IntegrationTest( inputStateMessage: StreamCheckpoint, allowGracefulShutdown: Boolean, useFileTransfer: Boolean = false, - envVars: Map = emptyMap(), ): AirbyteStateMessage { val destination = destinationProcessFactory.createDestinationProcess( diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt index a5d1e18e3fca..e81578bc4486 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt @@ -95,8 +95,12 @@ class DockerizedDestination( val containerName = "$shortImageName-$command-$randomSuffix" logger.info { "Creating docker container $containerName" } logger.info { "File transfer ${if (useFileTransfer) "is " else "isn't"} enabled" } - val additionalEnvEntries = envVars.flatMap { (key, value) -> listOf("-e", "$key=$value") } - logger.info { "Env vars: $envVars loaded" } + val additionalEnvEntries = + envVars.flatMap { (key, value) -> + logger.info { "Env vars: $key loaded" } + listOf("-e", "$key=$value") + } + val cmd: MutableList = (listOf( "docker", diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt index 7c2050855dfc..b229ebf827a1 100644 --- 
a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt @@ -45,8 +45,10 @@ class NonDockerizedDestination( private val file = File("/tmp/test_file") init { - envVars.forEach { (key, value) -> IntegrationTest.nonDockerMockEnvVars.set(key, value) } - logger.info { "Env vars: $envVars loaded" } + envVars.forEach { (key, value) -> + IntegrationTest.nonDockerMockEnvVars.set(key, value) + logger.info { "Env vars: $key loaded" } + } if (useFileTransfer) { IntegrationTest.nonDockerMockEnvVars.set("USE_FILE_TRANSFER", "true") diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 1625286f47be..00120ffc62b4 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -115,7 +115,6 @@ abstract class BasicFunctionalityIntegrationTest( val promoteUnionToObject: Boolean, val preserveUndeclaredFields: Boolean, val supportFileTransfer: Boolean, - val envVars: Map, /** * Whether the destination commits new data when it receives a non-`COMPLETE` stream status. 
For * example: @@ -130,6 +129,7 @@ abstract class BasicFunctionalityIntegrationTest( val nullUnknownTypes: Boolean = false, nullEqualsUnset: Boolean = false, configUpdater: ConfigurationUpdater = FakeConfigurationUpdater, + envVars: Map = emptyMap(), ) : IntegrationTest( dataDumper = dataDumper, @@ -137,7 +137,8 @@ abstract class BasicFunctionalityIntegrationTest( recordMangler = recordMangler, nameMapper = nameMapper, nullEqualsUnset = nullEqualsUnset, - configUpdater = configUpdater + configUpdater = configUpdater, + envVars = envVars, ) { val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configUpdater.update(configContents)) @@ -181,7 +182,6 @@ abstract class BasicFunctionalityIntegrationTest( sourceRecordCount = 1, ) ), - envVars = envVars, ) val stateMessages = messages.filter { it.type == AirbyteMessage.Type.STATE } diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt index 6c93e70d1e6e..35aaca87ed72 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/command/iceberg/parquet/IcebergCatalogSpecifications.kt @@ -12,6 +12,9 @@ import com.fasterxml.jackson.annotation.JsonValue import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDescription import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import io.airbyte.cdk.load.command.aws.AWSArnRoleConfiguration +import io.airbyte.cdk.load.command.aws.AWSArnRoleConfigurationProvider +import io.airbyte.cdk.load.command.aws.AWSArnRoleSpecification /** * Interface defining the specifications for 
configuring an Iceberg catalog. @@ -72,7 +75,10 @@ interface IcebergCatalogSpecifications { val catalogConfiguration = when (catalogType) { is GlueCatalogSpecification -> - GlueCatalogConfiguration((catalogType as GlueCatalogSpecification).glueId) + GlueCatalogConfiguration( + (catalogType as GlueCatalogSpecification).glueId, + (catalogType as GlueCatalogSpecification).toAWSArnRoleConfiguration(), + ) is NessieCatalogSpecification -> NessieCatalogConfiguration( (catalogType as NessieCatalogSpecification).serverUri, @@ -184,7 +190,8 @@ class GlueCatalogSpecification( @JsonProperty("glue_id") @JsonSchemaInject(json = """{"order":1}""") val glueId: String, -) : CatalogType(catalogType) + override val roleArn: String? = null, +) : CatalogType(catalogType), AWSArnRoleSpecification /** * Represents a unified Iceberg catalog configuration. @@ -228,8 +235,9 @@ sealed interface CatalogConfiguration data class GlueCatalogConfiguration( @JsonSchemaTitle("AWS Account ID") @JsonPropertyDescription("The AWS Account ID associated with the Glue service.") - val glueId: String -) : CatalogConfiguration + val glueId: String, + override val awsArnRoleConfiguration: AWSArnRoleConfiguration, +) : CatalogConfiguration, AWSArnRoleConfigurationProvider /** * Nessie catalog configuration details. 
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml index ab30f992fce9..d25b6772b001 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml @@ -14,9 +14,19 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-ICEBERG_V2_S3_GLUE_ASSUME_ROLE_CONFIG + fileName: glue_assume_role.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-ICEBERG_V2_S3_GLUE_ASSUME_ROLE_SYSTEM_AWS_CONFIG + fileName: glue_aws_assume_role.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 716ca874-520b-4902-9f80-9fad66754b89 - dockerImageTag: 0.2.6 + dockerImageTag: 0.2.7 dockerRepository: airbyte/destination-s3-data-lake documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake githubIssueLabel: destination-s3-data-lake diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt index 84619536cbeb..20a83f29937b 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Writer.kt @@ -17,7 +17,7 @@ import org.apache.iceberg.Schema class IcebergV2Writer( private val icebergTableWriterFactory: IcebergTableWriterFactory, private val icebergConfiguration: IcebergV2Configuration, - private val icebergUtil: IcebergUtil, + private val 
icebergUtil: IcebergUtil ) : DestinationWriter { override fun createStreamLoader(stream: DestinationStream): StreamLoader { diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index c3b3d627f827..7269614f1855 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -4,10 +4,12 @@ package io.airbyte.integrations.destination.iceberg.v2.io +import com.fasterxml.jackson.annotation.JsonProperty import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.command.ImportType import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration +import io.airbyte.cdk.load.command.iceberg.parquet.IcebergCatalogConfiguration import io.airbyte.cdk.load.command.iceberg.parquet.NessieCatalogConfiguration import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.data.NullValue @@ -23,12 +25,11 @@ import io.airbyte.integrations.destination.iceberg.v2.SECRET_ACCESS_KEY import io.airbyte.integrations.destination.iceberg.v2.TableIdGenerator import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton +import java.time.Duration import org.apache.hadoop.conf.Configuration import org.apache.iceberg.CatalogProperties import org.apache.iceberg.CatalogProperties.URI -import org.apache.iceberg.CatalogProperties.WAREHOUSE_LOCATION import org.apache.iceberg.CatalogUtil -import org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE import org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_GLUE import 
org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_NESSIE import org.apache.iceberg.FileFormat @@ -36,6 +37,7 @@ import org.apache.iceberg.Schema import org.apache.iceberg.SortOrder import org.apache.iceberg.Table import org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT +import org.apache.iceberg.aws.AssumeRoleAwsClientFactory import org.apache.iceberg.aws.AwsClientProperties import org.apache.iceberg.aws.AwsProperties import org.apache.iceberg.aws.s3.S3FileIO @@ -50,10 +52,27 @@ import software.amazon.awssdk.services.glue.model.ConcurrentModificationExceptio private val logger = KotlinLogging.logger {} const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at" +const val EXTERNAL_ID = "AWS_ASSUME_ROLE_EXTERNAL_ID" +const val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" +const val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" -/** Collection of Iceberg related utilities. */ +data class AWSSystemCredentials( + @get:JsonProperty("AWS_ACCESS_KEY_ID") val AWS_ACCESS_KEY_ID: String, + @get:JsonProperty("AWS_SECRET_ACCESS_KEY") val AWS_SECRET_ACCESS_KEY: String, + @get:JsonProperty("AWS_ASSUME_ROLE_EXTERNAL_ID") val AWS_ASSUME_ROLE_EXTERNAL_ID: String +) + +/** + * Collection of Iceberg related utilities. + * @param awsSystemCredentials is a temporary fix to allow us to run the integrations tests. This + * will be removed when we change all of this to use Micronaut + */ @Singleton -class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { +class IcebergUtil( + private val tableIdGenerator: TableIdGenerator, + val awsSystemCredentials: AWSSystemCredentials? 
= null +) { + internal class InvalidFormatException(message: String) : Exception(message) private val generationIdRegex = Regex("""ab-generation-id-\d+-e""") @@ -186,78 +205,160 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { fun toCatalogProperties(config: IcebergV2Configuration): Map { val icebergCatalogConfig = config.icebergCatalogConfiguration val catalogConfig = icebergCatalogConfig.catalogConfiguration + val region = config.s3BucketConfiguration.s3BucketRegion.region + + // Build base S3 properties + val s3Properties = buildS3Properties(config, icebergCatalogConfig) + + // Set AWS region as system property + System.setProperty("aws.region", region) + + return when (catalogConfig) { + is NessieCatalogConfiguration -> + buildNessieProperties(config, catalogConfig, s3Properties) + is GlueCatalogConfiguration -> + buildGlueProperties(config, catalogConfig, icebergCatalogConfig) + else -> + throw IllegalArgumentException( + "Unsupported catalog type: ${catalogConfig::class.java.name}" + ) + } + } + + private fun buildS3Properties( + config: IcebergV2Configuration, + icebergCatalogConfig: IcebergCatalogConfiguration + ): Map { + return buildMap { + put(CatalogProperties.FILE_IO_IMPL, S3FileIO::class.java.name) + put(S3FileIOProperties.PATH_STYLE_ACCESS, "true") + put(CatalogProperties.WAREHOUSE_LOCATION, icebergCatalogConfig.warehouseLocation) + + // Add optional S3 endpoint if provided + config.s3BucketConfiguration.s3Endpoint?.let { endpoint -> + put(S3FileIOProperties.ENDPOINT, endpoint) + } + } + } + + private fun buildNessieProperties( + config: IcebergV2Configuration, + catalogConfig: NessieCatalogConfiguration, + s3Properties: Map + ): Map { val awsAccessKeyId = requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) { - "AWS Access Key ID cannot be null" + "AWS Access Key ID is required for Nessie configuration" } val awsSecretAccessKey = requireNotNull(config.awsAccessKeyConfiguration.secretAccessKey) { - "AWS Secret Access Key 
cannot be null" + "AWS Secret Access Key is required for Nessie configuration" } - // Common S3/Iceberg properties shared across all catalog types. - // The S3 endpoint is optional; if provided, it will be included. - val s3CommonProperties = - mutableMapOf( - CatalogProperties.FILE_IO_IMPL to S3FileIO::class.java.name, - S3FileIOProperties.ACCESS_KEY_ID to awsAccessKeyId, - S3FileIOProperties.SECRET_ACCESS_KEY to awsSecretAccessKey, - // Required for MinIO or other S3-compatible stores using path-style access. - S3FileIOProperties.PATH_STYLE_ACCESS to "true" + val nessieProperties = buildMap { + put(CatalogUtil.ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_NESSIE) + put(URI, catalogConfig.serverUri) + put( + NessieConfigConstants.CONF_NESSIE_REF, + config.icebergCatalogConfiguration.mainBranchName + ) + put(S3FileIOProperties.ACCESS_KEY_ID, awsAccessKeyId) + put(S3FileIOProperties.SECRET_ACCESS_KEY, awsSecretAccessKey) + + // Add optional Nessie authentication if provided + catalogConfig.accessToken?.let { token -> + put(NessieConfigConstants.CONF_NESSIE_AUTH_TYPE, "BEARER") + put(NessieConfigConstants.CONF_NESSIE_AUTH_TOKEN, token) + } + } + + return nessieProperties + s3Properties + } + + private fun buildGlueProperties( + config: IcebergV2Configuration, + catalogConfig: GlueCatalogConfiguration, + icebergCatalogConfig: IcebergCatalogConfiguration + ): Map { + val baseGlueProperties = + mapOf( + CatalogUtil.ICEBERG_CATALOG_TYPE to ICEBERG_CATALOG_TYPE_GLUE, + CatalogProperties.WAREHOUSE_LOCATION to icebergCatalogConfig.warehouseLocation, + AwsProperties.GLUE_CATALOG_ID to catalogConfig.glueId + ) + + val clientProperties = + if (catalogConfig.awsArnRoleConfiguration.roleArn != null) { + buildRoleBasedClientProperties( + catalogConfig.awsArnRoleConfiguration.roleArn!!, + config ) - .apply { - config.s3BucketConfiguration.s3Endpoint?.let { endpoint -> - this[S3FileIOProperties.ENDPOINT] = endpoint - } - } + } else { + buildKeyBasedClientProperties(config) + } - 
return when (catalogConfig) { - is NessieCatalogConfiguration -> { - // Nessie relies on the AWS region being set as a system property. - System.setProperty("aws.region", config.s3BucketConfiguration.s3BucketRegion.region) - - val nessieProperties = - mutableMapOf( - ICEBERG_CATALOG_TYPE to ICEBERG_CATALOG_TYPE_NESSIE, - URI to catalogConfig.serverUri, - NessieConfigConstants.CONF_NESSIE_REF to - icebergCatalogConfig.mainBranchName, - WAREHOUSE_LOCATION to icebergCatalogConfig.warehouseLocation, - ) - - // Add optional Nessie auth token if provided. - catalogConfig.accessToken?.let { token -> - nessieProperties[NessieConfigConstants.CONF_NESSIE_AUTH_TYPE] = "BEARER" - nessieProperties[NessieConfigConstants.CONF_NESSIE_AUTH_TOKEN] = token - } + return baseGlueProperties + clientProperties + } - nessieProperties + s3CommonProperties + private fun buildRoleBasedClientProperties( + roleArn: String, + config: IcebergV2Configuration + ): Map { + val region = config.s3BucketConfiguration.s3BucketRegion.region + val (accessKeyId, secretAccessKey, externalId) = + if (awsSystemCredentials != null) { + Triple( + awsSystemCredentials.AWS_ACCESS_KEY_ID, + awsSystemCredentials.AWS_SECRET_ACCESS_KEY, + awsSystemCredentials.AWS_ASSUME_ROLE_EXTERNAL_ID + ) + } else { + Triple( + System.getenv(AWS_ACCESS_KEY_ID), + System.getenv(AWS_SECRET_ACCESS_KEY), + System.getenv(EXTERNAL_ID) + ) } - is GlueCatalogConfiguration -> { - val clientCredentialsProviderPrefix = - AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER + "." 
- - val glueProperties = - mapOf( - ICEBERG_CATALOG_TYPE to ICEBERG_CATALOG_TYPE_GLUE, - WAREHOUSE_LOCATION to icebergCatalogConfig.warehouseLocation, - AwsProperties.GLUE_CATALOG_ID to catalogConfig.glueId, - AwsClientProperties.CLIENT_REGION to - config.s3BucketConfiguration.s3BucketRegion.region, - AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to - GlueCredentialsProvider::class.java.name, - "${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}" to awsAccessKeyId, - "${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}" to - awsSecretAccessKey - ) - - glueProperties + s3CommonProperties + + // The AssumeRoleAwsClientFactory doesn't respect the access / secret key properties from + // the map. Instead, it always uses the default AWS cred provider chain. + // So we need to manually set the secrets as system properties here. + System.setProperty("aws.region", region) + System.setProperty("aws.accessKeyId", accessKeyId) + System.setProperty("aws.secretAccessKey", secretAccessKey) + + return mapOf( + AwsProperties.REST_ACCESS_KEY_ID to accessKeyId, + AwsProperties.REST_SECRET_ACCESS_KEY to secretAccessKey, + AwsProperties.CLIENT_FACTORY to AssumeRoleAwsClientFactory::class.java.name, + AwsProperties.CLIENT_ASSUME_ROLE_ARN to roleArn, + AwsProperties.CLIENT_ASSUME_ROLE_REGION to region, + AwsProperties.CLIENT_ASSUME_ROLE_TIMEOUT_SEC to + Duration.ofHours(1).toSeconds().toString(), + AwsProperties.CLIENT_ASSUME_ROLE_EXTERNAL_ID to externalId + ) + } + + private fun buildKeyBasedClientProperties(config: IcebergV2Configuration): Map { + val awsAccessKeyId = + requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) { + "AWS Access Key ID is required for key-based authentication" } - else -> - throw IllegalArgumentException( - "Unknown catalog type: ${catalogConfig::class.java.name}" - ) - } + val awsSecretAccessKey = + requireNotNull(config.awsAccessKeyConfiguration.secretAccessKey) { + "AWS Secret Access Key is required for key-based authentication" + } + val 
clientCredentialsProviderPrefix = "${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}." + + return mapOf( + S3FileIOProperties.ACCESS_KEY_ID to awsAccessKeyId, + S3FileIOProperties.SECRET_ACCESS_KEY to awsSecretAccessKey, + AwsClientProperties.CLIENT_REGION to config.s3BucketConfiguration.s3BucketRegion.region, + AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to + GlueCredentialsProvider::class.java.name, + "${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}" to awsAccessKeyId, + "${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}" to awsSecretAccessKey + ) } fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema { diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt index 012533314939..bdedea419fa5 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2DataDumper.kt @@ -72,7 +72,8 @@ object IcebergV2DataDumper : DestinationDataDumper { stream: DestinationStream ): List { val config = IcebergV2TestUtil.getConfig(spec) - val catalog = IcebergV2TestUtil.getCatalog(config) + val catalog = + IcebergV2TestUtil.getCatalog(config, IcebergV2TestUtil.getAWSSystemCredentials()) val table = catalog.loadTable( TableIdGeneratorFactory(config).create().toTableIdentifier(stream.descriptor) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt 
b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt index 2da1841dbf0c..ad0fbe4b2157 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2TestUtil.kt @@ -4,25 +4,41 @@ package io.airbyte.integrations.destination.iceberg.v2 +import com.fasterxml.jackson.core.type.TypeReference import io.airbyte.cdk.command.ConfigurationSpecification import io.airbyte.cdk.command.ValidatedJsonUtils +import io.airbyte.cdk.load.util.Jsons +import io.airbyte.integrations.destination.iceberg.v2.io.AWSSystemCredentials import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import java.nio.file.Files import java.nio.file.Path object IcebergV2TestUtil { val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json") + val GLUE_ASSUME_ROLE_CONFIG_PATH: Path = Path.of("secrets/glue_assume_role.json") + private val GLUE_AWS_ASSUME_ROLE_CONFIG_PATH: Path = + Path.of("secrets/glue_aws_assume_role.json") fun parseConfig(path: Path) = getConfig( ValidatedJsonUtils.parseOne(IcebergV2Specification::class.java, Files.readString(path)) ) + fun getAWSSystemCredentials(): AWSSystemCredentials { + val configFile = GLUE_AWS_ASSUME_ROLE_CONFIG_PATH.toFile() + return Jsons.readValue(configFile, AWSSystemCredentials::class.java) + } + + fun getAWSSystemCredentialsAsMap(): Map { + val credentials = getAWSSystemCredentials() + return Jsons.convertValue(credentials, object : TypeReference>() {}) + } + fun getConfig(spec: ConfigurationSpecification) = IcebergV2ConfigurationFactory().makeWithoutExceptionHandling(spec as IcebergV2Specification) - fun getCatalog(config: IcebergV2Configuration) = - IcebergUtil(SimpleTableIdGenerator()).let { 
icebergUtil -> + fun getCatalog(config: IcebergV2Configuration, awsSystemCredentials: AWSSystemCredentials) = + IcebergUtil(SimpleTableIdGenerator(), awsSystemCredentials).let { icebergUtil -> val props = icebergUtil.toCatalogProperties(config) icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props) } diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 21083abca4fa..2fdfd0609574 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -88,16 +88,23 @@ class IcebergGlueWriteTest : Files.readString(IcebergV2TestUtil.GLUE_CONFIG_PATH), IcebergDestinationCleaner( IcebergV2TestUtil.getCatalog( - IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH) + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH), + IcebergV2TestUtil.getAWSSystemCredentials() ) ) - ) { + ) - @Test - override fun testBasicWrite() { - super.testBasicWrite() - } -} +class IcebergGlueAssumeRoleWriteTest : + IcebergV2WriteTest( + Files.readString(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), + IcebergDestinationCleaner( + IcebergV2TestUtil.getCatalog( + IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_ASSUME_ROLE_CONFIG_PATH), + IcebergV2TestUtil.getAWSSystemCredentials() + ) + ), + IcebergV2TestUtil.getAWSSystemCredentialsAsMap() + ) @Disabled( "This is currently disabled until we are able to make it run via airbyte-ci. 
It works as expected locally" diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json index 10b76d8d8680..fb401399b204 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-cloud.json @@ -103,6 +103,11 @@ "description" : "The AWS Account ID associated with the Glue service used by the Iceberg catalog.", "title" : "AWS Account ID", "order" : 1 + }, + "role_arn" : { + "type" : "string", + "description" : "The Role ARN.", + "title" : "Role ARN" } }, "required" : [ "catalog_type", "glue_id" ] diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json index 10b76d8d8680..fb401399b204 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/resources/expected-spec-oss.json @@ -103,6 +103,11 @@ "description" : "The AWS Account ID associated with the Glue service used by the Iceberg catalog.", "title" : "AWS Account ID", "order" : 1 + }, + "role_arn" : { + "type" : "string", + "description" : "The Role ARN.", + "title" : "Role ARN" } }, "required" : [ "catalog_type", "glue_id" ] diff --git a/docs/integrations/destinations/s3-data-lake.md b/docs/integrations/destinations/s3-data-lake.md index 8781386e9947..2eace268e40b 100644 --- a/docs/integrations/destinations/s3-data-lake.md +++ b/docs/integrations/destinations/s3-data-lake.md @@ -15,8 +15,9 @@ for more information.
Expand to review -| Version | Date | Pull Request | Subject | -|:-----------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------| +| 0.2.7 | 2025-01-00 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) | +| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |